use std::any::Any;
use std::cmp::{min, Ordering};
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use super::utils::create_schema;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::windows::{
    calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs,
    window_equivalence_properties,
};
use crate::{
    ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
    ExecutionPlanProperties, InputOrderMode, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream, Statistics, WindowExpr,
};
use ahash::RandomState;
use arrow::compute::take_record_batch;
use arrow::{
    array::{Array, ArrayRef, RecordBatchOptions, UInt32Builder},
    compute::{concat, concat_batches, sort_to_indices, take_arrays},
    datatypes::SchemaRef,
    record_batch::RecordBatch,
};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::stats::Precision;
use datafusion_common::utils::{
    evaluate_partition_ranges, get_at_indices, get_row_at_idx,
};
use datafusion_common::{
    arrow_datafusion_err, exec_err, DataFusionError, HashMap, Result,
};
use datafusion_execution::TaskContext;
use datafusion_expr::window_state::{PartitionBatchState, WindowAggState};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::window::{
    PartitionBatches, PartitionKey, PartitionWindowAggStates, WindowState,
};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};

use futures::stream::Stream;
use futures::{ready, StreamExt};
use hashbrown::hash_table::HashTable;
use indexmap::IndexMap;
use log::debug;

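/// Window execution plan that supports bounded (streaming) execution: it
/// buffers input rows per partition, evaluates window expressions
/// incrementally, and emits output rows as soon as their results can no
/// longer change.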
#[derive(Debug, Clone)]
pub struct BoundedWindowAggExec {
    /// Input plan
    input: Arc<dyn ExecutionPlan>,
    /// Window expressions
    window_expr: Vec<Arc<dyn WindowExpr>>,
    /// Schema after the window is applied
    schema: SchemaRef,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Describes how the input is ordered relative to the partition keys
    pub input_order_mode: InputOrderMode,
    /// Indices of the PARTITION BY columns that are ordered at the input
    ordered_partition_by_indices: Vec<usize>,
    /// Cached plan properties (equivalences, output partitioning, etc.)
    cache: PlanProperties,
    /// When false, `partition_keys()` returns an empty vector
    can_repartition: bool,
}

impl BoundedWindowAggExec {
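    /// Creates a new execution plan for window aggregates. Note that the
    /// PARTITION BY clause of the first window expression is used to derive
    /// the ordering and partitioning requirements for all of them.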
    pub fn try_new(
        window_expr: Vec<Arc<dyn WindowExpr>>,
        input: Arc<dyn ExecutionPlan>,
        input_order_mode: InputOrderMode,
        can_repartition: bool,
    ) -> Result<Self> {
        let schema = create_schema(&input.schema(), &window_expr)?;
        let schema = Arc::new(schema);
        let partition_by_exprs = window_expr[0].partition_by();
        let ordered_partition_by_indices = match &input_order_mode {
            InputOrderMode::Sorted => {
                let indices = get_ordered_partition_by_indices(
                    window_expr[0].partition_by(),
                    &input,
                );
                if indices.len() == partition_by_exprs.len() {
                    indices
                } else {
                    (0..partition_by_exprs.len()).collect::<Vec<_>>()
                }
            }
            InputOrderMode::PartiallySorted(ordered_indices) => ordered_indices.clone(),
            InputOrderMode::Linear => {
                vec![]
            }
        };
        let cache = Self::compute_properties(&input, &schema, &window_expr);
        Ok(Self {
            input,
            window_expr,
            schema,
            metrics: ExecutionPlanMetricsSet::new(),
            input_order_mode,
            ordered_partition_by_indices,
            cache,
            can_repartition,
        })
    }

    /// Window expressions
    pub fn window_expr(&self) -> &[Arc<dyn WindowExpr>] {
        &self.window_expr
    }

    /// Input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Returns the sort expressions for the PARTITION BY columns that are
    /// already ordered at the input.
    pub fn partition_by_sort_keys(&self) -> Result<LexOrdering> {
        let partition_by = self.window_expr()[0].partition_by();
        get_partition_by_sort_exprs(
            &self.input,
            partition_by,
            &self.ordered_partition_by_indices,
        )
    }

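    /// Returns the partition search algorithm matching the input ordering:
    /// in `Sorted` mode, all PARTITION BY columns must be ordered and a
    /// [`SortedSearch`] is used; otherwise, a [`LinearSearch`] is used.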
    fn get_search_algo(&self) -> Result<Box<dyn PartitionSearcher>> {
        let partition_by_sort_keys = self.partition_by_sort_keys()?;
        let ordered_partition_by_indices = self.ordered_partition_by_indices.clone();
        let input_schema = self.input().schema();
        Ok(match &self.input_order_mode {
            InputOrderMode::Sorted => {
                // In `Sorted` mode, all PARTITION BY columns should be ordered:
                if self.window_expr()[0].partition_by().len()
                    != ordered_partition_by_indices.len()
                {
                    return exec_err!("All partition by columns should have an ordering in Sorted mode.");
                }
                Box::new(SortedSearch {
                    partition_by_sort_keys,
                    ordered_partition_by_indices,
                    input_schema,
                })
            }
            InputOrderMode::Linear | InputOrderMode::PartiallySorted(_) => Box::new(
                LinearSearch::new(ordered_partition_by_indices, input_schema),
            ),
        })
    }

    /// Computes the [`PlanProperties`] used in query optimization.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        schema: &SchemaRef,
        window_exprs: &[Arc<dyn WindowExpr>],
    ) -> PlanProperties {
        // Calculate equivalence properties:
        let eq_properties = window_equivalence_properties(schema, input, window_exprs);

        // This operator preserves the partitioning of its input:
        let output_partitioning = input.output_partitioning().clone();

        // Construct properties cache:
        PlanProperties::new(
            eq_properties,
            output_partitioning,
            input.pipeline_behavior(),
            input.boundedness(),
        )
    }

    pub fn partition_keys(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        if !self.can_repartition {
            vec![]
        } else {
            let all_partition_keys = self
                .window_expr()
                .iter()
                .map(|expr| expr.partition_by().to_vec())
                .collect::<Vec<_>>();

            // Repartition on the smallest set of partition keys:
            all_partition_keys
                .into_iter()
                .min_by_key(|s| s.len())
                .unwrap_or_default()
        }
    }
}

impl DisplayAs for BoundedWindowAggExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "BoundedWindowAggExec: ")?;
                let g: Vec<String> = self
                    .window_expr
                    .iter()
                    .map(|e| {
                        format!(
                            "{}: {:?}, frame: {:?}",
                            e.name(),
                            e.field(),
                            e.get_window_frame()
                        )
                    })
                    .collect();
                let mode = &self.input_order_mode;
                write!(f, "wdw=[{}], mode=[{:?}]", g.join(", "), mode)?;
            }
        }
        Ok(())
    }
}

impl ExecutionPlan for BoundedWindowAggExec {
    fn name(&self) -> &'static str {
        "BoundedWindowAggExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
        let partition_bys = self.window_expr()[0].partition_by();
        let order_keys = self.window_expr()[0].order_by();
        let partition_bys = self
            .ordered_partition_by_indices
            .iter()
            .map(|idx| &partition_bys[*idx]);
        vec![calc_requirements(partition_bys, order_keys.iter())]
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        if self.partition_keys().is_empty() {
            debug!("No partition defined for BoundedWindowAggExec!!!");
            vec![Distribution::SinglePartition]
        } else {
            vec![Distribution::HashPartitioned(self.partition_keys())]
        }
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(BoundedWindowAggExec::try_new(
            self.window_expr.clone(),
            Arc::clone(&children[0]),
            self.input_order_mode.clone(),
            self.can_repartition,
        )?))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let input = self.input.execute(partition, context)?;
        let search_mode = self.get_search_algo()?;
        let stream = Box::pin(BoundedWindowAggStream::new(
            Arc::clone(&self.schema),
            self.window_expr.clone(),
            input,
            BaselineMetrics::new(&self.metrics, partition),
            search_mode,
        )?);
        Ok(stream)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        let input_stat = self.input.statistics()?;
        let win_cols = self.window_expr.len();
        let input_cols = self.input.schema().fields().len();
        // All input columns are passed through, plus one output column per
        // window expression (with unknown statistics):
        let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
        column_statistics.extend(input_stat.column_statistics);
        for _ in 0..win_cols {
            column_statistics.push(ColumnStatistics::new_unknown())
        }
        Ok(Statistics {
            num_rows: input_stat.num_rows,
            column_statistics,
            total_byte_size: Precision::Absent,
        })
    }
}

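/// Interface for the algorithm that determines which partitions are ready to
/// emit results. It has two implementations: [`SortedSearch`] exploits a
/// fully ordered input, while [`LinearSearch`] hashes PARTITION BY values to
/// group rows into partitions.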
trait PartitionSearcher: Send {
    /// Constructs output columns from the window expression results (one
    /// entry in the output vector per window expression). When producing
    /// output, the executor concatenates the corresponding section of
    /// `input_buffer` with the columns this method returns; `input_buffer`
    /// determines which sections of the results can be emitted.
    fn calculate_out_columns(
        &mut self,
        input_buffer: &RecordBatch,
        window_agg_states: &[PartitionWindowAggStates],
        partition_buffers: &mut PartitionBatches,
        window_expr: &[Arc<dyn WindowExpr>],
    ) -> Result<Option<Vec<ArrayRef>>>;

    /// Whether the search mode is `Linear` (i.e. no PARTITION BY column is
    /// ordered at the input).
    fn is_mode_linear(&self) -> bool {
        false
    }

    /// Partitions `record_batch` into per-partition batches according to the
    /// PARTITION BY columns.
    fn evaluate_partition_batches(
        &mut self,
        record_batch: &RecordBatch,
        window_expr: &[Arc<dyn WindowExpr>],
    ) -> Result<Vec<(PartitionKey, RecordBatch)>>;

    /// Prunes the internal state of the searcher after `_n_out` rows are emitted.
    fn prune(&mut self, _n_out: usize) {}

    /// Marks the partitions that will receive no more rows as ended.
    fn mark_partition_end(&self, partition_buffers: &mut PartitionBatches);

    /// Updates `input_buffer` and `partition_buffers` with the new `record_batch`.
    fn update_partition_batch(
        &mut self,
        input_buffer: &mut RecordBatch,
        record_batch: RecordBatch,
        window_expr: &[Arc<dyn WindowExpr>],
        partition_buffers: &mut PartitionBatches,
    ) -> Result<()> {
        if record_batch.num_rows() == 0 {
            return Ok(());
        }
        let partition_batches =
            self.evaluate_partition_batches(&record_batch, window_expr)?;
        for (partition_row, partition_batch) in partition_batches {
            let partition_batch_state = partition_buffers
                .entry(partition_row)
                .or_insert_with(|| {
                    PartitionBatchState::new(Arc::clone(self.input_schema()))
                });
            partition_batch_state.extend(&partition_batch)?;
        }

        if self.is_mode_linear() {
            // In `Linear` mode, store the most recent row so that window
            // frame calculations can progress even for partitions that
            // received no new rows in this batch:
            let last_row = get_last_row_batch(&record_batch)?;
            for (_, partition_batch) in partition_buffers.iter_mut() {
                partition_batch.set_most_recent_row(last_row.clone());
            }
        }
        self.mark_partition_end(partition_buffers);

        *input_buffer = if input_buffer.num_rows() == 0 {
            record_batch
        } else {
            concat_batches(self.input_schema(), [input_buffer, &record_batch])?
        };

        Ok(())
    }

    fn input_schema(&self) -> &SchemaRef;
}

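/// Determines partitions in `Linear` and `PartiallySorted` input modes by
/// hashing the PARTITION BY columns of each row (with equality checks to
/// handle hash collisions) and grouping rows into per-partition batches.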
pub struct LinearSearch {
    /// Hashes of the rows in `input_buffer`, computed from the PARTITION BY
    /// columns. Its length always equals the `input_buffer` row count.
    input_buffer_hashes: VecDeque<u64>,
    /// Random state used during hash value calculation.
    random_state: RandomState,
    /// Indices of the PARTITION BY columns that are ordered at the input. The
    /// input and PARTITION BY orderings may differ; e.g. for an input ordered
    /// by `a, b` and `PARTITION BY b, a`, this stores `[1, 0]`.
    ordered_partition_by_indices: Vec<usize>,
    /// Maps hash values to unique partition IDs within each new record batch.
    row_map_batch: HashTable<(u64, usize)>,
    /// Maps hash values to `(hash, partition ID, number of rows that are
    /// ready to be output)` when calculating output columns.
    row_map_out: HashTable<(u64, usize, usize)>,
    input_schema: SchemaRef,
}

impl PartitionSearcher for LinearSearch {
    /// Collects the output (the `out_col` field of `WindowAggState`) of each
    /// partition into the appropriate output rows, restoring the original
    /// input order by sorting on the buffered row indices.
    fn calculate_out_columns(
        &mut self,
        input_buffer: &RecordBatch,
        window_agg_states: &[PartitionWindowAggStates],
        partition_buffers: &mut PartitionBatches,
        window_expr: &[Arc<dyn WindowExpr>],
    ) -> Result<Option<Vec<ArrayRef>>> {
        let partition_output_indices = self.calc_partition_output_indices(
            input_buffer,
            window_agg_states,
            window_expr,
        )?;

        let n_window_col = window_agg_states.len();
        let mut new_columns = vec![vec![]; n_window_col];
        // `all_indices` can contain at most `input_buffer.num_rows()` entries:
        let mut all_indices = UInt32Builder::with_capacity(input_buffer.num_rows());
        for (row, indices) in partition_output_indices {
            let length = indices.len();
            for (idx, window_agg_state) in window_agg_states.iter().enumerate() {
                let partition = &window_agg_state[&row];
                let values = partition.state.out_col.slice(0, length);
                new_columns[idx].push(values);
            }
            let partition_batch_state = &mut partition_buffers[&row];
            // Store how many rows are output for this partition:
            partition_batch_state.n_out_row = length;
            // Store which index in the input buffer each output row maps to:
            all_indices.append_slice(&indices);
        }
        let all_indices = all_indices.finish();
        if all_indices.is_empty() {
            // We couldn't generate any new rows; return early:
            return Ok(None);
        }

        // Concatenate the per-partition results of each window expression
        // into a single column:
        let new_columns = new_columns
            .iter()
            .map(|items| {
                concat(&items.iter().map(|e| e.as_ref()).collect::<Vec<_>>())
                    .map_err(|e| arrow_datafusion_err!(e))
            })
            .collect::<Result<Vec<_>>>()?;
        // Reorder the output columns so that they follow the input row order:
        let sorted_indices = sort_to_indices(&all_indices, None, None)?;
        take_arrays(&new_columns, &sorted_indices, None)
            .map(Some)
            .map_err(|e| arrow_datafusion_err!(e))
    }

    fn evaluate_partition_batches(
        &mut self,
        record_batch: &RecordBatch,
        window_expr: &[Arc<dyn WindowExpr>],
    ) -> Result<Vec<(PartitionKey, RecordBatch)>> {
        let partition_bys =
            evaluate_partition_by_column_values(record_batch, window_expr)?;
        // Calculate the indices of each partition, then construct a new
        // record batch from the rows at those indices for every partition:
        self.get_per_partition_indices(&partition_bys, record_batch)?
            .into_iter()
            .map(|(row, indices)| {
                let mut new_indices = UInt32Builder::with_capacity(indices.len());
                new_indices.append_slice(&indices);
                let indices = new_indices.finish();
                Ok((row, take_record_batch(record_batch, &indices)?))
            })
            .collect()
    }

    fn prune(&mut self, n_out: usize) {
        // Delete the hashes of the rows that were output:
        self.input_buffer_hashes.drain(0..n_out);
    }

    fn mark_partition_end(&self, partition_buffers: &mut PartitionBatches) {
        // We need to be in the `PartiallySorted` case; otherwise, we cannot
        // tell when a partition ends:
        if !self.ordered_partition_by_indices.is_empty() {
            if let Some((last_row, _)) = partition_buffers.last() {
                let last_sorted_cols = self
                    .ordered_partition_by_indices
                    .iter()
                    .map(|idx| last_row[*idx].clone())
                    .collect::<Vec<_>>();
                for (row, partition_batch_state) in partition_buffers.iter_mut() {
                    let sorted_cols = self
                        .ordered_partition_by_indices
                        .iter()
                        .map(|idx| &row[*idx]);
                    // Since the input is sorted on (a prefix of) the PARTITION
                    // BY columns, any partition whose sort keys differ from
                    // those of the last received row cannot receive new rows:
                    partition_batch_state.is_end = !sorted_cols.eq(&last_sorted_cols);
                }
            }
        }
    }

    fn is_mode_linear(&self) -> bool {
        self.ordered_partition_by_indices.is_empty()
    }

    fn input_schema(&self) -> &SchemaRef {
        &self.input_schema
    }
}

impl LinearSearch {
    /// Initializes a new [`LinearSearch`] partition searcher.
    fn new(ordered_partition_by_indices: Vec<usize>, input_schema: SchemaRef) -> Self {
        LinearSearch {
            input_buffer_hashes: VecDeque::new(),
            random_state: Default::default(),
            ordered_partition_by_indices,
            row_map_batch: HashTable::with_capacity(256),
            row_map_out: HashTable::with_capacity(256),
            input_schema,
        }
    }

    /// Calculates the indices at which each partition (`PartitionKey`)
    /// occurs in `batch`.
    fn get_per_partition_indices(
        &mut self,
        columns: &[ArrayRef],
        batch: &RecordBatch,
    ) -> Result<Vec<(PartitionKey, Vec<u32>)>> {
        let mut batch_hashes = vec![0; batch.num_rows()];
        create_hashes(columns, &self.random_state, &mut batch_hashes)?;
        self.input_buffer_hashes.extend(&batch_hashes);
        // Reset the row map for the new batch:
        self.row_map_batch.clear();
        // Entries store a `PartitionKey` and the indices at which that
        // partition occurs in `batch`:
        let mut result: Vec<(PartitionKey, Vec<u32>)> = vec![];
        for (hash, row_idx) in batch_hashes.into_iter().zip(0u32..) {
            let entry = self.row_map_batch.find_mut(hash, |(_, group_idx)| {
                // Handle hash collisions with an equality check on the
                // underlying rows:
                let row = get_row_at_idx(columns, row_idx as usize).unwrap();
                row.eq(&result[*group_idx].0)
            });
            if let Some((_, group_idx)) = entry {
                result[*group_idx].1.push(row_idx)
            } else {
                self.row_map_batch.insert_unique(
                    hash,
                    (hash, result.len()),
                    |(hash, _)| *hash,
                );
                let row = get_row_at_idx(columns, row_idx as usize)?;
                // This is a new partition; `row_idx` is its only index so far:
                result.push((row, vec![row_idx]));
            }
        }
        Ok(result)
    }

    /// Calculates the output indices of each partition, i.e. the indices of
    /// the `input_buffer` rows for which results can be emitted in this cycle.
    fn calc_partition_output_indices(
        &mut self,
        input_buffer: &RecordBatch,
        window_agg_states: &[PartitionWindowAggStates],
        window_expr: &[Arc<dyn WindowExpr>],
    ) -> Result<Vec<(PartitionKey, Vec<u32>)>> {
        let partition_by_columns =
            evaluate_partition_by_column_values(input_buffer, window_expr)?;
        // Reset the row map state:
        self.row_map_out.clear();
        let mut partition_indices: Vec<(PartitionKey, Vec<u32>)> = vec![];
        for (hash, row_idx) in self.input_buffer_hashes.iter().zip(0u32..) {
            let entry = self.row_map_out.find_mut(*hash, |(_, group_idx, _)| {
                let row =
                    get_row_at_idx(&partition_by_columns, row_idx as usize).unwrap();
                row == partition_indices[*group_idx].0
            });
            if let Some((_, group_idx, n_out)) = entry {
                let (_, indices) = &mut partition_indices[*group_idx];
                if indices.len() >= *n_out {
                    break;
                }
                indices.push(row_idx);
            } else {
                let row = get_row_at_idx(&partition_by_columns, row_idx as usize)?;
                // A partition can emit only as many rows as the window
                // expression that produced the fewest results for it:
                let min_out = window_agg_states
                    .iter()
                    .map(|window_agg_state| {
                        window_agg_state
                            .get(&row)
                            .map(|partition| partition.state.out_col.len())
                            .unwrap_or(0)
                    })
                    .min()
                    .unwrap_or(0);
                if min_out == 0 {
                    break;
                }
                self.row_map_out.insert_unique(
                    *hash,
                    (*hash, partition_indices.len(), min_out),
                    |(hash, _, _)| *hash,
                );
                partition_indices.push((row, vec![row_idx]));
            }
        }
        Ok(partition_indices)
    }
}

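/// Determines partitions in `Sorted` input mode, where the input is ordered
/// by all PARTITION BY columns; partition boundaries are found directly from
/// ranges of consecutive rows with equal partition values.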
pub struct SortedSearch {
    /// Stores the PARTITION BY columns and their ordering information
    partition_by_sort_keys: LexOrdering,
    /// Indices of the PARTITION BY columns that are ordered at the input. The
    /// input and PARTITION BY orderings may differ; e.g. for an input ordered
    /// by `a, b` and `PARTITION BY b, a`, this stores `[1, 0]`.
    ordered_partition_by_indices: Vec<usize>,
    input_schema: SchemaRef,
}

impl PartitionSearcher for SortedSearch {
    /// Outputs columns according to the sorted partition outputs.
    fn calculate_out_columns(
        &mut self,
        _input_buffer: &RecordBatch,
        window_agg_states: &[PartitionWindowAggStates],
        partition_buffers: &mut PartitionBatches,
        _window_expr: &[Arc<dyn WindowExpr>],
    ) -> Result<Option<Vec<ArrayRef>>> {
        let n_out = self.calculate_n_out_row(window_agg_states, partition_buffers);
        if n_out == 0 {
            Ok(None)
        } else {
            window_agg_states
                .iter()
                .map(|map| get_aggregate_result_out_column(map, n_out).map(Some))
                .collect()
        }
    }

    fn evaluate_partition_batches(
        &mut self,
        record_batch: &RecordBatch,
        _window_expr: &[Arc<dyn WindowExpr>],
    ) -> Result<Vec<(PartitionKey, RecordBatch)>> {
        let num_rows = record_batch.num_rows();
        // Evaluate the PARTITION BY columns of the input record batch:
        let partition_columns = self
            .partition_by_sort_keys
            .iter()
            .map(|elem| elem.evaluate_to_sort_column(record_batch))
            .collect::<Result<Vec<_>>>()?;
        // Reorder the partition columns to match the input ordering:
        let partition_columns_ordered =
            get_at_indices(&partition_columns, &self.ordered_partition_by_indices)?;
        let partition_points =
            evaluate_partition_ranges(num_rows, &partition_columns_ordered)?;
        let partition_bys = partition_columns
            .into_iter()
            .map(|arr| arr.values)
            .collect::<Vec<ArrayRef>>();

        partition_points
            .iter()
            .map(|range| {
                let row = get_row_at_idx(&partition_bys, range.start)?;
                let len = range.end - range.start;
                let slice = record_batch.slice(range.start, len);
                Ok((row, slice))
            })
            .collect::<Result<Vec<_>>>()
    }

    fn mark_partition_end(&self, partition_buffers: &mut PartitionBatches) {
        // In the sorted case, once we see a new partition, the previous ones
        // are guaranteed to receive no more rows; hence we can mark every
        // partition except the last one as ended:
        let n_partitions = partition_buffers.len();
        for (idx, (_, partition_batch_state)) in partition_buffers.iter_mut().enumerate()
        {
            partition_batch_state.is_end |= idx < n_partitions - 1;
        }
    }

    fn input_schema(&self) -> &SchemaRef {
        &self.input_schema
    }
}

impl SortedSearch {
    /// Calculates how many rows we can output in this cycle.
    fn calculate_n_out_row(
        &mut self,
        window_agg_states: &[PartitionWindowAggStates],
        partition_buffers: &mut PartitionBatches,
    ) -> usize {
        // Different window expressions may produce results at different
        // rates; we produce the overall result only as fast as the slowest:
        let mut counts = vec![];
        let out_col_counts = window_agg_states.iter().map(|window_agg_state| {
            // Store how many elements the current window expression generated:
            let mut cur_window_expr_out_result_len = 0;
            // `window_agg_state` is an `IndexMap`, so iteration follows the
            // insertion order, which preserves the partition ordering:
            let mut per_partition_out_results = HashMap::new();
            for (row, WindowState { state, .. }) in window_agg_state.iter() {
                cur_window_expr_out_result_len += state.out_col.len();
                let count = per_partition_out_results.entry(row).or_insert(0);
                if *count < state.out_col.len() {
                    *count = state.out_col.len();
                }
                // If we have not generated all results for the current
                // partition, stop: generating results for subsequent
                // partitions would break the input ordering.
                if state.n_row_result_missing > 0 {
                    break;
                }
            }
            counts.push(per_partition_out_results);
            cur_window_expr_out_result_len
        });
        argmin(out_col_counts).map_or(0, |(min_idx, minima)| {
            for (row, count) in counts.swap_remove(min_idx).into_iter() {
                let partition_batch = &mut partition_buffers[row];
                partition_batch.n_out_row = count;
            }
            minima
        })
    }
}

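/// Evaluates the PARTITION BY expressions of the first window expression on
/// `record_batch`, broadcasting any scalar results to full-length arrays.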
fn evaluate_partition_by_column_values(
    record_batch: &RecordBatch,
    window_expr: &[Arc<dyn WindowExpr>],
) -> Result<Vec<ArrayRef>> {
    window_expr[0]
        .partition_by()
        .iter()
        .map(|item| match item.evaluate(record_batch)? {
            ColumnarValue::Array(array) => Ok(array),
            ColumnarValue::Scalar(scalar) => {
                scalar.to_array_of_size(record_batch.num_rows())
            }
        })
        .collect()
}

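/// Stream that produces the output of a [`BoundedWindowAggExec`]: it buffers
/// input rows per partition, evaluates window expressions incrementally, and
/// prunes buffered state that is no longer needed for future results.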
pub struct BoundedWindowAggStream {
    schema: SchemaRef,
    input: SendableRecordBatchStream,
    /// The currently buffered input rows (the columns needed while
    /// calculating aggregation results).
    input_buffer: RecordBatch,
    /// Buffered record batches for each partition
    partition_buffers: PartitionBatches,
    /// Running state of every window expression, keyed by partition
    window_agg_states: Vec<PartitionWindowAggStates>,
    finished: bool,
    window_expr: Vec<Arc<dyn WindowExpr>>,
    baseline_metrics: BaselineMetrics,
    search_mode: Box<dyn PartitionSearcher>,
}

impl BoundedWindowAggStream {
    /// Prunes sections of the state that are no longer needed when
    /// calculating results (as determined by window frame boundaries and
    /// the number of results generated).
    fn prune_state(&mut self, n_out: usize) -> Result<()> {
        // Prune `self.window_agg_states`:
        self.prune_out_columns();
        // Prune `self.partition_buffers`:
        self.prune_partition_batches();
        // Prune `self.input_buffer`:
        self.prune_input_batch(n_out)?;
        // Prune the internal state of the search algorithm:
        self.search_mode.prune(n_out);
        Ok(())
    }
}

impl Stream for BoundedWindowAggStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let poll = self.poll_next_inner(cx);
        self.baseline_metrics.record_poll(poll)
    }
}

impl BoundedWindowAggStream {
    /// Creates a new BoundedWindowAggStream
    fn new(
        schema: SchemaRef,
        window_expr: Vec<Arc<dyn WindowExpr>>,
        input: SendableRecordBatchStream,
        baseline_metrics: BaselineMetrics,
        search_mode: Box<dyn PartitionSearcher>,
    ) -> Result<Self> {
        let state = window_expr.iter().map(|_| IndexMap::new()).collect();
        let empty_batch = RecordBatch::new_empty(Arc::clone(&schema));
        Ok(Self {
            schema,
            input,
            input_buffer: empty_batch,
            partition_buffers: IndexMap::new(),
            window_agg_states: state,
            finished: false,
            window_expr,
            baseline_metrics,
            search_mode,
        })
    }

    fn compute_aggregates(&mut self) -> Result<Option<RecordBatch>> {
        // Calculate the window columns:
        for (cur_window_expr, state) in
            self.window_expr.iter().zip(&mut self.window_agg_states)
        {
            cur_window_expr.evaluate_stateful(&self.partition_buffers, state)?;
        }

        let schema = Arc::clone(&self.schema);
        let window_expr_out = self.search_mode.calculate_out_columns(
            &self.input_buffer,
            &self.window_agg_states,
            &mut self.partition_buffers,
            &self.window_expr,
        )?;
        if let Some(window_expr_out) = window_expr_out {
            let n_out = window_expr_out[0].len();
            // Output the first `n_out` rows of the input buffer, together
            // with the window expression results:
            let columns_to_show = self
                .input_buffer
                .columns()
                .iter()
                .map(|elem| elem.slice(0, n_out))
                .chain(window_expr_out)
                .collect::<Vec<_>>();
            let n_generated = columns_to_show[0].len();
            self.prune_state(n_generated)?;
            Ok(Some(RecordBatch::try_new(schema, columns_to_show)?))
        } else {
            Ok(None)
        }
    }

    #[inline]
    fn poll_next_inner(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        if self.finished {
            return Poll::Ready(None);
        }

        let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
        match ready!(self.input.poll_next_unpin(cx)) {
            Some(Ok(batch)) => {
                // Start the timer (it stops when `_timer` is dropped):
                let _timer = elapsed_compute.timer();

                self.search_mode.update_partition_batch(
                    &mut self.input_buffer,
                    batch,
                    &self.window_expr,
                    &mut self.partition_buffers,
                )?;
                if let Some(batch) = self.compute_aggregates()? {
                    return Poll::Ready(Some(Ok(batch)));
                }
                self.poll_next_inner(cx)
            }
            Some(Err(e)) => Poll::Ready(Some(Err(e))),
            None => {
                let _timer = elapsed_compute.timer();

                self.finished = true;
                // The input is exhausted; mark all partitions as ended so
                // that all remaining results can be calculated and emitted:
                for (_, partition_batch_state) in self.partition_buffers.iter_mut() {
                    partition_batch_state.is_end = true;
                }
                if let Some(batch) = self.compute_aggregates()? {
                    return Poll::Ready(Some(Ok(batch)));
                }
                Poll::Ready(None)
            }
        }
    }

    /// Prunes the sections of `partition_buffers` that are no longer needed
    /// to calculate window expression results.
    fn prune_partition_batches(&mut self) {
        // Remove partitions that have already emitted all of their results:
        self.partition_buffers
            .retain(|_, partition_batch_state| !partition_batch_state.is_end);

        // For each partition, determine how many leading rows are no longer
        // needed by any window expression (rows that precede all window frame
        // boundaries and have already produced their results); only the
        // minimum of these counts across all window expressions can be pruned:
        let mut n_prune_each_partition = HashMap::new();
        for window_agg_state in self.window_agg_states.iter_mut() {
            window_agg_state.retain(|_, WindowState { state, .. }| !state.is_end);
            for (partition_row, WindowState { state: value, .. }) in window_agg_state {
                let n_prune =
                    min(value.window_frame_range.start, value.last_calculated_index);
                if let Some(current) = n_prune_each_partition.get_mut(partition_row) {
                    if n_prune < *current {
                        *current = n_prune;
                    }
                } else {
                    n_prune_each_partition.insert(partition_row.clone(), n_prune);
                }
            }
        }

        // Retract the rows that are no longer needed from each partition batch:
        for (partition_row, n_prune) in n_prune_each_partition.iter() {
            let pb_state = &mut self.partition_buffers[partition_row];

            let batch = &pb_state.record_batch;
            pb_state.record_batch = batch.slice(*n_prune, batch.num_rows() - n_prune);
            pb_state.n_out_row = 0;

            // Update the state indices, since `n_prune` rows are pruned:
            for window_agg_state in self.window_agg_states.iter_mut() {
                window_agg_state[partition_row].state.prune_state(*n_prune);
            }
        }
    }

    /// Prunes the section of the input batch whose results were already emitted.
    fn prune_input_batch(&mut self, n_out: usize) -> Result<()> {
        // Prune the first `n_out` rows from the buffer:
        let n_to_keep = self.input_buffer.num_rows() - n_out;
        let batch_to_keep = self
            .input_buffer
            .columns()
            .iter()
            .map(|elem| elem.slice(n_out, n_to_keep))
            .collect::<Vec<_>>();
        self.input_buffer = RecordBatch::try_new_with_options(
            self.input_buffer.schema(),
            batch_to_keep,
            &RecordBatchOptions::new().with_row_count(Some(n_to_keep)),
        )?;
        Ok(())
    }

    /// Prunes the emitted sections from the per-partition output columns.
    fn prune_out_columns(&mut self) {
        for partition_window_agg_states in self.window_agg_states.iter_mut() {
            // Remove the states of ended partitions:
            partition_window_agg_states
                .retain(|_, partition_batch_state| !partition_batch_state.state.is_end);
            for (
                partition_key,
                WindowState {
                    state: WindowAggState { out_col, .. },
                    ..
                },
            ) in partition_window_agg_states
            {
                let partition_batch = &mut self.partition_buffers[partition_key];
                let n_to_del = partition_batch.n_out_row;
                let n_to_keep = out_col.len() - n_to_del;
                *out_col = out_col.slice(n_to_del, n_to_keep);
            }
        }
    }
}

impl RecordBatchStream for BoundedWindowAggStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

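/// Finds the index and value of the minimum element of the given iterator
/// (incomparable values compare as equal). As a small example,
/// `argmin([3, 1, 2].into_iter())` evaluates to `Some((1, 1))`.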
fn argmin<T: PartialOrd>(data: impl Iterator<Item = T>) -> Option<(usize, T)> {
    data.enumerate()
        .min_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap_or(Ordering::Equal))
}

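/// Constructs one output column by concatenating, in partition insertion
/// order, the first `len_to_show` generated rows across all partitions.
/// Returns an error if fewer than `len_to_show` rows were generated.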
fn get_aggregate_result_out_column(
    partition_window_agg_states: &PartitionWindowAggStates,
    len_to_show: usize,
) -> Result<ArrayRef> {
    let mut result = None;
    let mut running_length = 0;
    // Iteration of the `IndexMap` follows the insertion (partition) order:
    for (
        _,
        WindowState {
            state: WindowAggState { out_col, .. },
            ..
        },
    ) in partition_window_agg_states
    {
        if running_length < len_to_show {
            let n_to_use = min(len_to_show - running_length, out_col.len());
            let slice_to_use = out_col.slice(0, n_to_use);
            result = Some(match result {
                Some(arr) => concat(&[&arr, &slice_to_use])?,
                None => slice_to_use,
            });
            running_length += n_to_use;
        } else {
            break;
        }
    }
    if running_length != len_to_show {
        return exec_err!(
            "Generated row number should be {len_to_show}, it is {running_length}"
        );
    }
    result
        .ok_or_else(|| DataFusionError::Execution("Should contain something".to_string()))
}

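/// Constructs a one-row [`RecordBatch`] containing the last row of `batch`.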
pub(crate) fn get_last_row_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    if batch.num_rows() == 0 {
        return exec_err!("Latest batch should have at least 1 row");
    }
    Ok(batch.slice(batch.num_rows() - 1, 1))
}

#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use std::sync::Arc;
    use std::task::{Context, Poll};
    use std::time::Duration;

    use crate::common::collect;
    use crate::expressions::PhysicalSortExpr;
    use crate::projection::ProjectionExec;
    use crate::streaming::{PartitionStream, StreamingTableExec};
    use crate::test::TestMemoryExec;
    use crate::windows::{
        create_udwf_window_expr, create_window_expr, BoundedWindowAggExec, InputOrderMode,
    };
    use crate::{execute_stream, get_plan_string, ExecutionPlan};

    use arrow::array::{
        builder::{Int64Builder, UInt64Builder},
        RecordBatch,
    };
    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion_common::{
        assert_batches_eq, exec_datafusion_err, Result, ScalarValue,
    };
    use datafusion_execution::config::SessionConfig;
    use datafusion_execution::{
        RecordBatchStream, SendableRecordBatchStream, TaskContext,
    };
    use datafusion_expr::{
        WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
    };
    use datafusion_functions_aggregate::count::count_udaf;
    use datafusion_functions_window::nth_value::last_value_udwf;
    use datafusion_functions_window::nth_value::nth_value_udwf;
    use datafusion_physical_expr::expressions::{col, Column, Literal};
    use datafusion_physical_expr::window::StandardWindowExpr;
    use datafusion_physical_expr::{LexOrdering, PhysicalExpr};

    use futures::future::Shared;
    use futures::{pin_mut, ready, FutureExt, Stream, StreamExt};
    use itertools::Itertools;
    use tokio::time::timeout;

    #[derive(Debug, Clone)]
    struct TestStreamPartition {
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
        idx: usize,
        state: PollingState,
        sleep_duration: Duration,
        send_exit: bool,
    }

    impl PartitionStream for TestStreamPartition {
        fn schema(&self) -> &SchemaRef {
            &self.schema
        }

        fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
            // Return a stream that replays this partition's batches with
            // simulated delays between them:
            Box::pin(self.clone())
        }
    }

    impl Stream for TestStreamPartition {
        type Item = Result<RecordBatch>;

        fn poll_next(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            self.poll_next_inner(cx)
        }
    }

    #[derive(Debug, Clone)]
    enum PollingState {
        Sleep(Shared<futures::future::BoxFuture<'static, ()>>),
        BatchReturn,
    }

    impl TestStreamPartition {
        fn poll_next_inner(
            self: &mut Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Result<RecordBatch>>> {
            loop {
                match &mut self.state {
                    PollingState::BatchReturn => {
                        // Wait for the sleep duration to simulate a streaming source:
                        let f = tokio::time::sleep(self.sleep_duration).boxed().shared();
                        self.state = PollingState::Sleep(f);
                        let input_batch = if let Some(batch) =
                            self.batches.get(self.idx).cloned()
                        {
                            batch
                        } else if self.send_exit {
                            // Send None to signal that the stream has finished:
                            return Poll::Ready(None);
                        } else {
                            // Go back to sleep to mimic an input that never ends:
                            let f =
                                tokio::time::sleep(self.sleep_duration).boxed().shared();
                            self.state = PollingState::Sleep(f);
                            continue;
                        };
                        self.idx += 1;
                        return Poll::Ready(Some(Ok(input_batch)));
                    }
                    PollingState::Sleep(future) => {
                        pin_mut!(future);
                        ready!(future.poll_unpin(cx));
                        self.state = PollingState::BatchReturn;
                    }
                }
            }
        }
    }

    impl RecordBatchStream for TestStreamPartition {
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }
    }

    fn bounded_window_exec_pb_latent_range(
        input: Arc<dyn ExecutionPlan>,
        n_future_range: usize,
        hash: &str,
        order_by: &str,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let schema = input.schema();
        let window_fn = WindowFunctionDefinition::AggregateUDF(count_udaf());
        let col_expr =
            Arc::new(Column::new(schema.fields[0].name(), 0)) as Arc<dyn PhysicalExpr>;
        let args = vec![col_expr];
        let partitionby_exprs = vec![col(hash, &schema)?];
        let orderby_exprs = LexOrdering::new(vec![PhysicalSortExpr {
            expr: col(order_by, &schema)?,
            options: SortOptions::default(),
        }]);
        let window_frame = WindowFrame::new_bounds(
            WindowFrameUnits::Range,
            WindowFrameBound::CurrentRow,
            WindowFrameBound::Following(ScalarValue::UInt64(Some(n_future_range as u64))),
        );
        let fn_name = format!(
            "{}({:?}) PARTITION BY: [{:?}], ORDER BY: [{:?}]",
            window_fn, args, partitionby_exprs, orderby_exprs
        );
        let input_order_mode = InputOrderMode::Linear;
        Ok(Arc::new(BoundedWindowAggExec::try_new(
            vec![create_window_expr(
                &window_fn,
                fn_name,
                &args,
                &partitionby_exprs,
                orderby_exprs.as_ref(),
                Arc::new(window_frame),
                &input.schema(),
                false,
            )?],
            input,
            input_order_mode,
            true,
        )?))
    }

    fn projection_exec(input: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
        let schema = input.schema();
        let exprs = input
            .schema()
            .fields
            .iter()
            .enumerate()
            .map(|(idx, field)| {
                let name = if field.name().len() > 20 {
                    format!("col_{idx}")
                } else {
                    field.name().clone()
                };
                let expr = col(field.name(), &schema).unwrap();
                (expr, name)
            })
            .collect::<Vec<_>>();
        Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
    }

    fn task_context_helper() -> TaskContext {
        let task_ctx = TaskContext::default();
        // Create a session configuration with single-row batches:
        let session_config = SessionConfig::new()
            .with_batch_size(1)
            .with_target_partitions(2)
            .with_round_robin_repartition(false);
        task_ctx.with_session_config(session_config)
    }

    fn task_context() -> Arc<TaskContext> {
        Arc::new(task_context_helper())
    }

    pub async fn collect_stream(
        mut stream: SendableRecordBatchStream,
        results: &mut Vec<RecordBatch>,
    ) -> Result<()> {
        while let Some(item) = stream.next().await {
            results.push(item?);
        }
        Ok(())
    }

    /// Executes the given plan and collects the results produced until the
    /// timeout elapses. Since the sources used in these tests never finish,
    /// completing before the timeout is treated as an error.
    pub async fn collect_with_timeout(
        plan: Arc<dyn ExecutionPlan>,
        context: Arc<TaskContext>,
        timeout_duration: Duration,
    ) -> Result<Vec<RecordBatch>> {
        let stream = execute_stream(plan, context)?;
        let mut results = vec![];

        // Collect results until the timeout expires:
        if timeout(timeout_duration, collect_stream(stream, &mut results))
            .await
            .is_ok()
        {
            return Err(exec_datafusion_err!("shouldn't have completed"));
        };

        Ok(results)
    }

    /// Executes the given plan and collects all of its results.
    #[allow(dead_code)]
    pub async fn collect_bonafide(
        plan: Arc<dyn ExecutionPlan>,
        context: Arc<TaskContext>,
    ) -> Result<Vec<RecordBatch>> {
        let stream = execute_stream(plan, context)?;
        let mut results = vec![];

        collect_stream(stream, &mut results).await?;

        Ok(results)
    }

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("sn", DataType::UInt64, true),
            Field::new("hash", DataType::Int64, true),
        ]))
    }

    fn schema_orders(schema: &SchemaRef) -> Result<Vec<LexOrdering>> {
        let orderings = vec![LexOrdering::new(vec![PhysicalSortExpr {
            expr: col("sn", schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: false,
            },
        }])];
        Ok(orderings)
    }

    /// Returns true when `lhs` is divisible by `rhs` (i.e. integer division
    /// loses no information).
    fn is_integer_division_safe(lhs: usize, rhs: usize) -> bool {
        let res = lhs / rhs;
        res * rhs == lhs
    }
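
    /// Generates `n_row / n_chunk` record batches with two columns: a
    /// monotonically increasing `sn` column, and a `hash` column that changes
    /// once for every `hash_replicate` (4) consecutive `sn` values.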
    fn generate_batches(
        schema: &SchemaRef,
        n_row: usize,
        n_chunk: usize,
    ) -> Result<Vec<RecordBatch>> {
        let mut batches = vec![];
        assert!(n_row > 0);
        assert!(n_chunk > 0);
        assert!(is_integer_division_safe(n_row, n_chunk));
        let hash_replicate = 4;

        let chunks = (0..n_row)
            .chunks(n_chunk)
            .into_iter()
            .map(|elem| elem.into_iter().collect::<Vec<_>>())
            .collect::<Vec<_>>();

        for sn_values in chunks {
            let mut sn1_array = UInt64Builder::with_capacity(sn_values.len());
            let mut hash_array = Int64Builder::with_capacity(sn_values.len());

            for sn in sn_values {
                sn1_array.append_value(sn as u64);
                let hash_value = (2 - (sn / hash_replicate)) as i64;
                hash_array.append_value(hash_value);
            }

            let batch = RecordBatch::try_new(
                Arc::clone(schema),
                vec![Arc::new(sn1_array.finish()), Arc::new(hash_array.finish())],
            )?;
            batches.push(batch);
        }
        Ok(batches)
    }

    fn generate_never_ending_source(
        n_rows: usize,
        chunk_length: usize,
        n_partition: usize,
        is_infinite: bool,
        send_exit: bool,
        per_batch_wait_duration_in_millis: u64,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        assert!(n_partition > 0);

        // Use a schema of the form (sn: UInt64, hash: Int64), where the
        // source is ordered by the `sn` (sequence number) column:
        let schema = test_schema();
        let orderings = schema_orders(&schema)?;

        // The source waits for this duration before producing each batch:
        let per_batch_wait_duration =
            Duration::from_millis(per_batch_wait_duration_in_millis);

        let batches = generate_batches(&schema, n_rows, chunk_length)?;

        // The source has `n_partition` identical partitions:
        let partitions = vec![
            Arc::new(TestStreamPartition {
                schema: Arc::clone(&schema),
                batches,
                idx: 0,
                state: PollingState::BatchReturn,
                sleep_duration: per_batch_wait_duration,
                send_exit,
            }) as _;
            n_partition
        ];
        let source = Arc::new(StreamingTableExec::try_new(
            Arc::clone(&schema),
            partitions,
            None,
            orderings,
            is_infinite,
            None,
        )?) as _;
        Ok(source)
    }

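    // This test checks that a `BoundedWindowAggExec` in `Sorted` mode
    // produces correct results when window functions whose reversed variants
    // are used (such as NTH_VALUE with negative indices under UNBOUNDED
    // PRECEDING frames) memoize per-partition state across incoming batches.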
    #[tokio::test]
    async fn test_window_nth_value_bounded_memoize() -> Result<()> {
        let config = SessionConfig::new().with_target_partitions(1);
        let task_ctx = Arc::new(TaskContext::default().with_session_config(config));

        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3]))],
        )?;

        let memory_exec = TestMemoryExec::try_new_exec(
            &[vec![batch.clone(), batch.clone(), batch.clone()]],
            Arc::clone(&schema),
            None,
        )?;
        let col_a = col("a", &schema)?;
        let nth_value_func1 = create_udwf_window_expr(
            &nth_value_udwf(),
            &[
                Arc::clone(&col_a),
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
            ],
            &schema,
            "nth_value(-1)".to_string(),
            false,
        )?
        .reverse_expr()
        .unwrap();
        let nth_value_func2 = create_udwf_window_expr(
            &nth_value_udwf(),
            &[
                Arc::clone(&col_a),
                Arc::new(Literal::new(ScalarValue::Int32(Some(2)))),
            ],
            &schema,
            "nth_value(-2)".to_string(),
            false,
        )?
        .reverse_expr()
        .unwrap();

        let last_value_func = create_udwf_window_expr(
            &last_value_udwf(),
            &[Arc::clone(&col_a)],
            &schema,
            "last".to_string(),
            false,
        )?;

        let window_exprs = vec![
            // LAST_VALUE(a):
            Arc::new(StandardWindowExpr::new(
                last_value_func,
                &[],
                &LexOrdering::default(),
                Arc::new(WindowFrame::new_bounds(
                    WindowFrameUnits::Rows,
                    WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
                    WindowFrameBound::CurrentRow,
                )),
            )) as _,
            // NTH_VALUE(a, -1):
            Arc::new(StandardWindowExpr::new(
                nth_value_func1,
                &[],
                &LexOrdering::default(),
                Arc::new(WindowFrame::new_bounds(
                    WindowFrameUnits::Rows,
                    WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
                    WindowFrameBound::CurrentRow,
                )),
            )) as _,
            // NTH_VALUE(a, -2):
            Arc::new(StandardWindowExpr::new(
                nth_value_func2,
                &[],
                &LexOrdering::default(),
                Arc::new(WindowFrame::new_bounds(
                    WindowFrameUnits::Rows,
                    WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
                    WindowFrameBound::CurrentRow,
                )),
            )) as _,
        ];
        let physical_plan = BoundedWindowAggExec::try_new(
            window_exprs,
            memory_exec,
            InputOrderMode::Sorted,
            true,
        )
        .map(|e| Arc::new(e) as Arc<dyn ExecutionPlan>)?;

        let batches = collect(physical_plan.execute(0, task_ctx)?).await?;

        let expected = vec![
            "BoundedWindowAggExec: wdw=[last: Ok(Field { name: \"last\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: true }, nth_value(-1): Ok(Field { name: \"nth_value(-1)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: true }, nth_value(-2): Ok(Field { name: \"nth_value(-2)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: true }], mode=[Sorted]",
            "  DataSourceExec: partitions=1, partition_sizes=[3]",
        ];
        // Get the string representation of the plan:
        let actual = get_plan_string(&physical_plan);
        assert_eq!(
            expected, actual,
            "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
        );

        let expected = [
            "+---+------+---------------+---------------+",
            "| a | last | nth_value(-1) | nth_value(-2) |",
            "+---+------+---------------+---------------+",
            "| 1 | 1    | 1             |               |",
            "| 2 | 2    | 2             | 1             |",
            "| 3 | 3    | 3             | 2             |",
            "| 1 | 1    | 1             | 3             |",
            "| 2 | 2    | 2             | 1             |",
            "| 3 | 3    | 3             | 2             |",
            "| 1 | 1    | 1             | 3             |",
            "| 2 | 2    | 2             | 1             |",
            "| 3 | 3    | 3             | 2             |",
            "+---+------+---------------+---------------+",
        ];
        assert_batches_eq!(expected, &batches);
        Ok(())
    }

    #[tokio::test]
    async fn bounded_window_exec_linear_mode_range_information() -> Result<()> {
        let n_rows = 10;
        let chunk_length = 2;
        let n_future_range = 1;

        let timeout_duration = Duration::from_millis(2000);

        let source =
            generate_never_ending_source(n_rows, chunk_length, 1, true, false, 5)?;

        let window =
            bounded_window_exec_pb_latent_range(source, n_future_range, "hash", "sn")?;

        let plan = projection_exec(window)?;

        let expected_plan = vec![
            "ProjectionExec: expr=[sn@0 as sn, hash@1 as hash, count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [LexOrdering { inner: [PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }] }]@2 as col_2]",
            "  BoundedWindowAggExec: wdw=[count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [LexOrdering { inner: [PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }] }]: Ok(Field { name: \"count([Column { name: \\\"sn\\\", index: 0 }]) PARTITION BY: [[Column { name: \\\"hash\\\", index: 1 }]], ORDER BY: [LexOrdering { inner: [PhysicalSortExpr { expr: Column { name: \\\"sn\\\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }] }]\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt64(1)), is_causal: false }], mode=[Linear]",
            "    StreamingTableExec: partition_sizes=1, projection=[sn, hash], infinite_source=true, output_ordering=[sn@0 ASC NULLS LAST]",
        ];

        // Get the string representation of the plan:
        let actual = get_plan_string(&plan);
        assert_eq!(
            expected_plan, actual,
            "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_plan:#?}\nactual:\n\n{actual:#?}\n\n"
        );

        let task_ctx = task_context();
        let batches = collect_with_timeout(plan, task_ctx, timeout_duration).await?;

        let expected = [
            "+----+------+-------+",
            "| sn | hash | col_2 |",
            "+----+------+-------+",
            "| 0  | 2    | 2     |",
            "| 1  | 2    | 2     |",
            "| 2  | 2    | 2     |",
            "| 3  | 2    | 1     |",
            "| 4  | 1    | 2     |",
            "| 5  | 1    | 2     |",
            "| 6  | 1    | 2     |",
            "| 7  | 1    | 1     |",
            "+----+------+-------+",
        ];
        assert_batches_eq!(expected, &batches);

        Ok(())
    }
}