datafusion_expr/
partition_evaluator.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

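//! Partition evaluation module: defines the [`PartitionEvaluator`] trait,
//! which evaluates a window function over a single partition.
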
20use arrow::array::ArrayRef;
21use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue};
22use std::fmt::Debug;
23use std::ops::Range;
24
25use crate::window_state::WindowAggState;
26
27/// Partition evaluator for Window Functions
28///
29/// # Background
30///
31/// An implementation of this trait is created and used for each
32/// partition defined by an `OVER` clause and is instantiated by
33/// the DataFusion runtime.
34///
35/// For example, evaluating `window_func(val) OVER (PARTITION BY col)`
36/// on the following data:
37///
38/// ```text
39/// col | val
40/// --- + ----
41///  A  | 10
42///  A  | 10
43///  C  | 20
44///  D  | 30
45///  D  | 30
46/// ```
47///
48/// Will instantiate three `PartitionEvaluator`s, one each for the
49/// partitions defined by `col=A`, `col=B`, and `col=C`.
50///
51/// ```text
52/// col | val
53/// --- + ----
54///  A  | 10     <--- partition 1
55///  A  | 10
56///
57/// col | val
58/// --- + ----
59///  C  | 20     <--- partition 2
60///
61/// col | val
62/// --- + ----
63///  D  | 30     <--- partition 3
64///  D  | 30
65/// ```
66///
67/// Different methods on this trait will be called depending on the
68/// capabilities described by [`supports_bounded_execution`],
69/// [`uses_window_frame`], and [`include_rank`],
70///
71/// When implementing a new `PartitionEvaluator`, implement
72/// corresponding evaluator according to table below.
73///
74/// # Implementation Table
75///
76/// |[`uses_window_frame`]|[`supports_bounded_execution`]|[`include_rank`]|function_to_implement|
77/// |---|---|----|----|
78/// |false (default)      |false (default)               |false (default)   | [`evaluate_all`]           |
79/// |false                |true                          |false             | [`evaluate`]               |
80/// |false                |true/false                    |true              | [`evaluate_all_with_rank`] |
81/// |true                 |true/false                    |true/false        | [`evaluate`]               |
82///
83/// [`evaluate`]: Self::evaluate
84/// [`evaluate_all`]: Self::evaluate_all
85/// [`evaluate_all_with_rank`]: Self::evaluate_all_with_rank
86/// [`uses_window_frame`]: Self::uses_window_frame
87/// [`include_rank`]: Self::include_rank
88/// [`supports_bounded_execution`]: Self::supports_bounded_execution
89pub trait PartitionEvaluator: Debug + Send {
90    /// When the window frame has a fixed beginning (e.g UNBOUNDED
91    /// PRECEDING), some functions such as FIRST_VALUE, LAST_VALUE and
92    /// NTH_VALUE do not need the (unbounded) input once they have
93    /// seen a certain amount of input.
94    ///
95    /// `memoize` is called after each input batch is processed, and
96    /// such functions can save whatever they need and modify
97    /// [`WindowAggState`] appropriately to allow rows to be pruned
98    fn memoize(&mut self, _state: &mut WindowAggState) -> Result<()> {
99        Ok(())
100    }
101
102    /// If `uses_window_frame` flag is `false`. This method is used to
103    /// calculate required range for the window function during
104    /// stateful execution.
105    ///
106    /// Generally there is no required range, hence by default this
107    /// returns smallest range(current row). e.g seeing current row is
108    /// enough to calculate window result (such as row_number, rank,
109    /// etc)
110    fn get_range(&self, idx: usize, _n_rows: usize) -> Result<Range<usize>> {
111        if self.uses_window_frame() {
112            exec_err!("Range should be calculated from window frame")
113        } else {
114            Ok(Range {
115                start: idx,
116                end: idx + 1,
117            })
118        }
119    }
120
121    /// Get whether evaluator needs future data for its result (if so returns `false`) or not
122    fn is_causal(&self) -> bool {
123        false
124    }
125
126    /// Evaluate a window function on an entire input partition.
127    ///
128    /// This function is called once per input *partition* for window
129    /// functions that *do not use* values from the window frame,
130    /// such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`, `PERCENT_RANK`,
131    /// `CUME_DIST`, `LEAD`, `LAG`).
132    ///
133    /// It produces the result of all rows in a single pass. It
134    /// expects to receive the entire partition as the `value` and
135    /// must produce an output column with one output row for every
136    /// input row.
137    ///
138    /// `num_rows` is required to correctly compute the output in case
139    /// `values.len() == 0`
140    ///
141    /// Implementing this function is an optimization: certain window
142    /// functions are not affected by the window frame definition or
143    /// the query doesn't have a frame, and `evaluate` skips the
144    /// (costly) window frame boundary calculation and the overhead of
145    /// calling `evaluate` for each output row.
146    ///
147    /// For example, the `LAG` built in window function does not use
148    /// the values of its window frame (it can be computed in one shot
149    /// on the entire partition with `Self::evaluate_all` regardless of the
150    /// window defined in the `OVER` clause)
151    ///
152    /// ```sql
153    /// lag(x, 1) OVER (ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
154    /// ```
155    ///
156    /// However, `avg()` computes the average in the window and thus
157    /// does use its window frame
158    ///
159    /// ```sql
160    /// avg(x) OVER (PARTITION BY y ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
161    /// ```
162    fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
163        // When window frame boundaries are not used and evaluator supports bounded execution
164        // We can calculate evaluate result by repeatedly calling `self.evaluate` `num_rows` times
165        // If user wants to implement more efficient version, this method should be overwritten
166        // Default implementation may behave suboptimally (For instance `NumRowEvaluator` overwrites it)
167        if !self.uses_window_frame() && self.supports_bounded_execution() {
168            let res = (0..num_rows)
169                .map(|idx| self.evaluate(values, &self.get_range(idx, num_rows)?))
170                .collect::<Result<Vec<_>>>()?;
171            ScalarValue::iter_to_array(res)
172        } else {
173            not_impl_err!("evaluate_all is not implemented by default")
174        }
175    }
176
177    /// Evaluate window function on a range of rows in an input
178    /// partition.x
179    ///
180    /// This is the simplest and most general function to implement
181    /// but also the least performant as it creates output one row at
182    /// a time. It is typically much faster to implement stateful
183    /// evaluation using one of the other specialized methods on this
184    /// trait.
185    ///
186    /// Returns a [`ScalarValue`] that is the value of the window
187    /// function within `range` for the entire partition. Argument
188    /// `values` contains the evaluation result of function arguments
189    /// and evaluation results of ORDER BY expressions. If function has a
190    /// single argument, `values[1..]` will contain ORDER BY expression results.
191    fn evaluate(
192        &mut self,
193        _values: &[ArrayRef],
194        _range: &Range<usize>,
195    ) -> Result<ScalarValue> {
196        not_impl_err!("evaluate is not implemented by default")
197    }
198
199    /// [`PartitionEvaluator::evaluate_all_with_rank`] is called for window
200    /// functions that only need the rank of a row within its window
201    /// frame.
202    ///
203    /// Evaluate the partition evaluator against the partition using
204    /// the row ranks. For example, `RANK(col)` produces
205    ///
206    /// ```text
207    /// col | rank
208    /// --- + ----
209    ///  A  | 1
210    ///  A  | 1
211    ///  C  | 3
212    ///  D  | 4
213    ///  D  | 5
214    /// ```
215    ///
216    /// For this case, `num_rows` would be `5` and the
217    /// `ranks_in_partition` would be called with
218    ///
219    /// ```text
220    /// [
221    ///   (0,1),
222    ///   (2,2),
223    ///   (3,4),
224    /// ]
225    /// ```
226    fn evaluate_all_with_rank(
227        &self,
228        _num_rows: usize,
229        _ranks_in_partition: &[Range<usize>],
230    ) -> Result<ArrayRef> {
231        not_impl_err!("evaluate_partition_with_rank is not implemented by default")
232    }
233
234    /// Can the window function be incrementally computed using
235    /// bounded memory?
236    ///
237    /// See the table on [`Self`] for what functions to implement
238    fn supports_bounded_execution(&self) -> bool {
239        false
240    }
241
242    /// Does the window function use the values from the window frame,
243    /// if one is specified?
244    ///
245    /// See the table on [`Self`] for what functions to implement
246    fn uses_window_frame(&self) -> bool {
247        false
248    }
249
250    /// Can this function be evaluated with (only) rank
251    ///
252    /// See the table on [`Self`] for what functions to implement
253    fn include_rank(&self) -> bool {
254        false
255    }
256}