datafusion_physical_plan/windows/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Physical expressions for window functions
19
20mod bounded_window_agg_exec;
21mod utils;
22mod window_agg_exec;
23
24use std::borrow::Borrow;
25use std::iter;
26use std::sync::Arc;
27
28use crate::{
29    expressions::PhysicalSortExpr, ExecutionPlan, ExecutionPlanProperties,
30    InputOrderMode, PhysicalExpr,
31};
32
33use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
34use arrow_schema::SortOptions;
35use datafusion_common::{exec_err, Result};
36use datafusion_expr::{
37    PartitionEvaluator, ReversedUDWF, SetMonotonicity, WindowFrame,
38    WindowFunctionDefinition, WindowUDF,
39};
40use datafusion_functions_window_common::expr::ExpressionArgs;
41use datafusion_functions_window_common::field::WindowUDFFieldArgs;
42use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
43use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
44use datafusion_physical_expr::expressions::Column;
45use datafusion_physical_expr::{
46    reverse_order_bys,
47    window::{SlidingAggregateWindowExpr, StandardWindowFunctionExpr},
48    ConstExpr, EquivalenceProperties, LexOrdering, PhysicalSortRequirement,
49};
50use datafusion_physical_expr_common::sort_expr::LexRequirement;
51
52use itertools::Itertools;
53
54// Public interface:
55pub use bounded_window_agg_exec::BoundedWindowAggExec;
56pub use datafusion_physical_expr::window::{
57    PlainAggregateWindowExpr, StandardWindowExpr, WindowExpr,
58};
59pub use window_agg_exec::WindowAggExec;
60
61/// Build field from window function and add it into schema
62pub fn schema_add_window_field(
63    args: &[Arc<dyn PhysicalExpr>],
64    schema: &Schema,
65    window_fn: &WindowFunctionDefinition,
66    fn_name: &str,
67) -> Result<Arc<Schema>> {
68    let data_types = args
69        .iter()
70        .map(|e| Arc::clone(e).as_ref().data_type(schema))
71        .collect::<Result<Vec<_>>>()?;
72    let nullability = args
73        .iter()
74        .map(|e| Arc::clone(e).as_ref().nullable(schema))
75        .collect::<Result<Vec<_>>>()?;
76    let window_expr_return_type =
77        window_fn.return_type(&data_types, &nullability, fn_name)?;
78    let mut window_fields = schema
79        .fields()
80        .iter()
81        .map(|f| f.as_ref().clone())
82        .collect_vec();
83    // Skip extending schema for UDAF
84    if let WindowFunctionDefinition::AggregateUDF(_) = window_fn {
85        Ok(Arc::new(Schema::new(window_fields)))
86    } else {
87        window_fields.extend_from_slice(&[Field::new(
88            fn_name,
89            window_expr_return_type,
90            false,
91        )]);
92        Ok(Arc::new(Schema::new(window_fields)))
93    }
94}
95
96/// Create a physical expression for window function
97#[allow(clippy::too_many_arguments)]
98pub fn create_window_expr(
99    fun: &WindowFunctionDefinition,
100    name: String,
101    args: &[Arc<dyn PhysicalExpr>],
102    partition_by: &[Arc<dyn PhysicalExpr>],
103    order_by: &LexOrdering,
104    window_frame: Arc<WindowFrame>,
105    input_schema: &Schema,
106    ignore_nulls: bool,
107) -> Result<Arc<dyn WindowExpr>> {
108    Ok(match fun {
109        WindowFunctionDefinition::AggregateUDF(fun) => {
110            let aggregate = AggregateExprBuilder::new(Arc::clone(fun), args.to_vec())
111                .schema(Arc::new(input_schema.clone()))
112                .alias(name)
113                .with_ignore_nulls(ignore_nulls)
114                .build()
115                .map(Arc::new)?;
116            window_expr_from_aggregate_expr(
117                partition_by,
118                order_by,
119                window_frame,
120                aggregate,
121            )
122        }
123        WindowFunctionDefinition::WindowUDF(fun) => Arc::new(StandardWindowExpr::new(
124            create_udwf_window_expr(fun, args, input_schema, name, ignore_nulls)?,
125            partition_by,
126            order_by,
127            window_frame,
128        )),
129    })
130}
131
132/// Creates an appropriate [`WindowExpr`] based on the window frame and
133fn window_expr_from_aggregate_expr(
134    partition_by: &[Arc<dyn PhysicalExpr>],
135    order_by: &LexOrdering,
136    window_frame: Arc<WindowFrame>,
137    aggregate: Arc<AggregateFunctionExpr>,
138) -> Arc<dyn WindowExpr> {
139    // Is there a potentially unlimited sized window frame?
140    let unbounded_window = window_frame.is_ever_expanding();
141
142    if !unbounded_window {
143        Arc::new(SlidingAggregateWindowExpr::new(
144            aggregate,
145            partition_by,
146            order_by,
147            window_frame,
148        ))
149    } else {
150        Arc::new(PlainAggregateWindowExpr::new(
151            aggregate,
152            partition_by,
153            order_by,
154            window_frame,
155        ))
156    }
157}
158
159/// Creates a `StandardWindowFunctionExpr` suitable for a user defined window function
160pub fn create_udwf_window_expr(
161    fun: &Arc<WindowUDF>,
162    args: &[Arc<dyn PhysicalExpr>],
163    input_schema: &Schema,
164    name: String,
165    ignore_nulls: bool,
166) -> Result<Arc<dyn StandardWindowFunctionExpr>> {
167    // need to get the types into an owned vec for some reason
168    let input_types: Vec<_> = args
169        .iter()
170        .map(|arg| arg.data_type(input_schema))
171        .collect::<Result<_>>()?;
172
173    let udwf_expr = Arc::new(WindowUDFExpr {
174        fun: Arc::clone(fun),
175        args: args.to_vec(),
176        input_types,
177        name,
178        is_reversed: false,
179        ignore_nulls,
180    });
181
182    // Early validation of input expressions
183    // We create a partition evaluator because in the user-defined window
184    // implementation this is where code for parsing input expressions
185    // exist. The benefits are:
186    // - If any of the input expressions are invalid we catch them early
187    // in the planning phase, rather than during execution.
188    // - Maintains compatibility with built-in (now removed) window
189    // functions validation behavior.
190    // - Predictable and reliable error handling.
191    // See discussion here:
192    // https://github.com/apache/datafusion/pull/13201#issuecomment-2454209975
193    let _ = udwf_expr.create_evaluator()?;
194
195    Ok(udwf_expr)
196}
197
/// Implements [`StandardWindowFunctionExpr`] for [`WindowUDF`]
///
/// Bundles a user-defined window function with its physical argument
/// expressions, their resolved data types, and the flags passed on to the
/// function's partition evaluator.
#[derive(Clone, Debug)]
pub struct WindowUDFExpr {
    /// The user-defined window function being evaluated.
    fun: Arc<WindowUDF>,
    /// Physical expressions for the function's arguments.
    args: Vec<Arc<dyn PhysicalExpr>>,
    /// Display name. Also used to look up this expression's output column by
    /// name when computing the result ordering.
    name: String,
    /// Types of input expressions
    input_types: Vec<DataType>,
    /// This is set to `true` only if the user-defined window function
    /// expression supports evaluation in reverse order, and the
    /// evaluation order is reversed.
    is_reversed: bool,
    /// Set to `true` if `IGNORE NULLS` is defined, `false` otherwise.
    ignore_nulls: bool,
}

impl WindowUDFExpr {
    /// Returns the wrapped user-defined window function.
    pub fn fun(&self) -> &Arc<WindowUDF> {
        &self.fun
    }
}
220
221impl StandardWindowFunctionExpr for WindowUDFExpr {
222    fn as_any(&self) -> &dyn std::any::Any {
223        self
224    }
225
226    fn field(&self) -> Result<Field> {
227        self.fun
228            .field(WindowUDFFieldArgs::new(&self.input_types, &self.name))
229    }
230
231    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
232        self.fun
233            .expressions(ExpressionArgs::new(&self.args, &self.input_types))
234    }
235
236    fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
237        self.fun
238            .partition_evaluator_factory(PartitionEvaluatorArgs::new(
239                &self.args,
240                &self.input_types,
241                self.is_reversed,
242                self.ignore_nulls,
243            ))
244    }
245
246    fn name(&self) -> &str {
247        &self.name
248    }
249
250    fn reverse_expr(&self) -> Option<Arc<dyn StandardWindowFunctionExpr>> {
251        match self.fun.reverse_expr() {
252            ReversedUDWF::Identical => Some(Arc::new(self.clone())),
253            ReversedUDWF::NotSupported => None,
254            ReversedUDWF::Reversed(fun) => Some(Arc::new(WindowUDFExpr {
255                fun,
256                args: self.args.clone(),
257                name: self.name.clone(),
258                input_types: self.input_types.clone(),
259                is_reversed: !self.is_reversed,
260                ignore_nulls: self.ignore_nulls,
261            })),
262        }
263    }
264
265    fn get_result_ordering(&self, schema: &SchemaRef) -> Option<PhysicalSortExpr> {
266        self.fun
267            .sort_options()
268            .zip(schema.column_with_name(self.name()))
269            .map(|(options, (idx, field))| {
270                let expr = Arc::new(Column::new(field.name(), idx));
271                PhysicalSortExpr { expr, options }
272            })
273    }
274}
275
276pub(crate) fn calc_requirements<
277    T: Borrow<Arc<dyn PhysicalExpr>>,
278    S: Borrow<PhysicalSortExpr>,
279>(
280    partition_by_exprs: impl IntoIterator<Item = T>,
281    orderby_sort_exprs: impl IntoIterator<Item = S>,
282) -> Option<LexRequirement> {
283    let mut sort_reqs = LexRequirement::new(
284        partition_by_exprs
285            .into_iter()
286            .map(|partition_by| {
287                PhysicalSortRequirement::new(Arc::clone(partition_by.borrow()), None)
288            })
289            .collect::<Vec<_>>(),
290    );
291    for element in orderby_sort_exprs.into_iter() {
292        let PhysicalSortExpr { expr, options } = element.borrow();
293        if !sort_reqs.iter().any(|e| e.expr.eq(expr)) {
294            sort_reqs.push(PhysicalSortRequirement::new(
295                Arc::clone(expr),
296                Some(*options),
297            ));
298        }
299    }
300    // Convert empty result to None. Otherwise wrap result inside Some()
301    (!sort_reqs.is_empty()).then_some(sort_reqs)
302}
303
304/// This function calculates the indices such that when partition by expressions reordered with the indices
305/// resulting expressions define a preset for existing ordering.
306/// For instance, if input is ordered by a, b, c and PARTITION BY b, a is used,
307/// this vector will be [1, 0]. It means that when we iterate b, a columns with the order [1, 0]
308/// resulting vector (a, b) is a preset of the existing ordering (a, b, c).
309pub fn get_ordered_partition_by_indices(
310    partition_by_exprs: &[Arc<dyn PhysicalExpr>],
311    input: &Arc<dyn ExecutionPlan>,
312) -> Vec<usize> {
313    let (_, indices) = input
314        .equivalence_properties()
315        .find_longest_permutation(partition_by_exprs);
316    indices
317}
318
319pub(crate) fn get_partition_by_sort_exprs(
320    input: &Arc<dyn ExecutionPlan>,
321    partition_by_exprs: &[Arc<dyn PhysicalExpr>],
322    ordered_partition_by_indices: &[usize],
323) -> Result<LexOrdering> {
324    let ordered_partition_exprs = ordered_partition_by_indices
325        .iter()
326        .map(|idx| Arc::clone(&partition_by_exprs[*idx]))
327        .collect::<Vec<_>>();
328    // Make sure ordered section doesn't move over the partition by expression
329    assert!(ordered_partition_by_indices.len() <= partition_by_exprs.len());
330    let (ordering, _) = input
331        .equivalence_properties()
332        .find_longest_permutation(&ordered_partition_exprs);
333    if ordering.len() == ordered_partition_exprs.len() {
334        Ok(ordering)
335    } else {
336        exec_err!("Expects PARTITION BY expression to be ordered")
337    }
338}
339
/// Derives the equivalence properties of a window operator's output.
///
/// Starts from the input plan's equivalence properties, re-targeted at the
/// window output `schema`. Then, for each window expression, it tries to add
/// orderings or constants that involve that expression's output column.
///
/// The window output columns are taken to be the last `window_exprs.len()`
/// fields of `schema`, one per window expression and in the same order.
pub(crate) fn window_equivalence_properties(
    schema: &SchemaRef,
    input: &Arc<dyn ExecutionPlan>,
    window_exprs: &[Arc<dyn WindowExpr>],
) -> EquivalenceProperties {
    // We need to update the schema, so we can't directly use input's equivalence
    // properties.
    let mut window_eq_properties = EquivalenceProperties::new(Arc::clone(schema))
        .extend(input.equivalence_properties().clone());

    // Window columns occupy the trailing positions of `schema`.
    // `window_expr_indices[i]` (== `i + input_schema_len`) is the column index
    // of the i-th window expression's output.
    let window_schema_len = schema.fields.len();
    let input_schema_len = window_schema_len - window_exprs.len();
    let window_expr_indices = (input_schema_len..window_schema_len).collect::<Vec<_>>();

    for (i, expr) in window_exprs.iter().enumerate() {
        let partitioning_exprs = expr.partition_by();
        let no_partitioning = partitioning_exprs.is_empty();
        // Collect columns defining partitioning, and construct all `SortOptions`
        // variations for them. Then, we will check each one whether it satisfies
        // the existing ordering provided by the input plan.
        let partition_by_orders = partitioning_exprs
            .iter()
            .map(|pb_order| sort_options_resolving_constant(Arc::clone(pb_order)));
        let all_satisfied_lexs = partition_by_orders
            .multi_cartesian_product()
            .map(LexOrdering::new)
            .filter(|lex| window_eq_properties.ordering_satisfy(lex))
            .collect::<Vec<_>>();
        // If there is a partitioning and none of the candidate orderings
        // satisfies the input plan's orderings, we cannot introduce any new
        // orderings for the window plan.
        // NOTE(review): this returns immediately, so the remaining window
        // expressions are not examined either. Confirm this is intended.
        if !no_partitioning && all_satisfied_lexs.is_empty() {
            return window_eq_properties;
        } else if let Some(std_expr) = expr.as_any().downcast_ref::<StandardWindowExpr>()
        {
            // Standard (UDWF) window expressions add their own orderings.
            std_expr.add_equal_orderings(&mut window_eq_properties);
        } else if let Some(plain_expr) =
            expr.as_any().downcast_ref::<PlainAggregateWindowExpr>()
        {
            // We are dealing with plain window frames; i.e. frames having an
            // unbounded starting point.
            // First, check if the frame covers the whole table:
            if plain_expr.get_window_frame().end_bound.is_unbounded() {
                let window_col = Column::new(expr.name(), i + input_schema_len);
                if no_partitioning {
                    // Window function has a constant result across the table:
                    window_eq_properties = window_eq_properties
                        .with_constants(iter::once(ConstExpr::new(Arc::new(window_col))))
                } else {
                    // Window function results in a partial constant value in
                    // some ordering. Adjust the ordering equivalences accordingly:
                    // each satisfied partition ordering is extended with the
                    // window column, in both candidate directions.
                    let new_lexs = all_satisfied_lexs.into_iter().flat_map(|lex| {
                        let orderings = lex.take_exprs();
                        let new_partial_consts =
                            sort_options_resolving_constant(Arc::new(window_col.clone()));

                        new_partial_consts.into_iter().map(move |partial| {
                            let mut existing = orderings.clone();
                            existing.push(partial);
                            LexOrdering::new(existing)
                        })
                    });
                    window_eq_properties.add_new_orderings(new_lexs);
                }
            } else {
                // The window frame is ever expanding, so set monotonicity comes
                // into play.
                plain_expr.add_equal_orderings(
                    &mut window_eq_properties,
                    window_expr_indices[i],
                );
            }
        } else if let Some(sliding_expr) =
            expr.as_any().downcast_ref::<SlidingAggregateWindowExpr>()
        {
            // We are dealing with sliding window frames; i.e. frames having an
            // advancing starting point. If we have a set-monotonic expression,
            // we might be able to leverage this property.
            let set_monotonicity = sliding_expr.get_aggregate_expr().set_monotonicity();
            if set_monotonicity.ne(&SetMonotonicity::NotMonotonic) {
                // If the window frame is ever-receding, and we have set
                // monotonicity, we can utilize it to introduce new orderings.
                let frame = sliding_expr.get_window_frame();
                if frame.end_bound.is_unbounded() {
                    // The resulting direction is the reverse of the set
                    // monotonicity: `descending` is set when the aggregate is
                    // increasing.
                    let increasing = set_monotonicity.eq(&SetMonotonicity::Increasing);
                    let window_col = Column::new(expr.name(), i + input_schema_len);
                    if no_partitioning {
                        // Reverse set-monotonic cases with no partitioning:
                        let new_ordering =
                            vec![LexOrdering::new(vec![PhysicalSortExpr::new(
                                Arc::new(window_col),
                                SortOptions::new(increasing, true),
                            )])];
                        window_eq_properties.add_new_orderings(new_ordering);
                    } else {
                        // Reverse set-monotonic cases for all orderings:
                        for lex in all_satisfied_lexs.into_iter() {
                            let mut existing = lex.take_exprs();
                            existing.push(PhysicalSortExpr::new(
                                Arc::new(window_col.clone()),
                                SortOptions::new(increasing, true),
                            ));
                            window_eq_properties
                                .add_new_ordering(LexOrdering::new(existing));
                        }
                    }
                }
                // If we ensure that the elements entering the frame are greater
                // than the ones leaving, and we have increasing set-monotonicity,
                // then the window function result will be increasing. However,
                // we also need to check if the frame is causal. If not, we cannot
                // utilize set-monotonicity since the set shrinks as the frame
                // boundary starts "touching" the end of the table.
                else if frame.is_causal() {
                    // All direction combinations of the aggregate's argument
                    // expressions, tested below against the current ordering.
                    let mut args_all_lexs = sliding_expr
                        .get_aggregate_expr()
                        .expressions()
                        .into_iter()
                        .map(sort_options_resolving_constant)
                        .multi_cartesian_product();

                    // `asc` records whether the first key of the most recently
                    // tested candidate is ascending. `any` stops at the first
                    // satisfied candidate, so afterwards it reflects that one.
                    let mut asc = false;
                    if args_all_lexs.any(|order| {
                        if let Some(f) = order.first() {
                            asc = !f.options.descending;
                        }
                        window_eq_properties.ordering_satisfy(&LexOrdering::new(order))
                    }) {
                        let increasing =
                            set_monotonicity.eq(&SetMonotonicity::Increasing);
                        let window_col = Column::new(expr.name(), i + input_schema_len);
                        if increasing && (asc || no_partitioning) {
                            let new_ordering =
                                LexOrdering::new(vec![PhysicalSortExpr::new(
                                    Arc::new(window_col),
                                    SortOptions::new(false, false),
                                )]);
                            window_eq_properties.add_new_ordering(new_ordering);
                        } else if !increasing && (!asc || no_partitioning) {
                            let new_ordering =
                                LexOrdering::new(vec![PhysicalSortExpr::new(
                                    Arc::new(window_col),
                                    SortOptions::new(true, false),
                                )]);
                            window_eq_properties.add_new_ordering(new_ordering);
                        };
                    }
                }
            }
        }
    }
    window_eq_properties
}
493
494/// Constructs the best-fitting windowing operator (a `WindowAggExec` or a
495/// `BoundedWindowExec`) for the given `input` according to the specifications
496/// of `window_exprs` and `physical_partition_keys`. Here, best-fitting means
497/// not requiring additional sorting and/or partitioning for the given input.
498/// - A return value of `None` represents that there is no way to construct a
499///   windowing operator that doesn't need additional sorting/partitioning for
500///   the given input. Existing ordering should be changed to run the given
501///   windowing operation.
502/// - A `Some(window exec)` value contains the optimal windowing operator (a
503///   `WindowAggExec` or a `BoundedWindowExec`) for the given input.
504pub fn get_best_fitting_window(
505    window_exprs: &[Arc<dyn WindowExpr>],
506    input: &Arc<dyn ExecutionPlan>,
507    // These are the partition keys used during repartitioning.
508    // They are either the same with `window_expr`'s PARTITION BY columns,
509    // or it is empty if partitioning is not desirable for this windowing operator.
510    physical_partition_keys: &[Arc<dyn PhysicalExpr>],
511) -> Result<Option<Arc<dyn ExecutionPlan>>> {
512    // Contains at least one window expr and all of the partition by and order by sections
513    // of the window_exprs are same.
514    let partitionby_exprs = window_exprs[0].partition_by();
515    let orderby_keys = window_exprs[0].order_by();
516    let (should_reverse, input_order_mode) =
517        if let Some((should_reverse, input_order_mode)) =
518            get_window_mode(partitionby_exprs, orderby_keys, input)
519        {
520            (should_reverse, input_order_mode)
521        } else {
522            return Ok(None);
523        };
524    let is_unbounded = input.boundedness().is_unbounded();
525    if !is_unbounded && input_order_mode != InputOrderMode::Sorted {
526        // Executor has bounded input and `input_order_mode` is not `InputOrderMode::Sorted`
527        // in this case removing the sort is not helpful, return:
528        return Ok(None);
529    };
530
531    let window_expr = if should_reverse {
532        if let Some(reversed_window_expr) = window_exprs
533            .iter()
534            .map(|e| e.get_reverse_expr())
535            .collect::<Option<Vec<_>>>()
536        {
537            reversed_window_expr
538        } else {
539            // Cannot take reverse of any of the window expr
540            // In this case, with existing ordering window cannot be run
541            return Ok(None);
542        }
543    } else {
544        window_exprs.to_vec()
545    };
546
547    // If all window expressions can run with bounded memory, choose the
548    // bounded window variant:
549    if window_expr.iter().all(|e| e.uses_bounded_memory()) {
550        Ok(Some(Arc::new(BoundedWindowAggExec::try_new(
551            window_expr,
552            Arc::clone(input),
553            input_order_mode,
554            !physical_partition_keys.is_empty(),
555        )?) as _))
556    } else if input_order_mode != InputOrderMode::Sorted {
557        // For `WindowAggExec` to work correctly PARTITION BY columns should be sorted.
558        // Hence, if `input_order_mode` is not `Sorted` we should convert
559        // input ordering such that it can work with `Sorted` (add `SortExec`).
560        // Effectively `WindowAggExec` works only in `Sorted` mode.
561        Ok(None)
562    } else {
563        Ok(Some(Arc::new(WindowAggExec::try_new(
564            window_expr,
565            Arc::clone(input),
566            !physical_partition_keys.is_empty(),
567        )?) as _))
568    }
569}
570
571/// Compares physical ordering (output ordering of the `input` operator) with
572/// `partitionby_exprs` and `orderby_keys` to decide whether existing ordering
573/// is sufficient to run the current window operator.
574/// - A `None` return value indicates that we can not remove the sort in question
575///   (input ordering is not sufficient to run current window executor).
576/// - A `Some((bool, InputOrderMode))` value indicates that the window operator
577///   can run with existing input ordering, so we can remove `SortExec` before it.
578///
579/// The `bool` field in the return value represents whether we should reverse window
580/// operator to remove `SortExec` before it. The `InputOrderMode` field represents
581/// the mode this window operator should work in to accommodate the existing ordering.
582pub fn get_window_mode(
583    partitionby_exprs: &[Arc<dyn PhysicalExpr>],
584    orderby_keys: &LexOrdering,
585    input: &Arc<dyn ExecutionPlan>,
586) -> Option<(bool, InputOrderMode)> {
587    let input_eqs = input.equivalence_properties().clone();
588    let mut partition_by_reqs: LexRequirement = LexRequirement::new(vec![]);
589    let (_, indices) = input_eqs.find_longest_permutation(partitionby_exprs);
590    vec![].extend(indices.iter().map(|&idx| PhysicalSortRequirement {
591        expr: Arc::clone(&partitionby_exprs[idx]),
592        options: None,
593    }));
594    partition_by_reqs
595        .inner
596        .extend(indices.iter().map(|&idx| PhysicalSortRequirement {
597            expr: Arc::clone(&partitionby_exprs[idx]),
598            options: None,
599        }));
600    // Treat partition by exprs as constant. During analysis of requirements are satisfied.
601    let const_exprs = partitionby_exprs.iter().map(ConstExpr::from);
602    let partition_by_eqs = input_eqs.with_constants(const_exprs);
603    let order_by_reqs = LexRequirement::from(orderby_keys.clone());
604    let reverse_order_by_reqs = LexRequirement::from(reverse_order_bys(orderby_keys));
605    for (should_swap, order_by_reqs) in
606        [(false, order_by_reqs), (true, reverse_order_by_reqs)]
607    {
608        let req = LexRequirement::new(
609            [partition_by_reqs.inner.clone(), order_by_reqs.inner].concat(),
610        )
611        .collapse();
612        if partition_by_eqs.ordering_satisfy_requirement(&req) {
613            // Window can be run with existing ordering
614            let mode = if indices.len() == partitionby_exprs.len() {
615                InputOrderMode::Sorted
616            } else if indices.is_empty() {
617                InputOrderMode::Linear
618            } else {
619                InputOrderMode::PartiallySorted(indices)
620            };
621            return Some((should_swap, mode));
622        }
623    }
624    None
625}
626
627fn sort_options_resolving_constant(expr: Arc<dyn PhysicalExpr>) -> Vec<PhysicalSortExpr> {
628    vec![
629        PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)),
630        PhysicalSortExpr::new(expr, SortOptions::new(true, true)),
631    ]
632}
633
634#[cfg(test)]
635mod tests {
636    use super::*;
637    use crate::collect;
638    use crate::expressions::col;
639    use crate::streaming::StreamingTableExec;
640    use crate::test::assert_is_pending;
641    use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
642
643    use arrow::compute::SortOptions;
644    use datafusion_execution::TaskContext;
645
646    use datafusion_functions_aggregate::count::count_udaf;
647    use futures::FutureExt;
648    use InputOrderMode::{Linear, PartiallySorted, Sorted};
649
650    fn create_test_schema() -> Result<SchemaRef> {
651        let nullable_column = Field::new("nullable_col", DataType::Int32, true);
652        let non_nullable_column = Field::new("non_nullable_col", DataType::Int32, false);
653        let schema = Arc::new(Schema::new(vec![nullable_column, non_nullable_column]));
654
655        Ok(schema)
656    }
657
658    fn create_test_schema2() -> Result<SchemaRef> {
659        let a = Field::new("a", DataType::Int32, true);
660        let b = Field::new("b", DataType::Int32, true);
661        let c = Field::new("c", DataType::Int32, true);
662        let d = Field::new("d", DataType::Int32, true);
663        let e = Field::new("e", DataType::Int32, true);
664        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
665        Ok(schema)
666    }
667
668    // Generate a schema which consists of 5 columns (a, b, c, d, e)
669    fn create_test_schema3() -> Result<SchemaRef> {
670        let a = Field::new("a", DataType::Int32, true);
671        let b = Field::new("b", DataType::Int32, false);
672        let c = Field::new("c", DataType::Int32, true);
673        let d = Field::new("d", DataType::Int32, false);
674        let e = Field::new("e", DataType::Int32, false);
675        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
676        Ok(schema)
677    }
678
679    /// make PhysicalSortExpr with default options
680    pub fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
681        sort_expr_options(name, schema, SortOptions::default())
682    }
683
684    /// PhysicalSortExpr with specified options
685    pub fn sort_expr_options(
686        name: &str,
687        schema: &Schema,
688        options: SortOptions,
689    ) -> PhysicalSortExpr {
690        PhysicalSortExpr {
691            expr: col(name, schema).unwrap(),
692            options,
693        }
694    }
695
696    /// Created a sorted Streaming Table exec
697    pub fn streaming_table_exec(
698        schema: &SchemaRef,
699        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
700        infinite_source: bool,
701    ) -> Result<Arc<dyn ExecutionPlan>> {
702        let sort_exprs = sort_exprs.into_iter().collect();
703
704        Ok(Arc::new(StreamingTableExec::try_new(
705            Arc::clone(schema),
706            vec![],
707            None,
708            Some(sort_exprs),
709            infinite_source,
710            None,
711        )?))
712    }
713
714    #[tokio::test]
715    async fn test_calc_requirements() -> Result<()> {
716        let schema = create_test_schema2()?;
717        let test_data = vec![
718            // PARTITION BY a, ORDER BY b ASC NULLS FIRST
719            (
720                vec!["a"],
721                vec![("b", true, true)],
722                vec![("a", None), ("b", Some((true, true)))],
723            ),
724            // PARTITION BY a, ORDER BY a ASC NULLS FIRST
725            (vec!["a"], vec![("a", true, true)], vec![("a", None)]),
726            // PARTITION BY a, ORDER BY b ASC NULLS FIRST, c DESC NULLS LAST
727            (
728                vec!["a"],
729                vec![("b", true, true), ("c", false, false)],
730                vec![
731                    ("a", None),
732                    ("b", Some((true, true))),
733                    ("c", Some((false, false))),
734                ],
735            ),
736            // PARTITION BY a, c, ORDER BY b ASC NULLS FIRST, c DESC NULLS LAST
737            (
738                vec!["a", "c"],
739                vec![("b", true, true), ("c", false, false)],
740                vec![("a", None), ("c", None), ("b", Some((true, true)))],
741            ),
742        ];
743        for (pb_params, ob_params, expected_params) in test_data {
744            let mut partitionbys = vec![];
745            for col_name in pb_params {
746                partitionbys.push(col(col_name, &schema)?);
747            }
748
749            let mut orderbys = vec![];
750            for (col_name, descending, nulls_first) in ob_params {
751                let expr = col(col_name, &schema)?;
752                let options = SortOptions {
753                    descending,
754                    nulls_first,
755                };
756                orderbys.push(PhysicalSortExpr { expr, options });
757            }
758
759            let mut expected: Option<LexRequirement> = None;
760            for (col_name, reqs) in expected_params {
761                let options = reqs.map(|(descending, nulls_first)| SortOptions {
762                    descending,
763                    nulls_first,
764                });
765                let expr = col(col_name, &schema)?;
766                let res = PhysicalSortRequirement::new(expr, options);
767                if let Some(expected) = &mut expected {
768                    expected.push(res);
769                } else {
770                    expected = Some(LexRequirement::new(vec![res]));
771                }
772            }
773            assert_eq!(calc_requirements(partitionbys, orderbys), expected);
774        }
775        Ok(())
776    }
777
778    #[tokio::test]
779    async fn test_drop_cancel() -> Result<()> {
780        let task_ctx = Arc::new(TaskContext::default());
781        let schema =
782            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
783
784        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
785        let refs = blocking_exec.refs();
786        let window_agg_exec = Arc::new(WindowAggExec::try_new(
787            vec![create_window_expr(
788                &WindowFunctionDefinition::AggregateUDF(count_udaf()),
789                "count".to_owned(),
790                &[col("a", &schema)?],
791                &[],
792                &LexOrdering::default(),
793                Arc::new(WindowFrame::new(None)),
794                schema.as_ref(),
795                false,
796            )?],
797            blocking_exec,
798            false,
799        )?);
800
801        let fut = collect(window_agg_exec, task_ctx);
802        let mut fut = fut.boxed();
803
804        assert_is_pending(&mut fut);
805        drop(fut);
806        assert_strong_count_converges_to_zero(refs).await;
807
808        Ok(())
809    }
810
811    #[tokio::test]
812    async fn test_satisfy_nullable() -> Result<()> {
813        let schema = create_test_schema()?;
814        let params = vec![
815            ((true, true), (false, false), false),
816            ((true, true), (false, true), false),
817            ((true, true), (true, false), false),
818            ((true, false), (false, true), false),
819            ((true, false), (false, false), false),
820            ((true, false), (true, true), false),
821            ((true, false), (true, false), true),
822        ];
823        for (
824            (physical_desc, physical_nulls_first),
825            (req_desc, req_nulls_first),
826            expected,
827        ) in params
828        {
829            let physical_ordering = PhysicalSortExpr {
830                expr: col("nullable_col", &schema)?,
831                options: SortOptions {
832                    descending: physical_desc,
833                    nulls_first: physical_nulls_first,
834                },
835            };
836            let required_ordering = PhysicalSortExpr {
837                expr: col("nullable_col", &schema)?,
838                options: SortOptions {
839                    descending: req_desc,
840                    nulls_first: req_nulls_first,
841                },
842            };
843            let res = physical_ordering.satisfy(&required_ordering.into(), &schema);
844            assert_eq!(res, expected);
845        }
846
847        Ok(())
848    }
849
850    #[tokio::test]
851    async fn test_satisfy_non_nullable() -> Result<()> {
852        let schema = create_test_schema()?;
853
854        let params = vec![
855            ((true, true), (false, false), false),
856            ((true, true), (false, true), false),
857            ((true, true), (true, false), true),
858            ((true, false), (false, true), false),
859            ((true, false), (false, false), false),
860            ((true, false), (true, true), true),
861            ((true, false), (true, false), true),
862        ];
863        for (
864            (physical_desc, physical_nulls_first),
865            (req_desc, req_nulls_first),
866            expected,
867        ) in params
868        {
869            let physical_ordering = PhysicalSortExpr {
870                expr: col("non_nullable_col", &schema)?,
871                options: SortOptions {
872                    descending: physical_desc,
873                    nulls_first: physical_nulls_first,
874                },
875            };
876            let required_ordering = PhysicalSortExpr {
877                expr: col("non_nullable_col", &schema)?,
878                options: SortOptions {
879                    descending: req_desc,
880                    nulls_first: req_nulls_first,
881                },
882            };
883            let res = physical_ordering.satisfy(&required_ordering.into(), &schema);
884            assert_eq!(res, expected);
885        }
886
887        Ok(())
888    }
889
890    #[tokio::test]
891    async fn test_get_window_mode_exhaustive() -> Result<()> {
892        let test_schema = create_test_schema3()?;
893        // Columns a,c are nullable whereas b,d are not nullable.
894        // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC NULLS FIRST, d ASC NULLS FIRST
895        // Column e is not ordered.
896        let sort_exprs = vec![
897            sort_expr("a", &test_schema),
898            sort_expr("b", &test_schema),
899            sort_expr("c", &test_schema),
900            sort_expr("d", &test_schema),
901        ];
902        let exec_unbounded = streaming_table_exec(&test_schema, sort_exprs, true)?;
903
904        // test cases consists of vector of tuples. Where each tuple represents a single test case.
905        // First field in the tuple is Vec<str> where each element in the vector represents PARTITION BY columns
906        // For instance `vec!["a", "b"]` corresponds to PARTITION BY a, b
907        // Second field in the tuple is Vec<str> where each element in the vector represents ORDER BY columns
908        // For instance, vec!["c"], corresponds to ORDER BY c ASC NULLS FIRST, (ordering is default ordering. We do not check
909        // for reversibility in this test).
910        // Third field in the tuple is Option<InputOrderMode>, which corresponds to expected algorithm mode.
911        // None represents that existing ordering is not sufficient to run executor with any one of the algorithms
912        // (We need to add SortExec to be able to run it).
913        // Some(InputOrderMode) represents, we can run algorithm with existing ordering; and algorithm should work in
914        // InputOrderMode.
915        let test_cases = vec![
916            (vec!["a"], vec!["a"], Some(Sorted)),
917            (vec!["a"], vec!["b"], Some(Sorted)),
918            (vec!["a"], vec!["c"], None),
919            (vec!["a"], vec!["a", "b"], Some(Sorted)),
920            (vec!["a"], vec!["b", "c"], Some(Sorted)),
921            (vec!["a"], vec!["a", "c"], None),
922            (vec!["a"], vec!["a", "b", "c"], Some(Sorted)),
923            (vec!["b"], vec!["a"], Some(Linear)),
924            (vec!["b"], vec!["b"], Some(Linear)),
925            (vec!["b"], vec!["c"], None),
926            (vec!["b"], vec!["a", "b"], Some(Linear)),
927            (vec!["b"], vec!["b", "c"], None),
928            (vec!["b"], vec!["a", "c"], Some(Linear)),
929            (vec!["b"], vec!["a", "b", "c"], Some(Linear)),
930            (vec!["c"], vec!["a"], Some(Linear)),
931            (vec!["c"], vec!["b"], None),
932            (vec!["c"], vec!["c"], Some(Linear)),
933            (vec!["c"], vec!["a", "b"], Some(Linear)),
934            (vec!["c"], vec!["b", "c"], None),
935            (vec!["c"], vec!["a", "c"], Some(Linear)),
936            (vec!["c"], vec!["a", "b", "c"], Some(Linear)),
937            (vec!["b", "a"], vec!["a"], Some(Sorted)),
938            (vec!["b", "a"], vec!["b"], Some(Sorted)),
939            (vec!["b", "a"], vec!["c"], Some(Sorted)),
940            (vec!["b", "a"], vec!["a", "b"], Some(Sorted)),
941            (vec!["b", "a"], vec!["b", "c"], Some(Sorted)),
942            (vec!["b", "a"], vec!["a", "c"], Some(Sorted)),
943            (vec!["b", "a"], vec!["a", "b", "c"], Some(Sorted)),
944            (vec!["c", "b"], vec!["a"], Some(Linear)),
945            (vec!["c", "b"], vec!["b"], Some(Linear)),
946            (vec!["c", "b"], vec!["c"], Some(Linear)),
947            (vec!["c", "b"], vec!["a", "b"], Some(Linear)),
948            (vec!["c", "b"], vec!["b", "c"], Some(Linear)),
949            (vec!["c", "b"], vec!["a", "c"], Some(Linear)),
950            (vec!["c", "b"], vec!["a", "b", "c"], Some(Linear)),
951            (vec!["c", "a"], vec!["a"], Some(PartiallySorted(vec![1]))),
952            (vec!["c", "a"], vec!["b"], Some(PartiallySorted(vec![1]))),
953            (vec!["c", "a"], vec!["c"], Some(PartiallySorted(vec![1]))),
954            (
955                vec!["c", "a"],
956                vec!["a", "b"],
957                Some(PartiallySorted(vec![1])),
958            ),
959            (
960                vec!["c", "a"],
961                vec!["b", "c"],
962                Some(PartiallySorted(vec![1])),
963            ),
964            (
965                vec!["c", "a"],
966                vec!["a", "c"],
967                Some(PartiallySorted(vec![1])),
968            ),
969            (
970                vec!["c", "a"],
971                vec!["a", "b", "c"],
972                Some(PartiallySorted(vec![1])),
973            ),
974            (vec!["c", "b", "a"], vec!["a"], Some(Sorted)),
975            (vec!["c", "b", "a"], vec!["b"], Some(Sorted)),
976            (vec!["c", "b", "a"], vec!["c"], Some(Sorted)),
977            (vec!["c", "b", "a"], vec!["a", "b"], Some(Sorted)),
978            (vec!["c", "b", "a"], vec!["b", "c"], Some(Sorted)),
979            (vec!["c", "b", "a"], vec!["a", "c"], Some(Sorted)),
980            (vec!["c", "b", "a"], vec!["a", "b", "c"], Some(Sorted)),
981        ];
982        for (case_idx, test_case) in test_cases.iter().enumerate() {
983            let (partition_by_columns, order_by_params, expected) = &test_case;
984            let mut partition_by_exprs = vec![];
985            for col_name in partition_by_columns {
986                partition_by_exprs.push(col(col_name, &test_schema)?);
987            }
988
989            let mut order_by_exprs = LexOrdering::default();
990            for col_name in order_by_params {
991                let expr = col(col_name, &test_schema)?;
992                // Give default ordering, this is same with input ordering direction
993                // In this test we do check for reversibility.
994                let options = SortOptions::default();
995                order_by_exprs.push(PhysicalSortExpr { expr, options });
996            }
997            let res = get_window_mode(
998                &partition_by_exprs,
999                order_by_exprs.as_ref(),
1000                &exec_unbounded,
1001            );
1002            // Since reversibility is not important in this test. Convert Option<(bool, InputOrderMode)> to Option<InputOrderMode>
1003            let res = res.map(|(_, mode)| mode);
1004            assert_eq!(
1005                res, *expected,
1006                "Unexpected result for in unbounded test case#: {case_idx:?}, case: {test_case:?}"
1007            );
1008        }
1009
1010        Ok(())
1011    }
1012
1013    #[tokio::test]
1014    async fn test_get_window_mode() -> Result<()> {
1015        let test_schema = create_test_schema3()?;
1016        // Columns a,c are nullable whereas b,d are not nullable.
1017        // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC NULLS FIRST, d ASC NULLS FIRST
1018        // Column e is not ordered.
1019        let sort_exprs = vec![
1020            sort_expr("a", &test_schema),
1021            sort_expr("b", &test_schema),
1022            sort_expr("c", &test_schema),
1023            sort_expr("d", &test_schema),
1024        ];
1025        let exec_unbounded = streaming_table_exec(&test_schema, sort_exprs, true)?;
1026
1027        // test cases consists of vector of tuples. Where each tuple represents a single test case.
1028        // First field in the tuple is Vec<str> where each element in the vector represents PARTITION BY columns
1029        // For instance `vec!["a", "b"]` corresponds to PARTITION BY a, b
1030        // Second field in the tuple is Vec<(str, bool, bool)> where each element in the vector represents ORDER BY columns
1031        // For instance, vec![("c", false, false)], corresponds to ORDER BY c ASC NULLS LAST,
1032        // similarly, vec![("c", true, true)], corresponds to ORDER BY c DESC NULLS FIRST,
1033        // Third field in the tuple is Option<(bool, InputOrderMode)>, which corresponds to expected result.
1034        // None represents that existing ordering is not sufficient to run executor with any one of the algorithms
1035        // (We need to add SortExec to be able to run it).
1036        // Some((bool, InputOrderMode)) represents, we can run algorithm with existing ordering. Algorithm should work in
1037        // InputOrderMode, bool field represents whether we should reverse window expressions to run executor with existing ordering.
1038        // For instance, `Some((false, InputOrderMode::Sorted))`, represents that we shouldn't reverse window expressions. And algorithm
1039        // should work in Sorted mode to work with existing ordering.
1040        let test_cases = vec![
1041            // PARTITION BY a, b ORDER BY c ASC NULLS LAST
1042            (vec!["a", "b"], vec![("c", false, false)], None),
1043            // ORDER BY c ASC NULLS FIRST
1044            (vec![], vec![("c", false, true)], None),
1045            // PARTITION BY b, ORDER BY c ASC NULLS FIRST
1046            (vec!["b"], vec![("c", false, true)], None),
1047            // PARTITION BY a, ORDER BY c ASC NULLS FIRST
1048            (vec!["a"], vec![("c", false, true)], None),
1049            // PARTITION BY b, ORDER BY c ASC NULLS FIRST
1050            (
1051                vec!["a", "b"],
1052                vec![("c", false, true), ("e", false, true)],
1053                None,
1054            ),
1055            // PARTITION BY a, ORDER BY b ASC NULLS FIRST
1056            (vec!["a"], vec![("b", false, true)], Some((false, Sorted))),
1057            // PARTITION BY a, ORDER BY a ASC NULLS FIRST
1058            (vec!["a"], vec![("a", false, true)], Some((false, Sorted))),
1059            // PARTITION BY a, ORDER BY a ASC NULLS LAST
1060            (vec!["a"], vec![("a", false, false)], Some((false, Sorted))),
1061            // PARTITION BY a, ORDER BY a DESC NULLS FIRST
1062            (vec!["a"], vec![("a", true, true)], Some((false, Sorted))),
1063            // PARTITION BY a, ORDER BY a DESC NULLS LAST
1064            (vec!["a"], vec![("a", true, false)], Some((false, Sorted))),
1065            // PARTITION BY a, ORDER BY b ASC NULLS LAST
1066            (vec!["a"], vec![("b", false, false)], Some((false, Sorted))),
1067            // PARTITION BY a, ORDER BY b DESC NULLS LAST
1068            (vec!["a"], vec![("b", true, false)], Some((true, Sorted))),
1069            // PARTITION BY a, b ORDER BY c ASC NULLS FIRST
1070            (
1071                vec!["a", "b"],
1072                vec![("c", false, true)],
1073                Some((false, Sorted)),
1074            ),
1075            // PARTITION BY b, a ORDER BY c ASC NULLS FIRST
1076            (
1077                vec!["b", "a"],
1078                vec![("c", false, true)],
1079                Some((false, Sorted)),
1080            ),
1081            // PARTITION BY a, b ORDER BY c DESC NULLS LAST
1082            (
1083                vec!["a", "b"],
1084                vec![("c", true, false)],
1085                Some((true, Sorted)),
1086            ),
1087            // PARTITION BY e ORDER BY a ASC NULLS FIRST
1088            (
1089                vec!["e"],
1090                vec![("a", false, true)],
1091                // For unbounded, expects to work in Linear mode. Shouldn't reverse window function.
1092                Some((false, Linear)),
1093            ),
1094            // PARTITION BY b, c ORDER BY a ASC NULLS FIRST, c ASC NULLS FIRST
1095            (
1096                vec!["b", "c"],
1097                vec![("a", false, true), ("c", false, true)],
1098                Some((false, Linear)),
1099            ),
1100            // PARTITION BY b ORDER BY a ASC NULLS FIRST
1101            (vec!["b"], vec![("a", false, true)], Some((false, Linear))),
1102            // PARTITION BY a, e ORDER BY b ASC NULLS FIRST
1103            (
1104                vec!["a", "e"],
1105                vec![("b", false, true)],
1106                Some((false, PartiallySorted(vec![0]))),
1107            ),
1108            // PARTITION BY a, c ORDER BY b ASC NULLS FIRST
1109            (
1110                vec!["a", "c"],
1111                vec![("b", false, true)],
1112                Some((false, PartiallySorted(vec![0]))),
1113            ),
1114            // PARTITION BY c, a ORDER BY b ASC NULLS FIRST
1115            (
1116                vec!["c", "a"],
1117                vec![("b", false, true)],
1118                Some((false, PartiallySorted(vec![1]))),
1119            ),
1120            // PARTITION BY d, b, a ORDER BY c ASC NULLS FIRST
1121            (
1122                vec!["d", "b", "a"],
1123                vec![("c", false, true)],
1124                Some((false, PartiallySorted(vec![2, 1]))),
1125            ),
1126            // PARTITION BY e, b, a ORDER BY c ASC NULLS FIRST
1127            (
1128                vec!["e", "b", "a"],
1129                vec![("c", false, true)],
1130                Some((false, PartiallySorted(vec![2, 1]))),
1131            ),
1132            // PARTITION BY d, a ORDER BY b ASC NULLS FIRST
1133            (
1134                vec!["d", "a"],
1135                vec![("b", false, true)],
1136                Some((false, PartiallySorted(vec![1]))),
1137            ),
1138            // PARTITION BY b, ORDER BY b, a ASC NULLS FIRST
1139            (
1140                vec!["a"],
1141                vec![("b", false, true), ("a", false, true)],
1142                Some((false, Sorted)),
1143            ),
1144            // ORDER BY b, a ASC NULLS FIRST
1145            (vec![], vec![("b", false, true), ("a", false, true)], None),
1146        ];
1147        for (case_idx, test_case) in test_cases.iter().enumerate() {
1148            let (partition_by_columns, order_by_params, expected) = &test_case;
1149            let mut partition_by_exprs = vec![];
1150            for col_name in partition_by_columns {
1151                partition_by_exprs.push(col(col_name, &test_schema)?);
1152            }
1153
1154            let mut order_by_exprs = LexOrdering::default();
1155            for (col_name, descending, nulls_first) in order_by_params {
1156                let expr = col(col_name, &test_schema)?;
1157                let options = SortOptions {
1158                    descending: *descending,
1159                    nulls_first: *nulls_first,
1160                };
1161                order_by_exprs.push(PhysicalSortExpr { expr, options });
1162            }
1163
1164            assert_eq!(
1165                get_window_mode(&partition_by_exprs, order_by_exprs.as_ref(), &exec_unbounded),
1166                *expected,
1167                "Unexpected result for in unbounded test case#: {case_idx:?}, case: {test_case:?}"
1168            );
1169        }
1170
1171        Ok(())
1172    }
1173}