1mod bounded_window_agg_exec;
21mod utils;
22mod window_agg_exec;
23
24use std::borrow::Borrow;
25use std::iter;
26use std::sync::Arc;
27
28use crate::{
29 expressions::PhysicalSortExpr, ExecutionPlan, ExecutionPlanProperties,
30 InputOrderMode, PhysicalExpr,
31};
32
33use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
34use arrow_schema::SortOptions;
35use datafusion_common::{exec_err, Result};
36use datafusion_expr::{
37 PartitionEvaluator, ReversedUDWF, SetMonotonicity, WindowFrame,
38 WindowFunctionDefinition, WindowUDF,
39};
40use datafusion_functions_window_common::expr::ExpressionArgs;
41use datafusion_functions_window_common::field::WindowUDFFieldArgs;
42use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
43use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
44use datafusion_physical_expr::expressions::Column;
45use datafusion_physical_expr::{
46 reverse_order_bys,
47 window::{SlidingAggregateWindowExpr, StandardWindowFunctionExpr},
48 ConstExpr, EquivalenceProperties, LexOrdering, PhysicalSortRequirement,
49};
50use datafusion_physical_expr_common::sort_expr::LexRequirement;
51
52use itertools::Itertools;
53
54pub use bounded_window_agg_exec::BoundedWindowAggExec;
56pub use datafusion_physical_expr::window::{
57 PlainAggregateWindowExpr, StandardWindowExpr, WindowExpr,
58};
59pub use window_agg_exec::WindowAggExec;
60
61pub fn schema_add_window_field(
63 args: &[Arc<dyn PhysicalExpr>],
64 schema: &Schema,
65 window_fn: &WindowFunctionDefinition,
66 fn_name: &str,
67) -> Result<Arc<Schema>> {
68 let data_types = args
69 .iter()
70 .map(|e| Arc::clone(e).as_ref().data_type(schema))
71 .collect::<Result<Vec<_>>>()?;
72 let nullability = args
73 .iter()
74 .map(|e| Arc::clone(e).as_ref().nullable(schema))
75 .collect::<Result<Vec<_>>>()?;
76 let window_expr_return_type =
77 window_fn.return_type(&data_types, &nullability, fn_name)?;
78 let mut window_fields = schema
79 .fields()
80 .iter()
81 .map(|f| f.as_ref().clone())
82 .collect_vec();
83 if let WindowFunctionDefinition::AggregateUDF(_) = window_fn {
85 Ok(Arc::new(Schema::new(window_fields)))
86 } else {
87 window_fields.extend_from_slice(&[Field::new(
88 fn_name,
89 window_expr_return_type,
90 false,
91 )]);
92 Ok(Arc::new(Schema::new(window_fields)))
93 }
94}
95
96#[allow(clippy::too_many_arguments)]
98pub fn create_window_expr(
99 fun: &WindowFunctionDefinition,
100 name: String,
101 args: &[Arc<dyn PhysicalExpr>],
102 partition_by: &[Arc<dyn PhysicalExpr>],
103 order_by: &LexOrdering,
104 window_frame: Arc<WindowFrame>,
105 input_schema: &Schema,
106 ignore_nulls: bool,
107) -> Result<Arc<dyn WindowExpr>> {
108 Ok(match fun {
109 WindowFunctionDefinition::AggregateUDF(fun) => {
110 let aggregate = AggregateExprBuilder::new(Arc::clone(fun), args.to_vec())
111 .schema(Arc::new(input_schema.clone()))
112 .alias(name)
113 .with_ignore_nulls(ignore_nulls)
114 .build()
115 .map(Arc::new)?;
116 window_expr_from_aggregate_expr(
117 partition_by,
118 order_by,
119 window_frame,
120 aggregate,
121 )
122 }
123 WindowFunctionDefinition::WindowUDF(fun) => Arc::new(StandardWindowExpr::new(
124 create_udwf_window_expr(fun, args, input_schema, name, ignore_nulls)?,
125 partition_by,
126 order_by,
127 window_frame,
128 )),
129 })
130}
131
132fn window_expr_from_aggregate_expr(
134 partition_by: &[Arc<dyn PhysicalExpr>],
135 order_by: &LexOrdering,
136 window_frame: Arc<WindowFrame>,
137 aggregate: Arc<AggregateFunctionExpr>,
138) -> Arc<dyn WindowExpr> {
139 let unbounded_window = window_frame.is_ever_expanding();
141
142 if !unbounded_window {
143 Arc::new(SlidingAggregateWindowExpr::new(
144 aggregate,
145 partition_by,
146 order_by,
147 window_frame,
148 ))
149 } else {
150 Arc::new(PlainAggregateWindowExpr::new(
151 aggregate,
152 partition_by,
153 order_by,
154 window_frame,
155 ))
156 }
157}
158
159pub fn create_udwf_window_expr(
161 fun: &Arc<WindowUDF>,
162 args: &[Arc<dyn PhysicalExpr>],
163 input_schema: &Schema,
164 name: String,
165 ignore_nulls: bool,
166) -> Result<Arc<dyn StandardWindowFunctionExpr>> {
167 let input_types: Vec<_> = args
169 .iter()
170 .map(|arg| arg.data_type(input_schema))
171 .collect::<Result<_>>()?;
172
173 let udwf_expr = Arc::new(WindowUDFExpr {
174 fun: Arc::clone(fun),
175 args: args.to_vec(),
176 input_types,
177 name,
178 is_reversed: false,
179 ignore_nulls,
180 });
181
182 let _ = udwf_expr.create_evaluator()?;
194
195 Ok(udwf_expr)
196}
197
/// Implements [`StandardWindowFunctionExpr`] for a [`WindowUDF`].
#[derive(Clone, Debug)]
pub struct WindowUDFExpr {
    /// The user-defined window function every trait method delegates to.
    fun: Arc<WindowUDF>,
    /// Argument expressions handed to the UDF.
    args: Vec<Arc<dyn PhysicalExpr>>,
    /// Name returned by `name()`; also used to look up the result column in
    /// `get_result_ordering`.
    name: String,
    /// Data types of `args`, resolved once when the expression is created.
    input_types: Vec<DataType>,
    /// Starts as `false` and is toggled by `reverse_expr` when the UDF
    /// supplies a distinct reversed function; forwarded to the partition
    /// evaluator factory.
    is_reversed: bool,
    /// Forwarded unchanged to the partition evaluator factory.
    /// Presumably reflects an `IGNORE NULLS` clause — confirm with callers.
    ignore_nulls: bool,
}
214
impl WindowUDFExpr {
    /// Returns a reference to the wrapped [`WindowUDF`].
    pub fn fun(&self) -> &Arc<WindowUDF> {
        &self.fun
    }
}
220
221impl StandardWindowFunctionExpr for WindowUDFExpr {
222 fn as_any(&self) -> &dyn std::any::Any {
223 self
224 }
225
226 fn field(&self) -> Result<Field> {
227 self.fun
228 .field(WindowUDFFieldArgs::new(&self.input_types, &self.name))
229 }
230
231 fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
232 self.fun
233 .expressions(ExpressionArgs::new(&self.args, &self.input_types))
234 }
235
236 fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
237 self.fun
238 .partition_evaluator_factory(PartitionEvaluatorArgs::new(
239 &self.args,
240 &self.input_types,
241 self.is_reversed,
242 self.ignore_nulls,
243 ))
244 }
245
246 fn name(&self) -> &str {
247 &self.name
248 }
249
250 fn reverse_expr(&self) -> Option<Arc<dyn StandardWindowFunctionExpr>> {
251 match self.fun.reverse_expr() {
252 ReversedUDWF::Identical => Some(Arc::new(self.clone())),
253 ReversedUDWF::NotSupported => None,
254 ReversedUDWF::Reversed(fun) => Some(Arc::new(WindowUDFExpr {
255 fun,
256 args: self.args.clone(),
257 name: self.name.clone(),
258 input_types: self.input_types.clone(),
259 is_reversed: !self.is_reversed,
260 ignore_nulls: self.ignore_nulls,
261 })),
262 }
263 }
264
265 fn get_result_ordering(&self, schema: &SchemaRef) -> Option<PhysicalSortExpr> {
266 self.fun
267 .sort_options()
268 .zip(schema.column_with_name(self.name()))
269 .map(|(options, (idx, field))| {
270 let expr = Arc::new(Column::new(field.name(), idx));
271 PhysicalSortExpr { expr, options }
272 })
273 }
274}
275
276pub(crate) fn calc_requirements<
277 T: Borrow<Arc<dyn PhysicalExpr>>,
278 S: Borrow<PhysicalSortExpr>,
279>(
280 partition_by_exprs: impl IntoIterator<Item = T>,
281 orderby_sort_exprs: impl IntoIterator<Item = S>,
282) -> Option<LexRequirement> {
283 let mut sort_reqs = LexRequirement::new(
284 partition_by_exprs
285 .into_iter()
286 .map(|partition_by| {
287 PhysicalSortRequirement::new(Arc::clone(partition_by.borrow()), None)
288 })
289 .collect::<Vec<_>>(),
290 );
291 for element in orderby_sort_exprs.into_iter() {
292 let PhysicalSortExpr { expr, options } = element.borrow();
293 if !sort_reqs.iter().any(|e| e.expr.eq(expr)) {
294 sort_reqs.push(PhysicalSortRequirement::new(
295 Arc::clone(expr),
296 Some(*options),
297 ));
298 }
299 }
300 (!sort_reqs.is_empty()).then_some(sort_reqs)
302}
303
304pub fn get_ordered_partition_by_indices(
310 partition_by_exprs: &[Arc<dyn PhysicalExpr>],
311 input: &Arc<dyn ExecutionPlan>,
312) -> Vec<usize> {
313 let (_, indices) = input
314 .equivalence_properties()
315 .find_longest_permutation(partition_by_exprs);
316 indices
317}
318
319pub(crate) fn get_partition_by_sort_exprs(
320 input: &Arc<dyn ExecutionPlan>,
321 partition_by_exprs: &[Arc<dyn PhysicalExpr>],
322 ordered_partition_by_indices: &[usize],
323) -> Result<LexOrdering> {
324 let ordered_partition_exprs = ordered_partition_by_indices
325 .iter()
326 .map(|idx| Arc::clone(&partition_by_exprs[*idx]))
327 .collect::<Vec<_>>();
328 assert!(ordered_partition_by_indices.len() <= partition_by_exprs.len());
330 let (ordering, _) = input
331 .equivalence_properties()
332 .find_longest_permutation(&ordered_partition_exprs);
333 if ordering.len() == ordered_partition_exprs.len() {
334 Ok(ordering)
335 } else {
336 exec_err!("Expects PARTITION BY expression to be ordered")
337 }
338}
339
/// Computes the equivalence properties of a window operator's output.
///
/// Starts from the input plan's properties (re-based onto the window
/// `schema`) and, for each window expression, adds whatever constants or
/// orderings can be derived for its output column.
///
/// * `schema` - output schema of the window operator. The code assumes the
///   last `window_exprs.len()` fields are the window columns, in the same
///   order as `window_exprs`.
/// * `input` - the child plan whose equivalence properties are extended.
/// * `window_exprs` - the window expressions evaluated by the operator.
///
/// NOTE(review): `window_schema_len - window_exprs.len()` underflows if the
/// schema has fewer fields than there are window expressions — presumably
/// callers guarantee this cannot happen; confirm.
pub(crate) fn window_equivalence_properties(
    schema: &SchemaRef,
    input: &Arc<dyn ExecutionPlan>,
    window_exprs: &[Arc<dyn WindowExpr>],
) -> EquivalenceProperties {
    // The schema changes, so the input's properties cannot be reused as-is:
    // build properties over the new schema and extend them with the input's.
    let mut window_eq_properties = EquivalenceProperties::new(Arc::clone(schema))
        .extend(input.equivalence_properties().clone());

    let window_schema_len = schema.fields.len();
    let input_schema_len = window_schema_len - window_exprs.len();
    // Column index of each window expression's result in `schema`.
    let window_expr_indices = (input_schema_len..window_schema_len).collect::<Vec<_>>();

    for (i, expr) in window_exprs.iter().enumerate() {
        let partitioning_exprs = expr.partition_by();
        let no_partitioning = partitioning_exprs.is_empty();
        // For each partitioning expression, build both sort-option variants
        // produced by `sort_options_resolving_constant`. Every combination is
        // then checked against the orderings known so far.
        let partition_by_orders = partitioning_exprs
            .iter()
            .map(|pb_order| sort_options_resolving_constant(Arc::clone(pb_order)));
        let all_satisfied_lexs = partition_by_orders
            .multi_cartesian_product()
            .map(LexOrdering::new)
            .filter(|lex| window_eq_properties.ordering_satisfy(lex))
            .collect::<Vec<_>>();
        // If there is a partitioning but none of the candidate orderings is
        // satisfied, stop: the properties accumulated so far are returned and
        // the remaining window expressions are not examined.
        if !no_partitioning && all_satisfied_lexs.is_empty() {
            return window_eq_properties;
        } else if let Some(std_expr) = expr.as_any().downcast_ref::<StandardWindowExpr>()
        {
            // Standard window functions register their own orderings.
            std_expr.add_equal_orderings(&mut window_eq_properties);
        } else if let Some(plain_expr) =
            expr.as_any().downcast_ref::<PlainAggregateWindowExpr>()
        {
            // Plain aggregate window expression (created by
            // `window_expr_from_aggregate_expr` when the frame
            // `is_ever_expanding()`). First check whether the frame's end is
            // unbounded as well:
            if plain_expr.get_window_frame().end_bound.is_unbounded() {
                let window_col = Column::new(expr.name(), i + input_schema_len);
                if no_partitioning {
                    // No partitioning: the result column is registered as a
                    // constant.
                    window_eq_properties = window_eq_properties
                        .with_constants(iter::once(ConstExpr::new(Arc::new(window_col))))
                } else {
                    // With partitioning: append the window column (in both
                    // sort-option variants) to every satisfied partition
                    // ordering and register the resulting orderings.
                    let new_lexs = all_satisfied_lexs.into_iter().flat_map(|lex| {
                        let orderings = lex.take_exprs();
                        let new_partial_consts =
                            sort_options_resolving_constant(Arc::new(window_col.clone()));

                        new_partial_consts.into_iter().map(move |partial| {
                            let mut existing = orderings.clone();
                            existing.push(partial);
                            LexOrdering::new(existing)
                        })
                    });
                    window_eq_properties.add_new_orderings(new_lexs);
                }
            } else {
                // The frame end is bounded: let the expression itself add the
                // orderings it can derive for its result column.
                plain_expr.add_equal_orderings(
                    &mut window_eq_properties,
                    window_expr_indices[i],
                );
            }
        } else if let Some(sliding_expr) =
            expr.as_any().downcast_ref::<SlidingAggregateWindowExpr>()
        {
            // Sliding aggregate window expression. New orderings can only be
            // derived when the aggregate is set-monotonic.
            let set_monotonicity = sliding_expr.get_aggregate_expr().set_monotonicity();
            if set_monotonicity.ne(&SetMonotonicity::NotMonotonic) {
                let frame = sliding_expr.get_window_frame();
                // Case 1: the frame end is unbounded.
                if frame.end_bound.is_unbounded() {
                    let increasing = set_monotonicity.eq(&SetMonotonicity::Increasing);
                    let window_col = Column::new(expr.name(), i + input_schema_len);
                    // NOTE: the first argument of `SortOptions::new` is
                    // `descending`, so an `Increasing` aggregate is recorded
                    // as a *descending* column here (and vice versa) —
                    // presumably because such a frame only loses rows as it
                    // advances; confirm.
                    if no_partitioning {
                        let new_ordering =
                            vec![LexOrdering::new(vec![PhysicalSortExpr::new(
                                Arc::new(window_col),
                                SortOptions::new(increasing, true),
                            )])];
                        window_eq_properties.add_new_orderings(new_ordering);
                    } else {
                        // Append the window column to every satisfied
                        // partition ordering.
                        for lex in all_satisfied_lexs.into_iter() {
                            let mut existing = lex.take_exprs();
                            existing.push(PhysicalSortExpr::new(
                                Arc::new(window_col.clone()),
                                SortOptions::new(increasing, true),
                            ));
                            window_eq_properties
                                .add_new_ordering(LexOrdering::new(existing));
                        }
                    }
                }
                // Case 2: the frame end is bounded and the frame is causal.
                // Set-monotonicity is only used when some ordering over the
                // aggregate's argument expressions is satisfied by the
                // properties known so far.
                else if frame.is_causal() {
                    let mut args_all_lexs = sliding_expr
                        .get_aggregate_expr()
                        .expressions()
                        .into_iter()
                        .map(sort_options_resolving_constant)
                        .multi_cartesian_product();

                    // `asc` ends up holding the direction of the first sort
                    // expression of the last candidate examined by `any`,
                    // i.e. the satisfied one when `any` returns true.
                    let mut asc = false;
                    if args_all_lexs.any(|order| {
                        if let Some(f) = order.first() {
                            asc = !f.options.descending;
                        }
                        window_eq_properties.ordering_satisfy(&LexOrdering::new(order))
                    }) {
                        let increasing =
                            set_monotonicity.eq(&SetMonotonicity::Increasing);
                        let window_col = Column::new(expr.name(), i + input_schema_len);
                        if increasing && (asc || no_partitioning) {
                            // Result column is ascending, nulls last.
                            let new_ordering =
                                LexOrdering::new(vec![PhysicalSortExpr::new(
                                    Arc::new(window_col),
                                    SortOptions::new(false, false),
                                )]);
                            window_eq_properties.add_new_ordering(new_ordering);
                        } else if !increasing && (!asc || no_partitioning) {
                            // Result column is descending, nulls last.
                            let new_ordering =
                                LexOrdering::new(vec![PhysicalSortExpr::new(
                                    Arc::new(window_col),
                                    SortOptions::new(true, false),
                                )]);
                            window_eq_properties.add_new_ordering(new_ordering);
                        };
                    }
                }
            }
        }
    }
    window_eq_properties
}
493
494pub fn get_best_fitting_window(
505 window_exprs: &[Arc<dyn WindowExpr>],
506 input: &Arc<dyn ExecutionPlan>,
507 physical_partition_keys: &[Arc<dyn PhysicalExpr>],
511) -> Result<Option<Arc<dyn ExecutionPlan>>> {
512 let partitionby_exprs = window_exprs[0].partition_by();
515 let orderby_keys = window_exprs[0].order_by();
516 let (should_reverse, input_order_mode) =
517 if let Some((should_reverse, input_order_mode)) =
518 get_window_mode(partitionby_exprs, orderby_keys, input)
519 {
520 (should_reverse, input_order_mode)
521 } else {
522 return Ok(None);
523 };
524 let is_unbounded = input.boundedness().is_unbounded();
525 if !is_unbounded && input_order_mode != InputOrderMode::Sorted {
526 return Ok(None);
529 };
530
531 let window_expr = if should_reverse {
532 if let Some(reversed_window_expr) = window_exprs
533 .iter()
534 .map(|e| e.get_reverse_expr())
535 .collect::<Option<Vec<_>>>()
536 {
537 reversed_window_expr
538 } else {
539 return Ok(None);
542 }
543 } else {
544 window_exprs.to_vec()
545 };
546
547 if window_expr.iter().all(|e| e.uses_bounded_memory()) {
550 Ok(Some(Arc::new(BoundedWindowAggExec::try_new(
551 window_expr,
552 Arc::clone(input),
553 input_order_mode,
554 !physical_partition_keys.is_empty(),
555 )?) as _))
556 } else if input_order_mode != InputOrderMode::Sorted {
557 Ok(None)
562 } else {
563 Ok(Some(Arc::new(WindowAggExec::try_new(
564 window_expr,
565 Arc::clone(input),
566 !physical_partition_keys.is_empty(),
567 )?) as _))
568 }
569}
570
571pub fn get_window_mode(
583 partitionby_exprs: &[Arc<dyn PhysicalExpr>],
584 orderby_keys: &LexOrdering,
585 input: &Arc<dyn ExecutionPlan>,
586) -> Option<(bool, InputOrderMode)> {
587 let input_eqs = input.equivalence_properties().clone();
588 let mut partition_by_reqs: LexRequirement = LexRequirement::new(vec![]);
589 let (_, indices) = input_eqs.find_longest_permutation(partitionby_exprs);
590 vec![].extend(indices.iter().map(|&idx| PhysicalSortRequirement {
591 expr: Arc::clone(&partitionby_exprs[idx]),
592 options: None,
593 }));
594 partition_by_reqs
595 .inner
596 .extend(indices.iter().map(|&idx| PhysicalSortRequirement {
597 expr: Arc::clone(&partitionby_exprs[idx]),
598 options: None,
599 }));
600 let const_exprs = partitionby_exprs.iter().map(ConstExpr::from);
602 let partition_by_eqs = input_eqs.with_constants(const_exprs);
603 let order_by_reqs = LexRequirement::from(orderby_keys.clone());
604 let reverse_order_by_reqs = LexRequirement::from(reverse_order_bys(orderby_keys));
605 for (should_swap, order_by_reqs) in
606 [(false, order_by_reqs), (true, reverse_order_by_reqs)]
607 {
608 let req = LexRequirement::new(
609 [partition_by_reqs.inner.clone(), order_by_reqs.inner].concat(),
610 )
611 .collapse();
612 if partition_by_eqs.ordering_satisfy_requirement(&req) {
613 let mode = if indices.len() == partitionby_exprs.len() {
615 InputOrderMode::Sorted
616 } else if indices.is_empty() {
617 InputOrderMode::Linear
618 } else {
619 InputOrderMode::PartiallySorted(indices)
620 };
621 return Some((should_swap, mode));
622 }
623 }
624 None
625}
626
627fn sort_options_resolving_constant(expr: Arc<dyn PhysicalExpr>) -> Vec<PhysicalSortExpr> {
628 vec![
629 PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)),
630 PhysicalSortExpr::new(expr, SortOptions::new(true, true)),
631 ]
632}
633
#[cfg(test)]
mod tests {
    use super::*;
    use crate::collect;
    use crate::expressions::col;
    use crate::streaming::StreamingTableExec;
    use crate::test::assert_is_pending;
    use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};

    use arrow::compute::SortOptions;
    use datafusion_execution::TaskContext;

    use datafusion_functions_aggregate::count::count_udaf;
    use futures::FutureExt;
    use InputOrderMode::{Linear, PartiallySorted, Sorted};

    /// Schema with one nullable and one non-nullable `Int32` column.
    fn create_test_schema() -> Result<SchemaRef> {
        let fields = vec![
            Field::new("nullable_col", DataType::Int32, true),
            Field::new("non_nullable_col", DataType::Int32, false),
        ];
        Ok(Arc::new(Schema::new(fields)))
    }

    /// Schema with five nullable `Int32` columns `a` through `e`.
    fn create_test_schema2() -> Result<SchemaRef> {
        let fields = vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
            Field::new("d", DataType::Int32, true),
            Field::new("e", DataType::Int32, true),
        ];
        Ok(Arc::new(Schema::new(fields)))
    }

    /// Schema with `Int32` columns `a` through `e`, where only `a` and `c`
    /// are nullable.
    fn create_test_schema3() -> Result<SchemaRef> {
        let fields = vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, true),
            Field::new("d", DataType::Int32, false),
            Field::new("e", DataType::Int32, false),
        ];
        Ok(Arc::new(Schema::new(fields)))
    }

    /// Sort expression on column `name` with default sort options.
    pub fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
        sort_expr_options(name, schema, SortOptions::default())
    }

    /// Sort expression on column `name` with the given `options`.
    ///
    /// Panics if `name` is not a column of `schema`.
    pub fn sort_expr_options(
        name: &str,
        schema: &Schema,
        options: SortOptions,
    ) -> PhysicalSortExpr {
        let expr = col(name, schema).unwrap();
        PhysicalSortExpr { expr, options }
    }

    /// Streaming table source over `schema` that declares `sort_exprs` as
    /// its output ordering.
    pub fn streaming_table_exec(
        schema: &SchemaRef,
        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
        infinite_source: bool,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let ordering: LexOrdering = sort_exprs.into_iter().collect();
        let exec = StreamingTableExec::try_new(
            Arc::clone(schema),
            vec![],
            None,
            Some(ordering),
            infinite_source,
            None,
        )?;
        Ok(Arc::new(exec))
    }

    #[tokio::test]
    async fn test_calc_requirements() -> Result<()> {
        let schema = create_test_schema2()?;
        // Each case: (PARTITION BY columns,
        //             ORDER BY (column, descending, nulls_first),
        //             expected (column, Some((descending, nulls_first)) | None)).
        let test_data = vec![
            // PARTITION BY a, ORDER BY b DESC NULLS FIRST
            (
                vec!["a"],
                vec![("b", true, true)],
                vec![("a", None), ("b", Some((true, true)))],
            ),
            // PARTITION BY a, ORDER BY a DESC NULLS FIRST
            (vec!["a"], vec![("a", true, true)], vec![("a", None)]),
            // PARTITION BY a, ORDER BY b DESC NULLS FIRST, c ASC NULLS LAST
            (
                vec!["a"],
                vec![("b", true, true), ("c", false, false)],
                vec![
                    ("a", None),
                    ("b", Some((true, true))),
                    ("c", Some((false, false))),
                ],
            ),
            // PARTITION BY a, c, ORDER BY b DESC NULLS FIRST, c ASC NULLS LAST
            (
                vec!["a", "c"],
                vec![("b", true, true), ("c", false, false)],
                vec![("a", None), ("c", None), ("b", Some((true, true)))],
            ),
        ];
        for (pb_params, ob_params, expected_params) in test_data {
            let partitionbys = pb_params
                .into_iter()
                .map(|name| col(name, &schema))
                .collect::<Result<Vec<_>>>()?;

            let orderbys = ob_params
                .into_iter()
                .map(|(name, descending, nulls_first)| {
                    let options = SortOptions {
                        descending,
                        nulls_first,
                    };
                    col(name, &schema).map(|expr| PhysicalSortExpr { expr, options })
                })
                .collect::<Result<Vec<_>>>()?;

            let expected_reqs = expected_params
                .into_iter()
                .map(|(name, reqs)| {
                    let options = reqs.map(|(descending, nulls_first)| SortOptions {
                        descending,
                        nulls_first,
                    });
                    col(name, &schema)
                        .map(|expr| PhysicalSortRequirement::new(expr, options))
                })
                .collect::<Result<Vec<_>>>()?;
            // An empty expectation stands for `None`.
            let expected: Option<LexRequirement> = if expected_reqs.is_empty() {
                None
            } else {
                Some(LexRequirement::new(expected_reqs))
            };

            assert_eq!(calc_requirements(partitionbys, orderbys), expected);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
        let refs = blocking_exec.refs();

        let count_expr = create_window_expr(
            &WindowFunctionDefinition::AggregateUDF(count_udaf()),
            "count".to_owned(),
            &[col("a", &schema)?],
            &[],
            &LexOrdering::default(),
            Arc::new(WindowFrame::new(None)),
            schema.as_ref(),
            false,
        )?;
        let window_agg_exec =
            Arc::new(WindowAggExec::try_new(vec![count_expr], blocking_exec, false)?);

        let mut fut = collect(window_agg_exec, task_ctx).boxed();

        // The input never yields, so the future must still be pending; after
        // dropping it, all references to the blocking input must go away.
        assert_is_pending(&mut fut);
        drop(fut);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }

    #[tokio::test]
    async fn test_satisfy_nullable() -> Result<()> {
        let schema = create_test_schema()?;
        // Each case: ((physical descending, physical nulls_first),
        //             (required descending, required nulls_first),
        //             expected result of `satisfy`).
        let params = vec![
            ((true, true), (false, false), false),
            ((true, true), (false, true), false),
            ((true, true), (true, false), false),
            ((true, false), (false, true), false),
            ((true, false), (false, false), false),
            ((true, false), (true, true), false),
            ((true, false), (true, false), true),
        ];
        for (physical, required, expected) in params {
            let (physical_desc, physical_nulls_first) = physical;
            let (req_desc, req_nulls_first) = required;
            let physical_ordering = sort_expr_options(
                "nullable_col",
                &schema,
                SortOptions {
                    descending: physical_desc,
                    nulls_first: physical_nulls_first,
                },
            );
            let required_ordering = sort_expr_options(
                "nullable_col",
                &schema,
                SortOptions {
                    descending: req_desc,
                    nulls_first: req_nulls_first,
                },
            );
            let res = physical_ordering.satisfy(&required_ordering.into(), &schema);
            assert_eq!(res, expected);
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_satisfy_non_nullable() -> Result<()> {
        let schema = create_test_schema()?;

        // Same layout as in `test_satisfy_nullable`.
        let params = vec![
            ((true, true), (false, false), false),
            ((true, true), (false, true), false),
            ((true, true), (true, false), true),
            ((true, false), (false, true), false),
            ((true, false), (false, false), false),
            ((true, false), (true, true), true),
            ((true, false), (true, false), true),
        ];
        for (physical, required, expected) in params {
            let (physical_desc, physical_nulls_first) = physical;
            let (req_desc, req_nulls_first) = required;
            let physical_ordering = sort_expr_options(
                "non_nullable_col",
                &schema,
                SortOptions {
                    descending: physical_desc,
                    nulls_first: physical_nulls_first,
                },
            );
            let required_ordering = sort_expr_options(
                "non_nullable_col",
                &schema,
                SortOptions {
                    descending: req_desc,
                    nulls_first: req_nulls_first,
                },
            );
            let res = physical_ordering.satisfy(&required_ordering.into(), &schema);
            assert_eq!(res, expected);
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_get_window_mode_exhaustive() -> Result<()> {
        let test_schema = create_test_schema3()?;
        // The source declares an ordering on a, b, c, d (default sort
        // options); column e is not part of the declared ordering.
        let sort_exprs = vec![
            sort_expr("a", &test_schema),
            sort_expr("b", &test_schema),
            sort_expr("c", &test_schema),
            sort_expr("d", &test_schema),
        ];
        let exec_unbounded = streaming_table_exec(&test_schema, sort_exprs, true)?;

        // Each case: (PARTITION BY columns, ORDER BY columns, expected mode).
        // `None` means the window cannot run on the existing ordering.
        let test_cases = vec![
            (vec!["a"], vec!["a"], Some(Sorted)),
            (vec!["a"], vec!["b"], Some(Sorted)),
            (vec!["a"], vec!["c"], None),
            (vec!["a"], vec!["a", "b"], Some(Sorted)),
            (vec!["a"], vec!["b", "c"], Some(Sorted)),
            (vec!["a"], vec!["a", "c"], None),
            (vec!["a"], vec!["a", "b", "c"], Some(Sorted)),
            (vec!["b"], vec!["a"], Some(Linear)),
            (vec!["b"], vec!["b"], Some(Linear)),
            (vec!["b"], vec!["c"], None),
            (vec!["b"], vec!["a", "b"], Some(Linear)),
            (vec!["b"], vec!["b", "c"], None),
            (vec!["b"], vec!["a", "c"], Some(Linear)),
            (vec!["b"], vec!["a", "b", "c"], Some(Linear)),
            (vec!["c"], vec!["a"], Some(Linear)),
            (vec!["c"], vec!["b"], None),
            (vec!["c"], vec!["c"], Some(Linear)),
            (vec!["c"], vec!["a", "b"], Some(Linear)),
            (vec!["c"], vec!["b", "c"], None),
            (vec!["c"], vec!["a", "c"], Some(Linear)),
            (vec!["c"], vec!["a", "b", "c"], Some(Linear)),
            (vec!["b", "a"], vec!["a"], Some(Sorted)),
            (vec!["b", "a"], vec!["b"], Some(Sorted)),
            (vec!["b", "a"], vec!["c"], Some(Sorted)),
            (vec!["b", "a"], vec!["a", "b"], Some(Sorted)),
            (vec!["b", "a"], vec!["b", "c"], Some(Sorted)),
            (vec!["b", "a"], vec!["a", "c"], Some(Sorted)),
            (vec!["b", "a"], vec!["a", "b", "c"], Some(Sorted)),
            (vec!["c", "b"], vec!["a"], Some(Linear)),
            (vec!["c", "b"], vec!["b"], Some(Linear)),
            (vec!["c", "b"], vec!["c"], Some(Linear)),
            (vec!["c", "b"], vec!["a", "b"], Some(Linear)),
            (vec!["c", "b"], vec!["b", "c"], Some(Linear)),
            (vec!["c", "b"], vec!["a", "c"], Some(Linear)),
            (vec!["c", "b"], vec!["a", "b", "c"], Some(Linear)),
            (vec!["c", "a"], vec!["a"], Some(PartiallySorted(vec![1]))),
            (vec!["c", "a"], vec!["b"], Some(PartiallySorted(vec![1]))),
            (vec!["c", "a"], vec!["c"], Some(PartiallySorted(vec![1]))),
            (
                vec!["c", "a"],
                vec!["a", "b"],
                Some(PartiallySorted(vec![1])),
            ),
            (
                vec!["c", "a"],
                vec!["b", "c"],
                Some(PartiallySorted(vec![1])),
            ),
            (
                vec!["c", "a"],
                vec!["a", "c"],
                Some(PartiallySorted(vec![1])),
            ),
            (
                vec!["c", "a"],
                vec!["a", "b", "c"],
                Some(PartiallySorted(vec![1])),
            ),
            (vec!["c", "b", "a"], vec!["a"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["b"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["c"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["a", "b"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["b", "c"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["a", "c"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["a", "b", "c"], Some(Sorted)),
        ];
        for (case_idx, test_case) in test_cases.iter().enumerate() {
            let (partition_by_columns, order_by_params, expected) = test_case;
            let partition_by_exprs = partition_by_columns
                .iter()
                .map(|col_name| col(col_name, &test_schema))
                .collect::<Result<Vec<_>>>()?;

            // ORDER BY uses default sort options, i.e. the same options as
            // the declared input ordering.
            let order_by_exprs = LexOrdering::new(
                order_by_params
                    .iter()
                    .map(|col_name| {
                        col(col_name, &test_schema).map(|expr| PhysicalSortExpr {
                            expr,
                            options: SortOptions::default(),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?,
            );

            // Only the mode is checked here; the reverse flag is discarded.
            let res = get_window_mode(
                &partition_by_exprs,
                order_by_exprs.as_ref(),
                &exec_unbounded,
            )
            .map(|(_, mode)| mode);
            assert_eq!(
                res, *expected,
                "Unexpected result for in unbounded test case#: {case_idx:?}, case: {test_case:?}"
            );
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_get_window_mode() -> Result<()> {
        let test_schema = create_test_schema3()?;
        // The source declares an ordering on a, b, c, d (default sort
        // options); column e is not part of the declared ordering.
        let sort_exprs = vec![
            sort_expr("a", &test_schema),
            sort_expr("b", &test_schema),
            sort_expr("c", &test_schema),
            sort_expr("d", &test_schema),
        ];
        let exec_unbounded = streaming_table_exec(&test_schema, sort_exprs, true)?;

        // Each case: (PARTITION BY columns,
        //             ORDER BY (column, descending, nulls_first),
        //             expected (should_reverse, mode) or None).
        let test_cases = vec![
            // PARTITION BY a, b ORDER BY c ASC NULLS LAST
            (vec!["a", "b"], vec![("c", false, false)], None),
            // ORDER BY c ASC NULLS FIRST
            (vec![], vec![("c", false, true)], None),
            // PARTITION BY b ORDER BY c ASC NULLS FIRST
            (vec!["b"], vec![("c", false, true)], None),
            // PARTITION BY a ORDER BY c ASC NULLS FIRST
            (vec!["a"], vec![("c", false, true)], None),
            // PARTITION BY a, b ORDER BY c ASC NULLS FIRST, e ASC NULLS FIRST
            (
                vec!["a", "b"],
                vec![("c", false, true), ("e", false, true)],
                None,
            ),
            // PARTITION BY a ORDER BY b ASC NULLS FIRST
            (vec!["a"], vec![("b", false, true)], Some((false, Sorted))),
            // PARTITION BY a ORDER BY a ASC NULLS FIRST
            (vec!["a"], vec![("a", false, true)], Some((false, Sorted))),
            // PARTITION BY a ORDER BY a ASC NULLS LAST
            (vec!["a"], vec![("a", false, false)], Some((false, Sorted))),
            // PARTITION BY a ORDER BY a DESC NULLS FIRST
            (vec!["a"], vec![("a", true, true)], Some((false, Sorted))),
            // PARTITION BY a ORDER BY a DESC NULLS LAST
            (vec!["a"], vec![("a", true, false)], Some((false, Sorted))),
            // PARTITION BY a ORDER BY b ASC NULLS LAST
            (vec!["a"], vec![("b", false, false)], Some((false, Sorted))),
            // PARTITION BY a ORDER BY b DESC NULLS LAST
            (vec!["a"], vec![("b", true, false)], Some((true, Sorted))),
            // PARTITION BY a, b ORDER BY c ASC NULLS FIRST
            (
                vec!["a", "b"],
                vec![("c", false, true)],
                Some((false, Sorted)),
            ),
            // PARTITION BY b, a ORDER BY c ASC NULLS FIRST
            (
                vec!["b", "a"],
                vec![("c", false, true)],
                Some((false, Sorted)),
            ),
            // PARTITION BY a, b ORDER BY c DESC NULLS LAST
            (
                vec!["a", "b"],
                vec![("c", true, false)],
                Some((true, Sorted)),
            ),
            // PARTITION BY e ORDER BY a ASC NULLS FIRST
            (
                vec!["e"],
                vec![("a", false, true)],
                // Expected to run in Linear mode without reversing.
                Some((false, Linear)),
            ),
            // PARTITION BY b, c ORDER BY a ASC NULLS FIRST, c ASC NULLS FIRST
            (
                vec!["b", "c"],
                vec![("a", false, true), ("c", false, true)],
                Some((false, Linear)),
            ),
            // PARTITION BY b ORDER BY a ASC NULLS FIRST
            (vec!["b"], vec![("a", false, true)], Some((false, Linear))),
            // PARTITION BY a, e ORDER BY b ASC NULLS FIRST
            (
                vec!["a", "e"],
                vec![("b", false, true)],
                Some((false, PartiallySorted(vec![0]))),
            ),
            // PARTITION BY a, c ORDER BY b ASC NULLS FIRST
            (
                vec!["a", "c"],
                vec![("b", false, true)],
                Some((false, PartiallySorted(vec![0]))),
            ),
            // PARTITION BY c, a ORDER BY b ASC NULLS FIRST
            (
                vec!["c", "a"],
                vec![("b", false, true)],
                Some((false, PartiallySorted(vec![1]))),
            ),
            // PARTITION BY d, b, a ORDER BY c ASC NULLS FIRST
            (
                vec!["d", "b", "a"],
                vec![("c", false, true)],
                Some((false, PartiallySorted(vec![2, 1]))),
            ),
            // PARTITION BY e, b, a ORDER BY c ASC NULLS FIRST
            (
                vec!["e", "b", "a"],
                vec![("c", false, true)],
                Some((false, PartiallySorted(vec![2, 1]))),
            ),
            // PARTITION BY d, a ORDER BY b ASC NULLS FIRST
            (
                vec!["d", "a"],
                vec![("b", false, true)],
                Some((false, PartiallySorted(vec![1]))),
            ),
            // PARTITION BY a ORDER BY b ASC NULLS FIRST, a ASC NULLS FIRST
            (
                vec!["a"],
                vec![("b", false, true), ("a", false, true)],
                Some((false, Sorted)),
            ),
            // ORDER BY b ASC NULLS FIRST, a ASC NULLS FIRST
            (vec![], vec![("b", false, true), ("a", false, true)], None),
        ];
        for (case_idx, test_case) in test_cases.iter().enumerate() {
            let (partition_by_columns, order_by_params, expected) = test_case;
            let partition_by_exprs = partition_by_columns
                .iter()
                .map(|col_name| col(col_name, &test_schema))
                .collect::<Result<Vec<_>>>()?;

            let order_by_exprs = LexOrdering::new(
                order_by_params
                    .iter()
                    .map(|(col_name, descending, nulls_first)| {
                        let options = SortOptions {
                            descending: *descending,
                            nulls_first: *nulls_first,
                        };
                        col(col_name, &test_schema)
                            .map(|expr| PhysicalSortExpr { expr, options })
                    })
                    .collect::<Result<Vec<_>>>()?,
            );

            assert_eq!(
                get_window_mode(&partition_by_exprs, order_by_exprs.as_ref(), &exec_unbounded),
                *expected,
                "Unexpected result for in unbounded test case#: {case_idx:?}, case: {test_case:?}"
            );
        }

        Ok(())
    }
}