// datafusion_physical_plan/windows/window_agg_exec.rs

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use super::utils::create_schema;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::windows::{
calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs,
window_equivalence_properties,
};
use crate::{
ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionMode,
ExecutionPlan, ExecutionPlanProperties, PhysicalExpr, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
};
use arrow::array::ArrayRef;
use arrow::compute::{concat, concat_batches};
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::stats::Precision;
use datafusion_common::utils::{evaluate_partition_ranges, transpose};
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use futures::{ready, Stream, StreamExt};
/// Window execution plan: evaluates window functions over the rows of its input.
///
/// NOTE(review): this operator buffers its entire input before producing output
/// (see `WindowAggStream` below), which is why an unbounded input makes it
/// pipeline-breaking in `compute_properties`.
#[derive(Debug, Clone)]
pub struct WindowAggExec {
    /// The child execution plan whose rows the window functions consume.
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// Window function expressions to evaluate; one output column is appended
    /// per expression (see `statistics`).
    window_expr: Vec<Arc<dyn WindowExpr>>,
    /// Output schema: the input's columns followed by the window columns.
    schema: SchemaRef,
    /// Keys used to hash-partition the input; when empty, a single input
    /// partition is required instead (see `required_input_distribution`).
    pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
    /// Execution metrics (elapsed compute, output rows, ...).
    metrics: ExecutionPlanMetricsSet,
    /// Indices of the PARTITION BY expressions whose ordering is already
    /// established by the input (from `get_ordered_partition_by_indices`).
    ordered_partition_by_indices: Vec<usize>,
    /// Cached plan properties: equivalences, output partitioning, execution mode.
    cache: PlanProperties,
}
impl WindowAggExec {
pub fn try_new(
window_expr: Vec<Arc<dyn WindowExpr>>,
input: Arc<dyn ExecutionPlan>,
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Self> {
let schema = create_schema(&input.schema(), &window_expr)?;
let schema = Arc::new(schema);
let ordered_partition_by_indices =
get_ordered_partition_by_indices(window_expr[0].partition_by(), &input);
let cache = Self::compute_properties(Arc::clone(&schema), &input, &window_expr);
Ok(Self {
input,
window_expr,
schema,
partition_keys,
metrics: ExecutionPlanMetricsSet::new(),
ordered_partition_by_indices,
cache,
})
}
pub fn window_expr(&self) -> &[Arc<dyn WindowExpr>] {
&self.window_expr
}
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
}
pub fn partition_by_sort_keys(&self) -> Result<LexOrdering> {
let partition_by = self.window_expr()[0].partition_by();
get_partition_by_sort_exprs(
&self.input,
partition_by,
&self.ordered_partition_by_indices,
)
}
fn compute_properties(
schema: SchemaRef,
input: &Arc<dyn ExecutionPlan>,
window_expr: &[Arc<dyn WindowExpr>],
) -> PlanProperties {
let eq_properties = window_equivalence_properties(&schema, input, window_expr);
let output_partitioning = input.output_partitioning().clone();
let mode = match input.execution_mode() {
ExecutionMode::Bounded => ExecutionMode::Bounded,
ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => {
ExecutionMode::PipelineBreaking
}
};
PlanProperties::new(eq_properties, output_partitioning, mode)
}
}
impl DisplayAs for WindowAggExec {
    /// Render this node for `EXPLAIN` output, e.g.
    /// `WindowAggExec: wdw=[<name>: <field>, frame: <frame>, ...]`.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "WindowAggExec: ")?;
                let g: Vec<String> = self
                    .window_expr
                    .iter()
                    .map(|e| {
                        // `format!` only needs `Display`; the previous
                        // `e.name().to_owned()` allocated an intermediate
                        // String for nothing (clippy: unnecessary_to_owned).
                        format!(
                            "{}: {:?}, frame: {:?}",
                            e.name(),
                            e.field(),
                            e.get_window_frame()
                        )
                    })
                    .collect();
                write!(f, "wdw=[{}]", g.join(", "))?;
            }
        }
        Ok(())
    }
}
impl ExecutionPlan for WindowAggExec {
    fn name(&self) -> &'static str {
        "WindowAggExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    // Properties were precomputed in `try_new` and cached.
    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    // Window columns are appended to the input's rows in place, so the
    // input's row order is preserved in the output.
    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    // Require the input to be sorted on the PARTITION BY (and ORDER BY)
    // columns of the first window expression.
    // NOTE(review): only window_expr[0] is consulted — presumably all window
    // expressions in this node share the same PARTITION BY; confirm upstream.
    fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
        let partition_bys = self.window_expr()[0].partition_by();
        let order_keys = self.window_expr()[0].order_by();
        if self.ordered_partition_by_indices.len() < partition_bys.len() {
            // Not all partition-by columns have a known input ordering:
            // require them in their declared order.
            vec![calc_requirements(partition_bys, order_keys.iter())]
        } else {
            // Reorder the requirement to match the ordering the input
            // already provides, avoiding an unnecessary re-sort.
            let partition_bys = self
                .ordered_partition_by_indices
                .iter()
                .map(|idx| &partition_bys[*idx]);
            vec![calc_requirements(partition_bys, order_keys.iter())]
        }
    }

    // With no partition keys everything must be in one partition; otherwise
    // rows are hash-distributed on the keys so each window partition lands
    // in a single output partition.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        if self.partition_keys.is_empty() {
            vec![Distribution::SinglePartition]
        } else {
            vec![Distribution::HashPartitioned(self.partition_keys.clone())]
        }
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Exactly one child is expected (see `children`); `children[0]`
        // panics if the caller passes an empty vector.
        Ok(Arc::new(WindowAggExec::try_new(
            self.window_expr.clone(),
            Arc::clone(&children[0]),
            self.partition_keys.clone(),
        )?))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let input = self.input.execute(partition, context)?;
        let stream = Box::pin(WindowAggStream::new(
            Arc::clone(&self.schema),
            self.window_expr.clone(),
            input,
            BaselineMetrics::new(&self.metrics, partition),
            self.partition_by_sort_keys()?,
            self.ordered_partition_by_indices.clone(),
        )?);
        Ok(stream)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    // Output statistics: same row count as the input; the input's column
    // statistics are kept and one unknown-statistics column is appended per
    // window expression. Total byte size becomes unknown since the new
    // columns' sizes cannot be estimated here.
    fn statistics(&self) -> Result<Statistics> {
        let input_stat = self.input.statistics()?;
        let win_cols = self.window_expr.len();
        let input_cols = self.input.schema().fields().len();
        let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
        column_statistics.extend(input_stat.column_statistics);
        for _ in 0..win_cols {
            column_statistics.push(ColumnStatistics::new_unknown())
        }
        Ok(Statistics {
            num_rows: input_stat.num_rows,
            column_statistics,
            total_byte_size: Precision::Absent,
        })
    }
}
/// Evaluate every window expression against `batch`, returning one output
/// array per expression. Stops at, and returns, the first evaluation error.
fn compute_window_aggregates(
    window_expr: &[Arc<dyn WindowExpr>],
    batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
    let mut evaluated = Vec::with_capacity(window_expr.len());
    for expr in window_expr {
        evaluated.push(expr.evaluate(batch)?);
    }
    Ok(evaluated)
}
/// Stream that buffers all input batches, then computes the window
/// aggregates in a single pass once the input is exhausted.
pub struct WindowAggStream {
    /// Output schema (input columns plus window columns).
    schema: SchemaRef,
    /// The input stream being drained.
    input: SendableRecordBatchStream,
    /// All input batches received so far; concatenated in arrival order
    /// when the input finishes.
    batches: Vec<RecordBatch>,
    /// True once the single output batch (or an error) has been emitted.
    finished: bool,
    /// Window expressions to evaluate per partition.
    window_expr: Vec<Arc<dyn WindowExpr>>,
    /// Sort expressions for the PARTITION BY columns, used to find
    /// partition boundaries in the concatenated batch.
    partition_by_sort_keys: LexOrdering,
    /// Baseline metrics (elapsed compute, output rows) for this partition.
    baseline_metrics: BaselineMetrics,
    /// Indices mapping `partition_by_sort_keys` to the input's ordering.
    ordered_partition_by_indices: Vec<usize>,
}
impl WindowAggStream {
    /// Create a new `WindowAggStream`.
    ///
    /// # Errors
    /// Returns an internal error if not every PARTITION BY column has an
    /// ordering index — `WindowAggExec` requires a fully ordered partition by.
    pub fn new(
        schema: SchemaRef,
        window_expr: Vec<Arc<dyn WindowExpr>>,
        input: SendableRecordBatchStream,
        baseline_metrics: BaselineMetrics,
        partition_by_sort_keys: LexOrdering,
        ordered_partition_by_indices: Vec<usize>,
    ) -> Result<Self> {
        if window_expr[0].partition_by().len() != ordered_partition_by_indices.len() {
            return internal_err!("All partition by columns should have an ordering");
        }
        Ok(Self {
            schema,
            input,
            batches: vec![],
            finished: false,
            window_expr,
            baseline_metrics,
            partition_by_sort_keys,
            ordered_partition_by_indices,
        })
    }

    /// Concatenate all buffered batches, locate partition boundaries using
    /// the PARTITION BY sort keys, evaluate every window expression per
    /// partition, and return a single batch of input columns plus the new
    /// window columns.
    fn compute_aggregates(&self) -> Result<RecordBatch> {
        // Record the elapsed compute time on drop.
        let _timer = self.baseline_metrics.elapsed_compute().timer();
        let batch = concat_batches(&self.input.schema(), &self.batches)?;
        if batch.num_rows() == 0 {
            return Ok(RecordBatch::new_empty(Arc::clone(&self.schema)));
        }
        // Evaluate the sort keys in the order the input is actually sorted.
        let partition_by_sort_keys = self
            .ordered_partition_by_indices
            .iter()
            .map(|idx| self.partition_by_sort_keys[*idx].evaluate_to_sort_column(&batch))
            .collect::<Result<Vec<_>>>()?;
        let partition_points =
            evaluate_partition_ranges(batch.num_rows(), &partition_by_sort_keys)?;
        // Per-partition results: partition_results[partition][expr].
        let mut partition_results = vec![];
        for partition_point in partition_points {
            let length = partition_point.end - partition_point.start;
            partition_results.push(compute_window_aggregates(
                &self.window_expr,
                &batch.slice(partition_point.start, length),
            )?)
        }
        // Transpose to [expr][partition] and concatenate each expression's
        // per-partition arrays into one output column. Collecting straight
        // into `Result` short-circuits on the first error and drops the
        // intermediate `Vec` the previous code built (clippy: needless_collect).
        let columns = transpose(partition_results)
            .iter()
            .map(|elems| concat(&elems.iter().map(|x| x.as_ref()).collect::<Vec<_>>()))
            .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;
        let mut batch_columns = batch.columns().to_vec();
        batch_columns.extend_from_slice(&columns);
        Ok(RecordBatch::try_new(
            Arc::clone(&self.schema),
            batch_columns,
        )?)
    }
}
impl Stream for WindowAggStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let poll = self.poll_next_inner(cx);
        // Record output-row / timing metrics for every poll result.
        self.baseline_metrics.record_poll(poll)
    }
}
impl WindowAggStream {
    /// Inner poll loop: buffer every input batch until the input stream is
    /// exhausted, then emit a single result batch (or the first input error)
    /// and mark the stream finished.
    #[inline]
    fn poll_next_inner(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        if self.finished {
            return Poll::Ready(None);
        }
        loop {
            let result = match ready!(self.input.poll_next_unpin(cx)) {
                // Accumulate and keep polling; `ready!` returns Pending to
                // the caller whenever the input is not ready.
                Some(Ok(batch)) => {
                    self.batches.push(batch);
                    continue;
                }
                // Propagate the input error as this stream's final item.
                Some(Err(e)) => Err(e),
                // Input exhausted: compute the window aggregates now.
                None => self.compute_aggregates(),
            };
            // Either an error or the final batch — emit once and finish.
            self.finished = true;
            return Poll::Ready(Some(result));
        }
    }
}
impl RecordBatchStream for WindowAggStream {
    /// The schema of the batches this stream yields (input + window columns).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}