// datafusion_expr/partition_evaluator.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Partition evaluation module: defines the trait used to evaluate a
//! window function over a single partition.

20use arrow::array::ArrayRef;
21use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue};
22use std::fmt::Debug;
23use std::ops::Range;
24
25use crate::window_state::WindowAggState;
26
27/// Partition evaluator for Window Functions
28///
29/// # Background
30///
31/// An implementation of this trait is created and used for each
32/// partition defined by an `OVER` clause and is instantiated by
33/// the DataFusion runtime.
34///
35/// For example, evaluating `window_func(val) OVER (PARTITION BY col)`
36/// on the following data:
37///
38/// ```text
39/// col | val
40/// --- + ----
41/// A | 10
42/// A | 10
43/// C | 20
44/// D | 30
45/// D | 30
46/// ```
47///
48/// Will instantiate three `PartitionEvaluator`s, one each for the
49/// partitions defined by `col=A`, `col=B`, and `col=C`.
50///
51/// ```text
52/// col | val
53/// --- + ----
54/// A | 10 <--- partition 1
55/// A | 10
56///
57/// col | val
58/// --- + ----
59/// C | 20 <--- partition 2
60///
61/// col | val
62/// --- + ----
63/// D | 30 <--- partition 3
64/// D | 30
65/// ```
66///
67/// Different methods on this trait will be called depending on the
68/// capabilities described by [`supports_bounded_execution`],
69/// [`uses_window_frame`], and [`include_rank`],
70///
71/// When implementing a new `PartitionEvaluator`, implement
72/// corresponding evaluator according to table below.
73///
74/// # Implementation Table
75///
76/// |[`uses_window_frame`]|[`supports_bounded_execution`]|[`include_rank`]|function_to_implement|
77/// |---|---|----|----|
78/// |false (default) |false (default) |false (default) | [`evaluate_all`] |
79/// |false |true |false | [`evaluate`] |
80/// |false |true/false |true | [`evaluate_all_with_rank`] |
81/// |true |true/false |true/false | [`evaluate`] |
82///
83/// [`evaluate`]: Self::evaluate
84/// [`evaluate_all`]: Self::evaluate_all
85/// [`evaluate_all_with_rank`]: Self::evaluate_all_with_rank
86/// [`uses_window_frame`]: Self::uses_window_frame
87/// [`include_rank`]: Self::include_rank
88/// [`supports_bounded_execution`]: Self::supports_bounded_execution
89pub trait PartitionEvaluator: Debug + Send {
90 /// When the window frame has a fixed beginning (e.g UNBOUNDED
91 /// PRECEDING), some functions such as FIRST_VALUE, LAST_VALUE and
92 /// NTH_VALUE do not need the (unbounded) input once they have
93 /// seen a certain amount of input.
94 ///
95 /// `memoize` is called after each input batch is processed, and
96 /// such functions can save whatever they need and modify
97 /// [`WindowAggState`] appropriately to allow rows to be pruned
98 fn memoize(&mut self, _state: &mut WindowAggState) -> Result<()> {
99 Ok(())
100 }
101
102 /// If `uses_window_frame` flag is `false`. This method is used to
103 /// calculate required range for the window function during
104 /// stateful execution.
105 ///
106 /// Generally there is no required range, hence by default this
107 /// returns smallest range(current row). e.g seeing current row is
108 /// enough to calculate window result (such as row_number, rank,
109 /// etc)
110 fn get_range(&self, idx: usize, _n_rows: usize) -> Result<Range<usize>> {
111 if self.uses_window_frame() {
112 exec_err!("Range should be calculated from window frame")
113 } else {
114 Ok(Range {
115 start: idx,
116 end: idx + 1,
117 })
118 }
119 }
120
121 /// Get whether evaluator needs future data for its result (if so returns `false`) or not
122 fn is_causal(&self) -> bool {
123 false
124 }
125
126 /// Evaluate a window function on an entire input partition.
127 ///
128 /// This function is called once per input *partition* for window
129 /// functions that *do not use* values from the window frame,
130 /// such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`, `PERCENT_RANK`,
131 /// `CUME_DIST`, `LEAD`, `LAG`).
132 ///
133 /// It produces the result of all rows in a single pass. It
134 /// expects to receive the entire partition as the `value` and
135 /// must produce an output column with one output row for every
136 /// input row.
137 ///
138 /// `num_rows` is required to correctly compute the output in case
139 /// `values.len() == 0`
140 ///
141 /// Implementing this function is an optimization: certain window
142 /// functions are not affected by the window frame definition or
143 /// the query doesn't have a frame, and `evaluate` skips the
144 /// (costly) window frame boundary calculation and the overhead of
145 /// calling `evaluate` for each output row.
146 ///
147 /// For example, the `LAG` built in window function does not use
148 /// the values of its window frame (it can be computed in one shot
149 /// on the entire partition with `Self::evaluate_all` regardless of the
150 /// window defined in the `OVER` clause)
151 ///
152 /// ```sql
153 /// lag(x, 1) OVER (ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
154 /// ```
155 ///
156 /// However, `avg()` computes the average in the window and thus
157 /// does use its window frame
158 ///
159 /// ```sql
160 /// avg(x) OVER (PARTITION BY y ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
161 /// ```
162 fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
163 // When window frame boundaries are not used and evaluator supports bounded execution
164 // We can calculate evaluate result by repeatedly calling `self.evaluate` `num_rows` times
165 // If user wants to implement more efficient version, this method should be overwritten
166 // Default implementation may behave suboptimally (For instance `NumRowEvaluator` overwrites it)
167 if !self.uses_window_frame() && self.supports_bounded_execution() {
168 let res = (0..num_rows)
169 .map(|idx| self.evaluate(values, &self.get_range(idx, num_rows)?))
170 .collect::<Result<Vec<_>>>()?;
171 ScalarValue::iter_to_array(res)
172 } else {
173 not_impl_err!("evaluate_all is not implemented by default")
174 }
175 }
176
177 /// Evaluate window function on a range of rows in an input
178 /// partition.x
179 ///
180 /// This is the simplest and most general function to implement
181 /// but also the least performant as it creates output one row at
182 /// a time. It is typically much faster to implement stateful
183 /// evaluation using one of the other specialized methods on this
184 /// trait.
185 ///
186 /// Returns a [`ScalarValue`] that is the value of the window
187 /// function within `range` for the entire partition. Argument
188 /// `values` contains the evaluation result of function arguments
189 /// and evaluation results of ORDER BY expressions. If function has a
190 /// single argument, `values[1..]` will contain ORDER BY expression results.
191 fn evaluate(
192 &mut self,
193 _values: &[ArrayRef],
194 _range: &Range<usize>,
195 ) -> Result<ScalarValue> {
196 not_impl_err!("evaluate is not implemented by default")
197 }
198
199 /// [`PartitionEvaluator::evaluate_all_with_rank`] is called for window
200 /// functions that only need the rank of a row within its window
201 /// frame.
202 ///
203 /// Evaluate the partition evaluator against the partition using
204 /// the row ranks. For example, `RANK(col)` produces
205 ///
206 /// ```text
207 /// col | rank
208 /// --- + ----
209 /// A | 1
210 /// A | 1
211 /// C | 3
212 /// D | 4
213 /// D | 5
214 /// ```
215 ///
216 /// For this case, `num_rows` would be `5` and the
217 /// `ranks_in_partition` would be called with
218 ///
219 /// ```text
220 /// [
221 /// (0,1),
222 /// (2,2),
223 /// (3,4),
224 /// ]
225 /// ```
226 fn evaluate_all_with_rank(
227 &self,
228 _num_rows: usize,
229 _ranks_in_partition: &[Range<usize>],
230 ) -> Result<ArrayRef> {
231 not_impl_err!("evaluate_partition_with_rank is not implemented by default")
232 }
233
234 /// Can the window function be incrementally computed using
235 /// bounded memory?
236 ///
237 /// See the table on [`Self`] for what functions to implement
238 fn supports_bounded_execution(&self) -> bool {
239 false
240 }
241
242 /// Does the window function use the values from the window frame,
243 /// if one is specified?
244 ///
245 /// See the table on [`Self`] for what functions to implement
246 fn uses_window_frame(&self) -> bool {
247 false
248 }
249
250 /// Can this function be evaluated with (only) rank
251 ///
252 /// See the table on [`Self`] for what functions to implement
253 fn include_rank(&self) -> bool {
254 false
255 }
256}