1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::{PhysicalExpr, PhysicalSortExpr};
use arrow::compute::kernels::partition::lexicographical_partition_ranges;
use arrow::compute::kernels::sort::{SortColumn, SortOptions};
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, datatypes::Field};
use datafusion_common::{DataFusionError, Result};
use std::any::Any;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
/// A physical window expression: a window function together with the argument,
/// `PARTITION BY` and `ORDER BY` expressions it is evaluated with.
///
/// Implementors:
/// * know the [`Field`] of their final result,
/// * evaluate their argument expressions and the window function itself
///   against a [`RecordBatch`],
/// * expose their `PARTITION BY` / `ORDER BY` expressions, from which the
///   default methods derive partition and sort columns.
pub trait WindowExpr: Send + Sync + Debug {
    /// Returns the window expression as [`Any`](std::any::Any) so that it can be
    /// downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns the field of the final result of this window function.
    fn field(&self) -> Result<Field>;

    /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
    /// implementation returns placeholder text.
    fn name(&self) -> &str {
        "WindowExpr: default name"
    }

    /// Returns the argument expressions that are passed to the WindowAccumulator.
    ///
    /// Functions which take a single input argument, such as `sum`, return a
    /// single [`PhysicalExpr`]; others (e.g. `cov`) return several.
    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;

    /// Evaluates the window function arguments ([`Self::expressions`]) against
    /// `batch`, converting each result to an array via
    /// `into_array(batch.num_rows())`. Normally the resulting vec has a single
    /// element.
    ///
    /// # Errors
    /// Returns the first error produced while evaluating an argument expression.
    fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
        self.expressions()
            .iter()
            .map(|e| e.evaluate(batch).map(|v| v.into_array(batch.num_rows())))
            .collect()
    }

    /// Evaluates the window function values against `batch`.
    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;

    /// Computes the row ranges of each partition from the given partition columns.
    ///
    /// If `partition_columns` is empty, the result is a single range covering
    /// rows `0..num_rows`. Otherwise the ranges come from arrow's
    /// `lexicographical_partition_ranges`, which expects the columns to be
    /// already sorted; `num_rows` is not consulted in that case.
    ///
    /// # Errors
    /// Returns [`DataFusionError::ArrowError`] if the arrow kernel fails.
    fn evaluate_partition_points(
        &self,
        num_rows: usize,
        partition_columns: &[SortColumn],
    ) -> Result<Vec<Range<usize>>> {
        if partition_columns.is_empty() {
            // Without partition columns the whole input forms one partition.
            return Ok(vec![0..num_rows]);
        }
        Ok(lexicographical_partition_ranges(partition_columns)
            .map_err(DataFusionError::ArrowError)?
            .collect())
    }

    /// Returns the expressions from the window function's `PARTITION BY`
    /// clause; empty if the clause is absent.
    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>];

    /// Returns the expressions from the window function's `ORDER BY` clause;
    /// empty if the clause is absent.
    fn order_by(&self) -> &[PhysicalSortExpr];

    /// Evaluates the [`Self::partition_by`] expressions against `batch` into
    /// sort columns (using default [`SortOptions`]) that can be used for
    /// partitioning; empty if there is no `PARTITION BY` clause.
    ///
    /// # Errors
    /// Returns the first error produced while evaluating a partition expression.
    fn partition_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
        self.partition_by()
            .iter()
            .map(|expr| {
                PhysicalSortExpr {
                    expr: Arc::clone(expr),
                    options: SortOptions::default(),
                }
                .evaluate_to_sort_column(batch)
            })
            .collect()
    }

    /// Returns the sort columns used for peer evaluation: the
    /// [`Self::partition_columns`] followed by the evaluated [`Self::order_by`]
    /// expressions; empty if both are absent.
    ///
    /// # Errors
    /// Returns the first error produced while evaluating either set of expressions.
    fn sort_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
        let mut sort_columns = self.partition_columns(batch)?;
        for sort_expr in self.order_by() {
            sort_columns.push(sort_expr.evaluate_to_sort_column(batch)?);
        }
        Ok(sort_columns)
    }
}