datafusion_physical_plan/windows/
window_agg_exec.rs1use std::any::Any;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use super::utils::create_schema;
26use crate::execution_plan::EmissionType;
27use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
28use crate::windows::{
29 calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs,
30 window_equivalence_properties,
31};
32use crate::{
33 ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
34 ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream,
35 SendableRecordBatchStream, Statistics, WindowExpr,
36};
37
38use arrow::array::ArrayRef;
39use arrow::compute::{concat, concat_batches};
40use arrow::datatypes::SchemaRef;
41use arrow::error::ArrowError;
42use arrow::record_batch::RecordBatch;
43use datafusion_common::stats::Precision;
44use datafusion_common::utils::{evaluate_partition_ranges, transpose};
45use datafusion_common::{internal_err, Result};
46use datafusion_execution::TaskContext;
47use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
48
49use futures::{ready, Stream, StreamExt};
50
51#[derive(Debug, Clone)]
53pub struct WindowAggExec {
54 pub(crate) input: Arc<dyn ExecutionPlan>,
56 window_expr: Vec<Arc<dyn WindowExpr>>,
58 schema: SchemaRef,
60 metrics: ExecutionPlanMetricsSet,
62 ordered_partition_by_indices: Vec<usize>,
65 cache: PlanProperties,
67 can_repartition: bool,
69}
70
71impl WindowAggExec {
72 pub fn try_new(
74 window_expr: Vec<Arc<dyn WindowExpr>>,
75 input: Arc<dyn ExecutionPlan>,
76 can_repartition: bool,
77 ) -> Result<Self> {
78 let schema = create_schema(&input.schema(), &window_expr)?;
79 let schema = Arc::new(schema);
80
81 let ordered_partition_by_indices =
82 get_ordered_partition_by_indices(window_expr[0].partition_by(), &input);
83 let cache = Self::compute_properties(Arc::clone(&schema), &input, &window_expr);
84 Ok(Self {
85 input,
86 window_expr,
87 schema,
88 metrics: ExecutionPlanMetricsSet::new(),
89 ordered_partition_by_indices,
90 cache,
91 can_repartition,
92 })
93 }
94
95 pub fn window_expr(&self) -> &[Arc<dyn WindowExpr>] {
97 &self.window_expr
98 }
99
100 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
102 &self.input
103 }
104
105 pub fn partition_by_sort_keys(&self) -> Result<LexOrdering> {
111 let partition_by = self.window_expr()[0].partition_by();
112 get_partition_by_sort_exprs(
113 &self.input,
114 partition_by,
115 &self.ordered_partition_by_indices,
116 )
117 }
118
119 fn compute_properties(
121 schema: SchemaRef,
122 input: &Arc<dyn ExecutionPlan>,
123 window_exprs: &[Arc<dyn WindowExpr>],
124 ) -> PlanProperties {
125 let eq_properties = window_equivalence_properties(&schema, input, window_exprs);
127
128 let output_partitioning = input.output_partitioning().clone();
132
133 PlanProperties::new(
135 eq_properties,
136 output_partitioning,
137 EmissionType::Final,
139 input.boundedness(),
140 )
141 }
142
143 pub fn partition_keys(&self) -> Vec<Arc<dyn PhysicalExpr>> {
144 if !self.can_repartition {
145 vec![]
146 } else {
147 let all_partition_keys = self
148 .window_expr()
149 .iter()
150 .map(|expr| expr.partition_by().to_vec())
151 .collect::<Vec<_>>();
152
153 all_partition_keys
154 .into_iter()
155 .min_by_key(|s| s.len())
156 .unwrap_or_else(Vec::new)
157 }
158 }
159}
160
161impl DisplayAs for WindowAggExec {
162 fn fmt_as(
163 &self,
164 t: DisplayFormatType,
165 f: &mut std::fmt::Formatter,
166 ) -> std::fmt::Result {
167 match t {
168 DisplayFormatType::Default | DisplayFormatType::Verbose => {
169 write!(f, "WindowAggExec: ")?;
170 let g: Vec<String> = self
171 .window_expr
172 .iter()
173 .map(|e| {
174 format!(
175 "{}: {:?}, frame: {:?}",
176 e.name().to_owned(),
177 e.field(),
178 e.get_window_frame()
179 )
180 })
181 .collect();
182 write!(f, "wdw=[{}]", g.join(", "))?;
183 }
184 }
185 Ok(())
186 }
187}
188
189impl ExecutionPlan for WindowAggExec {
190 fn name(&self) -> &'static str {
191 "WindowAggExec"
192 }
193
194 fn as_any(&self) -> &dyn Any {
196 self
197 }
198
199 fn properties(&self) -> &PlanProperties {
200 &self.cache
201 }
202
203 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
204 vec![&self.input]
205 }
206
207 fn maintains_input_order(&self) -> Vec<bool> {
208 vec![true]
209 }
210
211 fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
212 let partition_bys = self.window_expr()[0].partition_by();
213 let order_keys = self.window_expr()[0].order_by();
214 if self.ordered_partition_by_indices.len() < partition_bys.len() {
215 vec![calc_requirements(partition_bys, order_keys.iter())]
216 } else {
217 let partition_bys = self
218 .ordered_partition_by_indices
219 .iter()
220 .map(|idx| &partition_bys[*idx]);
221 vec![calc_requirements(partition_bys, order_keys.iter())]
222 }
223 }
224
225 fn required_input_distribution(&self) -> Vec<Distribution> {
226 if self.partition_keys().is_empty() {
227 vec![Distribution::SinglePartition]
228 } else {
229 vec![Distribution::HashPartitioned(self.partition_keys())]
230 }
231 }
232
233 fn with_new_children(
234 self: Arc<Self>,
235 children: Vec<Arc<dyn ExecutionPlan>>,
236 ) -> Result<Arc<dyn ExecutionPlan>> {
237 Ok(Arc::new(WindowAggExec::try_new(
238 self.window_expr.clone(),
239 Arc::clone(&children[0]),
240 true,
241 )?))
242 }
243
244 fn execute(
245 &self,
246 partition: usize,
247 context: Arc<TaskContext>,
248 ) -> Result<SendableRecordBatchStream> {
249 let input = self.input.execute(partition, context)?;
250 let stream = Box::pin(WindowAggStream::new(
251 Arc::clone(&self.schema),
252 self.window_expr.clone(),
253 input,
254 BaselineMetrics::new(&self.metrics, partition),
255 self.partition_by_sort_keys()?,
256 self.ordered_partition_by_indices.clone(),
257 )?);
258 Ok(stream)
259 }
260
261 fn metrics(&self) -> Option<MetricsSet> {
262 Some(self.metrics.clone_inner())
263 }
264
265 fn statistics(&self) -> Result<Statistics> {
266 let input_stat = self.input.statistics()?;
267 let win_cols = self.window_expr.len();
268 let input_cols = self.input.schema().fields().len();
269 let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
271 column_statistics.extend(input_stat.column_statistics);
273 for _ in 0..win_cols {
274 column_statistics.push(ColumnStatistics::new_unknown())
275 }
276 Ok(Statistics {
277 num_rows: input_stat.num_rows,
278 column_statistics,
279 total_byte_size: Precision::Absent,
280 })
281 }
282}
283
284fn compute_window_aggregates(
286 window_expr: &[Arc<dyn WindowExpr>],
287 batch: &RecordBatch,
288) -> Result<Vec<ArrayRef>> {
289 window_expr
290 .iter()
291 .map(|window_expr| window_expr.evaluate(batch))
292 .collect()
293}
294
295pub struct WindowAggStream {
297 schema: SchemaRef,
298 input: SendableRecordBatchStream,
299 batches: Vec<RecordBatch>,
300 finished: bool,
301 window_expr: Vec<Arc<dyn WindowExpr>>,
302 partition_by_sort_keys: LexOrdering,
303 baseline_metrics: BaselineMetrics,
304 ordered_partition_by_indices: Vec<usize>,
305}
306
307impl WindowAggStream {
308 pub fn new(
310 schema: SchemaRef,
311 window_expr: Vec<Arc<dyn WindowExpr>>,
312 input: SendableRecordBatchStream,
313 baseline_metrics: BaselineMetrics,
314 partition_by_sort_keys: LexOrdering,
315 ordered_partition_by_indices: Vec<usize>,
316 ) -> Result<Self> {
317 if window_expr[0].partition_by().len() != ordered_partition_by_indices.len() {
319 return internal_err!("All partition by columns should have an ordering");
320 }
321 Ok(Self {
322 schema,
323 input,
324 batches: vec![],
325 finished: false,
326 window_expr,
327 baseline_metrics,
328 partition_by_sort_keys,
329 ordered_partition_by_indices,
330 })
331 }
332
333 fn compute_aggregates(&self) -> Result<Option<RecordBatch>> {
334 let _timer = self.baseline_metrics.elapsed_compute().timer();
336
337 let batch = concat_batches(&self.input.schema(), &self.batches)?;
338 if batch.num_rows() == 0 {
339 return Ok(None);
340 }
341
342 let partition_by_sort_keys = self
343 .ordered_partition_by_indices
344 .iter()
345 .map(|idx| self.partition_by_sort_keys[*idx].evaluate_to_sort_column(&batch))
346 .collect::<Result<Vec<_>>>()?;
347 let partition_points =
348 evaluate_partition_ranges(batch.num_rows(), &partition_by_sort_keys)?;
349
350 let mut partition_results = vec![];
351 for partition_point in partition_points {
353 let length = partition_point.end - partition_point.start;
354 partition_results.push(compute_window_aggregates(
355 &self.window_expr,
356 &batch.slice(partition_point.start, length),
357 )?)
358 }
359 let columns = transpose(partition_results)
360 .iter()
361 .map(|elems| concat(&elems.iter().map(|x| x.as_ref()).collect::<Vec<_>>()))
362 .collect::<Vec<_>>()
363 .into_iter()
364 .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;
365
366 let mut batch_columns = batch.columns().to_vec();
370 batch_columns.extend_from_slice(&columns);
372 Ok(Some(RecordBatch::try_new(
373 Arc::clone(&self.schema),
374 batch_columns,
375 )?))
376 }
377}
378
379impl Stream for WindowAggStream {
380 type Item = Result<RecordBatch>;
381
382 fn poll_next(
383 mut self: Pin<&mut Self>,
384 cx: &mut Context<'_>,
385 ) -> Poll<Option<Self::Item>> {
386 let poll = self.poll_next_inner(cx);
387 self.baseline_metrics.record_poll(poll)
388 }
389}
390
391impl WindowAggStream {
392 #[inline]
393 fn poll_next_inner(
394 &mut self,
395 cx: &mut Context<'_>,
396 ) -> Poll<Option<Result<RecordBatch>>> {
397 if self.finished {
398 return Poll::Ready(None);
399 }
400
401 loop {
402 return Poll::Ready(Some(match ready!(self.input.poll_next_unpin(cx)) {
403 Some(Ok(batch)) => {
404 self.batches.push(batch);
405 continue;
406 }
407 Some(Err(e)) => Err(e),
408 None => {
409 let Some(result) = self.compute_aggregates()? else {
410 return Poll::Ready(None);
411 };
412 self.finished = true;
413 debug_assert!(result.num_rows() > 0);
416 Ok(result)
417 }
418 }));
419 }
420 }
421}
422
423impl RecordBatchStream for WindowAggStream {
424 fn schema(&self) -> SchemaRef {
426 Arc::clone(&self.schema)
427 }
428}