use std::borrow::Borrow;
use std::sync::Arc;
use crate::{
expressions::{Literal, NthValue, PhysicalSortExpr},
ExecutionPlan, ExecutionPlanProperties, InputOrderMode, PhysicalExpr,
};
use arrow::datatypes::Schema;
use arrow_schema::{DataType, Field, SchemaRef};
use datafusion_common::{exec_datafusion_err, exec_err, Result, ScalarValue};
use datafusion_expr::{
BuiltInWindowFunction, PartitionEvaluator, ReversedUDWF, WindowFrame,
WindowFunctionDefinition, WindowUDF,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::equivalence::collapse_lex_req;
use datafusion_physical_expr::{
reverse_order_bys,
window::{BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr},
ConstExpr, EquivalenceProperties, LexOrdering, PhysicalSortRequirement,
};
use itertools::Itertools;
mod bounded_window_agg_exec;
mod utils;
mod window_agg_exec;
pub use bounded_window_agg_exec::BoundedWindowAggExec;
use datafusion_functions_window_common::expr::ExpressionArgs;
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion_physical_expr::expressions::Column;
pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
};
use datafusion_physical_expr_common::sort_expr::{LexOrderingRef, LexRequirement};
pub use window_agg_exec::WindowAggExec;
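/// Builds the schema used to resolve a window function's output type: for
/// aggregate UDFs the input schema is returned unchanged, otherwise a field
/// named `fn_name` with the window function's return type is appended to the
/// input schema.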
pub fn schema_add_window_field(
args: &[Arc<dyn PhysicalExpr>],
schema: &Schema,
window_fn: &WindowFunctionDefinition,
fn_name: &str,
) -> Result<Arc<Schema>> {
let data_types = args
.iter()
.map(|e| Arc::clone(e).as_ref().data_type(schema))
.collect::<Result<Vec<_>>>()?;
let nullability = args
.iter()
.map(|e| Arc::clone(e).as_ref().nullable(schema))
.collect::<Result<Vec<_>>>()?;
let window_expr_return_type =
window_fn.return_type(&data_types, &nullability, fn_name)?;
let mut window_fields = schema
.fields()
.iter()
.map(|f| f.as_ref().clone())
.collect_vec();
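// For aggregate UDFs the output field is derived by `AggregateExprBuilder`
// (see `create_window_expr`), so the input schema is returned unchanged here.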
if let WindowFunctionDefinition::AggregateUDF(_) = window_fn {
Ok(Arc::new(Schema::new(window_fields)))
} else {
window_fields.extend_from_slice(&[Field::new(
fn_name,
window_expr_return_type,
false,
)]);
Ok(Arc::new(Schema::new(window_fields)))
}
}
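/// Creates a physical window expression from the logical function definition,
/// its arguments, PARTITION BY / ORDER BY clauses and window frame.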
#[allow(clippy::too_many_arguments)]
pub fn create_window_expr(
fun: &WindowFunctionDefinition,
name: String,
args: &[Arc<dyn PhysicalExpr>],
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: LexOrderingRef,
window_frame: Arc<WindowFrame>,
input_schema: &Schema,
ignore_nulls: bool,
) -> Result<Arc<dyn WindowExpr>> {
Ok(match fun {
WindowFunctionDefinition::BuiltInWindowFunction(fun) => {
Arc::new(BuiltInWindowExpr::new(
create_built_in_window_expr(fun, args, input_schema, name, ignore_nulls)?,
partition_by,
order_by,
window_frame,
))
}
WindowFunctionDefinition::AggregateUDF(fun) => {
let aggregate = AggregateExprBuilder::new(Arc::clone(fun), args.to_vec())
.schema(Arc::new(input_schema.clone()))
.alias(name)
.with_ignore_nulls(ignore_nulls)
.build()
.map(Arc::new)?;
window_expr_from_aggregate_expr(
partition_by,
order_by,
window_frame,
aggregate,
)
}
WindowFunctionDefinition::WindowUDF(fun) => Arc::new(BuiltInWindowExpr::new(
create_udwf_window_expr(fun, args, input_schema, name, ignore_nulls)?,
partition_by,
order_by,
window_frame,
)),
})
}
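/// Wraps an aggregate expression as a window expression: frames with a
/// bounded start use `SlidingAggregateWindowExpr` (rows can leave the frame),
/// while frames unbounded at the start use the simpler
/// `PlainAggregateWindowExpr`.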
fn window_expr_from_aggregate_expr(
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: LexOrderingRef,
window_frame: Arc<WindowFrame>,
aggregate: Arc<AggregateFunctionExpr>,
) -> Arc<dyn WindowExpr> {
let unbounded_window = window_frame.start_bound.is_unbounded();
if !unbounded_window {
Arc::new(SlidingAggregateWindowExpr::new(
aggregate,
partition_by,
order_by,
window_frame,
))
} else {
Arc::new(PlainAggregateWindowExpr::new(
aggregate,
partition_by,
order_by,
window_frame,
))
}
}
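/// Extracts a signed integer from a scalar value, treating NULL as 0 and
/// returning an error for non-integer types.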
fn get_signed_integer(value: ScalarValue) -> Result<i64> {
if value.is_null() {
return Ok(0);
}
if !value.data_type().is_integer() {
return exec_err!("Expected an integer value");
}
value.cast_to(&DataType::Int64)?.try_into()
}
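/// Creates a `BuiltInWindowFunctionExpr` for the remaining built-in window
/// functions (`nth_value`, `first_value`, `last_value`).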
fn create_built_in_window_expr(
fun: &BuiltInWindowFunction,
args: &[Arc<dyn PhysicalExpr>],
input_schema: &Schema,
name: String,
ignore_nulls: bool,
) -> Result<Arc<dyn BuiltInWindowFunctionExpr>> {
let out_data_type: &DataType = input_schema.field_with_name(&name)?.data_type();
Ok(match fun {
BuiltInWindowFunction::NthValue => {
let arg = Arc::clone(&args[0]);
let n = get_signed_integer(
args[1]
.as_any()
.downcast_ref::<Literal>()
.ok_or_else(|| {
exec_datafusion_err!("Expected a signed integer literal for the second argument of nth_value, got {}", args[1])
})?
.value()
.clone(),
)?;
Arc::new(NthValue::nth(
name,
arg,
out_data_type.clone(),
n,
ignore_nulls,
)?)
}
BuiltInWindowFunction::FirstValue => {
let arg = Arc::clone(&args[0]);
Arc::new(NthValue::first(
name,
arg,
out_data_type.clone(),
ignore_nulls,
))
}
BuiltInWindowFunction::LastValue => {
let arg = Arc::clone(&args[0]);
Arc::new(NthValue::last(
name,
arg,
out_data_type.clone(),
ignore_nulls,
))
}
})
}
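/// Creates a `BuiltInWindowFunctionExpr` suitable for a user-defined window
/// function (`WindowUDF`).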
fn create_udwf_window_expr(
fun: &Arc<WindowUDF>,
args: &[Arc<dyn PhysicalExpr>],
input_schema: &Schema,
name: String,
ignore_nulls: bool,
) -> Result<Arc<dyn BuiltInWindowFunctionExpr>> {
let input_types: Vec<_> = args
.iter()
.map(|arg| arg.data_type(input_schema))
.collect::<Result<_>>()?;
Ok(Arc::new(WindowUDFExpr {
fun: Arc::clone(fun),
args: args.to_vec(),
input_types,
name,
is_reversed: false,
ignore_nulls,
}))
}
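/// Adapter implementing `BuiltInWindowFunctionExpr` for a `WindowUDF`.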
#[derive(Clone, Debug)]
struct WindowUDFExpr {
fun: Arc<WindowUDF>,
args: Vec<Arc<dyn PhysicalExpr>>,
name: String,
input_types: Vec<DataType>,
is_reversed: bool,
ignore_nulls: bool,
}
impl BuiltInWindowFunctionExpr for WindowUDFExpr {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn field(&self) -> Result<Field> {
self.fun
.field(WindowUDFFieldArgs::new(&self.input_types, &self.name))
}
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
self.fun
.expressions(ExpressionArgs::new(&self.args, &self.input_types))
}
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
self.fun
.partition_evaluator_factory(PartitionEvaluatorArgs::new(
&self.args,
&self.input_types,
self.is_reversed,
self.ignore_nulls,
))
}
fn name(&self) -> &str {
&self.name
}
fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
match self.fun.reverse_expr() {
ReversedUDWF::Identical => Some(Arc::new(self.clone())),
ReversedUDWF::NotSupported => None,
ReversedUDWF::Reversed(fun) => Some(Arc::new(WindowUDFExpr {
fun,
args: self.args.clone(),
name: self.name.clone(),
input_types: self.input_types.clone(),
is_reversed: !self.is_reversed,
ignore_nulls: self.ignore_nulls,
})),
}
}
fn get_result_ordering(&self, schema: &SchemaRef) -> Option<PhysicalSortExpr> {
self.fun
.sort_options()
.zip(schema.column_with_name(self.name()))
.map(|(options, (idx, field))| {
let expr = Arc::new(Column::new(field.name(), idx));
PhysicalSortExpr { expr, options }
})
}
}
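/// Calculates the sort requirement of a window operator: every PARTITION BY
/// expression (with no required direction) followed by each ORDER BY
/// expression not already covered by an earlier requirement. Returns `None`
/// when there is no requirement at all.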
pub(crate) fn calc_requirements<
T: Borrow<Arc<dyn PhysicalExpr>>,
S: Borrow<PhysicalSortExpr>,
>(
partition_by_exprs: impl IntoIterator<Item = T>,
orderby_sort_exprs: impl IntoIterator<Item = S>,
) -> Option<LexRequirement> {
let mut sort_reqs = LexRequirement::new(
partition_by_exprs
.into_iter()
.map(|partition_by| {
PhysicalSortRequirement::new(Arc::clone(partition_by.borrow()), None)
})
.collect::<Vec<_>>(),
);
for element in orderby_sort_exprs.into_iter() {
let PhysicalSortExpr { expr, options } = element.borrow();
if !sort_reqs.iter().any(|e| e.expr.eq(expr)) {
sort_reqs.push(PhysicalSortRequirement::new(
Arc::clone(expr),
Some(*options),
));
}
}
(!sort_reqs.is_empty()).then_some(sort_reqs)
}
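/// Returns the indices of the PARTITION BY expressions, reordered so that the
/// resulting prefix matches an existing ordering of the input (as determined
/// by its equivalence properties).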
pub fn get_ordered_partition_by_indices(
partition_by_exprs: &[Arc<dyn PhysicalExpr>],
input: &Arc<dyn ExecutionPlan>,
) -> Vec<usize> {
let (_, indices) = input
.equivalence_properties()
.find_longest_permutation(partition_by_exprs);
indices
}
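/// Builds the sort expressions for the PARTITION BY columns selected by
/// `ordered_partition_by_indices`; errors if the input does not provide an
/// ordering for all of them.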
pub(crate) fn get_partition_by_sort_exprs(
input: &Arc<dyn ExecutionPlan>,
partition_by_exprs: &[Arc<dyn PhysicalExpr>],
ordered_partition_by_indices: &[usize],
) -> Result<LexOrdering> {
let ordered_partition_exprs = ordered_partition_by_indices
.iter()
.map(|idx| Arc::clone(&partition_by_exprs[*idx]))
.collect::<Vec<_>>();
assert!(ordered_partition_by_indices.len() <= partition_by_exprs.len());
let (ordering, _) = input
.equivalence_properties()
.find_longest_permutation(&ordered_partition_exprs);
if ordering.len() == ordered_partition_exprs.len() {
Ok(ordering)
} else {
exec_err!("Expects PARTITION BY expression to be ordered")
}
}
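/// Computes the equivalence properties of a window operator's output: the
/// input properties plus any orderings introduced by built-in window
/// expressions.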
pub(crate) fn window_equivalence_properties(
schema: &SchemaRef,
input: &Arc<dyn ExecutionPlan>,
window_expr: &[Arc<dyn WindowExpr>],
) -> EquivalenceProperties {
let mut window_eq_properties = EquivalenceProperties::new(Arc::clone(schema))
.extend(input.equivalence_properties().clone());
for expr in window_expr {
if let Some(builtin_window_expr) =
expr.as_any().downcast_ref::<BuiltInWindowExpr>()
{
builtin_window_expr.add_equal_orderings(&mut window_eq_properties);
}
}
window_eq_properties
}
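/// Constructs the best-fitting window executor that reuses the input's
/// existing ordering: `BoundedWindowAggExec` when every window expression can
/// run with bounded memory, `WindowAggExec` when the input is fully sorted on
/// the window requirement, and `None` when the existing ordering cannot be
/// reused (so a separate sort would be needed).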
pub fn get_best_fitting_window(
window_exprs: &[Arc<dyn WindowExpr>],
input: &Arc<dyn ExecutionPlan>,
physical_partition_keys: &[Arc<dyn PhysicalExpr>],
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let partitionby_exprs = window_exprs[0].partition_by();
let orderby_keys = window_exprs[0].order_by();
let (should_reverse, input_order_mode) =
if let Some((should_reverse, input_order_mode)) =
get_window_mode(partitionby_exprs, orderby_keys, input)
{
(should_reverse, input_order_mode)
} else {
return Ok(None);
};
let is_unbounded = input.execution_mode().is_unbounded();
if !is_unbounded && input_order_mode != InputOrderMode::Sorted {
return Ok(None);
};
let window_expr = if should_reverse {
if let Some(reversed_window_expr) = window_exprs
.iter()
.map(|e| e.get_reverse_expr())
.collect::<Option<Vec<_>>>()
{
reversed_window_expr
} else {
return Ok(None);
}
} else {
window_exprs.to_vec()
};
if window_expr.iter().all(|e| e.uses_bounded_memory()) {
Ok(Some(Arc::new(BoundedWindowAggExec::try_new(
window_expr,
Arc::clone(input),
physical_partition_keys.to_vec(),
input_order_mode,
)?) as _))
} else if input_order_mode != InputOrderMode::Sorted {
Ok(None)
} else {
Ok(Some(Arc::new(WindowAggExec::try_new(
window_expr,
Arc::clone(input),
physical_partition_keys.to_vec(),
)?) as _))
}
}
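/// Determines whether the window's PARTITION BY / ORDER BY requirement (or
/// its reverse) is satisfied by the input ordering. Returns the reversal flag
/// together with the resulting `InputOrderMode`, or `None` if neither
/// direction is satisfied.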
pub fn get_window_mode(
partitionby_exprs: &[Arc<dyn PhysicalExpr>],
orderby_keys: LexOrderingRef,
input: &Arc<dyn ExecutionPlan>,
) -> Option<(bool, InputOrderMode)> {
let input_eqs = input.equivalence_properties().clone();
let mut partition_by_reqs: LexRequirement = LexRequirement::new(vec![]);
let (_, indices) = input_eqs.find_longest_permutation(partitionby_exprs);
partition_by_reqs
.inner
.extend(indices.iter().map(|&idx| PhysicalSortRequirement {
expr: Arc::clone(&partitionby_exprs[idx]),
options: None,
}));
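// PARTITION BY columns are constant within each partition, so treat them as
// constants when checking whether the ORDER BY requirement is satisfied.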
let const_exprs = partitionby_exprs.iter().map(ConstExpr::from);
let partition_by_eqs = input_eqs.with_constants(const_exprs);
let order_by_reqs = PhysicalSortRequirement::from_sort_exprs(orderby_keys.iter());
let reverse_order_by_reqs =
PhysicalSortRequirement::from_sort_exprs(reverse_order_bys(orderby_keys).iter());
for (should_swap, order_by_reqs) in
[(false, order_by_reqs), (true, reverse_order_by_reqs)]
{
let req = LexRequirement::new(
[partition_by_reqs.inner.clone(), order_by_reqs.inner].concat(),
);
let req = collapse_lex_req(req);
if partition_by_eqs.ordering_satisfy_requirement(&req) {
let mode = if indices.len() == partitionby_exprs.len() {
InputOrderMode::Sorted
} else if indices.is_empty() {
InputOrderMode::Linear
} else {
InputOrderMode::PartiallySorted(indices)
};
return Some((should_swap, mode));
}
}
None
}
#[cfg(test)]
mod tests {
use super::*;
use crate::collect;
use crate::expressions::col;
use crate::streaming::StreamingTableExec;
use crate::test::assert_is_pending;
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
use arrow::compute::SortOptions;
use datafusion_execution::TaskContext;
use datafusion_functions_aggregate::count::count_udaf;
use futures::FutureExt;
use InputOrderMode::{Linear, PartiallySorted, Sorted};
fn create_test_schema() -> Result<SchemaRef> {
let nullable_column = Field::new("nullable_col", DataType::Int32, true);
let non_nullable_column = Field::new("non_nullable_col", DataType::Int32, false);
let schema = Arc::new(Schema::new(vec![nullable_column, non_nullable_column]));
Ok(schema)
}
fn create_test_schema2() -> Result<SchemaRef> {
let a = Field::new("a", DataType::Int32, true);
let b = Field::new("b", DataType::Int32, true);
let c = Field::new("c", DataType::Int32, true);
let d = Field::new("d", DataType::Int32, true);
let e = Field::new("e", DataType::Int32, true);
let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
Ok(schema)
}
fn create_test_schema3() -> Result<SchemaRef> {
let a = Field::new("a", DataType::Int32, true);
let b = Field::new("b", DataType::Int32, false);
let c = Field::new("c", DataType::Int32, true);
let d = Field::new("d", DataType::Int32, false);
let e = Field::new("e", DataType::Int32, false);
let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
Ok(schema)
}
pub fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
sort_expr_options(name, schema, SortOptions::default())
}
pub fn sort_expr_options(
name: &str,
schema: &Schema,
options: SortOptions,
) -> PhysicalSortExpr {
PhysicalSortExpr {
expr: col(name, schema).unwrap(),
options,
}
}
pub fn streaming_table_exec(
schema: &SchemaRef,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
infinite_source: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
let sort_exprs = sort_exprs.into_iter().collect();
Ok(Arc::new(StreamingTableExec::try_new(
Arc::clone(schema),
vec![],
None,
Some(sort_exprs),
infinite_source,
None,
)?))
}
#[tokio::test]
async fn test_calc_requirements() -> Result<()> {
let schema = create_test_schema2()?;
let test_data = vec![
(
vec!["a"],
vec![("b", true, true)],
vec![("a", None), ("b", Some((true, true)))],
),
(vec!["a"], vec![("a", true, true)], vec![("a", None)]),
(
vec!["a"],
vec![("b", true, true), ("c", false, false)],
vec![
("a", None),
("b", Some((true, true))),
("c", Some((false, false))),
],
),
(
vec!["a", "c"],
vec![("b", true, true), ("c", false, false)],
vec![("a", None), ("c", None), ("b", Some((true, true)))],
),
];
for (pb_params, ob_params, expected_params) in test_data {
let mut partitionbys = vec![];
for col_name in pb_params {
partitionbys.push(col(col_name, &schema)?);
}
let mut orderbys = vec![];
for (col_name, descending, nulls_first) in ob_params {
let expr = col(col_name, &schema)?;
let options = SortOptions {
descending,
nulls_first,
};
orderbys.push(PhysicalSortExpr { expr, options });
}
let mut expected: Option<LexRequirement> = None;
for (col_name, reqs) in expected_params {
let options = reqs.map(|(descending, nulls_first)| SortOptions {
descending,
nulls_first,
});
let expr = col(col_name, &schema)?;
let res = PhysicalSortRequirement::new(expr, options);
if let Some(expected) = &mut expected {
expected.push(res);
} else {
expected = Some(LexRequirement::new(vec![res]));
}
}
assert_eq!(calc_requirements(partitionbys, orderbys), expected);
}
Ok(())
}
#[tokio::test]
async fn test_drop_cancel() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
let refs = blocking_exec.refs();
let window_agg_exec = Arc::new(WindowAggExec::try_new(
vec![create_window_expr(
&WindowFunctionDefinition::AggregateUDF(count_udaf()),
"count".to_owned(),
&[col("a", &schema)?],
&[],
LexOrderingRef::default(),
Arc::new(WindowFrame::new(None)),
schema.as_ref(),
false,
)?],
blocking_exec,
vec![],
)?);
let fut = collect(window_agg_exec, task_ctx);
let mut fut = fut.boxed();
assert_is_pending(&mut fut);
drop(fut);
assert_strong_count_converges_to_zero(refs).await;
Ok(())
}
#[tokio::test]
async fn test_satisfy_nullable() -> Result<()> {
let schema = create_test_schema()?;
let params = vec![
((true, true), (false, false), false),
((true, true), (false, true), false),
((true, true), (true, false), false),
((true, false), (false, true), false),
((true, false), (false, false), false),
((true, false), (true, true), false),
((true, false), (true, false), true),
];
for (
(physical_desc, physical_nulls_first),
(req_desc, req_nulls_first),
expected,
) in params
{
let physical_ordering = PhysicalSortExpr {
expr: col("nullable_col", &schema)?,
options: SortOptions {
descending: physical_desc,
nulls_first: physical_nulls_first,
},
};
let required_ordering = PhysicalSortExpr {
expr: col("nullable_col", &schema)?,
options: SortOptions {
descending: req_desc,
nulls_first: req_nulls_first,
},
};
let res = physical_ordering.satisfy(&required_ordering.into(), &schema);
assert_eq!(res, expected);
}
Ok(())
}
#[tokio::test]
async fn test_satisfy_non_nullable() -> Result<()> {
let schema = create_test_schema()?;
let params = vec![
((true, true), (false, false), false),
((true, true), (false, true), false),
((true, true), (true, false), true),
((true, false), (false, true), false),
((true, false), (false, false), false),
((true, false), (true, true), true),
((true, false), (true, false), true),
];
for (
(physical_desc, physical_nulls_first),
(req_desc, req_nulls_first),
expected,
) in params
{
let physical_ordering = PhysicalSortExpr {
expr: col("non_nullable_col", &schema)?,
options: SortOptions {
descending: physical_desc,
nulls_first: physical_nulls_first,
},
};
let required_ordering = PhysicalSortExpr {
expr: col("non_nullable_col", &schema)?,
options: SortOptions {
descending: req_desc,
nulls_first: req_nulls_first,
},
};
let res = physical_ordering.satisfy(&required_ordering.into(), &schema);
assert_eq!(res, expected);
}
Ok(())
}
#[tokio::test]
async fn test_get_window_mode_exhaustive() -> Result<()> {
let test_schema = create_test_schema3()?;
let sort_exprs = vec![
sort_expr("a", &test_schema),
sort_expr("b", &test_schema),
sort_expr("c", &test_schema),
sort_expr("d", &test_schema),
];
let exec_unbounded = streaming_table_exec(&test_schema, sort_exprs, true)?;
let test_cases = vec![
(vec!["a"], vec!["a"], Some(Sorted)),
(vec!["a"], vec!["b"], Some(Sorted)),
(vec!["a"], vec!["c"], None),
(vec!["a"], vec!["a", "b"], Some(Sorted)),
(vec!["a"], vec!["b", "c"], Some(Sorted)),
(vec!["a"], vec!["a", "c"], None),
(vec!["a"], vec!["a", "b", "c"], Some(Sorted)),
(vec!["b"], vec!["a"], Some(Linear)),
(vec!["b"], vec!["b"], Some(Linear)),
(vec!["b"], vec!["c"], None),
(vec!["b"], vec!["a", "b"], Some(Linear)),
(vec!["b"], vec!["b", "c"], None),
(vec!["b"], vec!["a", "c"], Some(Linear)),
(vec!["b"], vec!["a", "b", "c"], Some(Linear)),
(vec!["c"], vec!["a"], Some(Linear)),
(vec!["c"], vec!["b"], None),
(vec!["c"], vec!["c"], Some(Linear)),
(vec!["c"], vec!["a", "b"], Some(Linear)),
(vec!["c"], vec!["b", "c"], None),
(vec!["c"], vec!["a", "c"], Some(Linear)),
(vec!["c"], vec!["a", "b", "c"], Some(Linear)),
(vec!["b", "a"], vec!["a"], Some(Sorted)),
(vec!["b", "a"], vec!["b"], Some(Sorted)),
(vec!["b", "a"], vec!["c"], Some(Sorted)),
(vec!["b", "a"], vec!["a", "b"], Some(Sorted)),
(vec!["b", "a"], vec!["b", "c"], Some(Sorted)),
(vec!["b", "a"], vec!["a", "c"], Some(Sorted)),
(vec!["b", "a"], vec!["a", "b", "c"], Some(Sorted)),
(vec!["c", "b"], vec!["a"], Some(Linear)),
(vec!["c", "b"], vec!["b"], Some(Linear)),
(vec!["c", "b"], vec!["c"], Some(Linear)),
(vec!["c", "b"], vec!["a", "b"], Some(Linear)),
(vec!["c", "b"], vec!["b", "c"], Some(Linear)),
(vec!["c", "b"], vec!["a", "c"], Some(Linear)),
(vec!["c", "b"], vec!["a", "b", "c"], Some(Linear)),
(vec!["c", "a"], vec!["a"], Some(PartiallySorted(vec![1]))),
(vec!["c", "a"], vec!["b"], Some(PartiallySorted(vec![1]))),
(vec!["c", "a"], vec!["c"], Some(PartiallySorted(vec![1]))),
(
vec!["c", "a"],
vec!["a", "b"],
Some(PartiallySorted(vec![1])),
),
(
vec!["c", "a"],
vec!["b", "c"],
Some(PartiallySorted(vec![1])),
),
(
vec!["c", "a"],
vec!["a", "c"],
Some(PartiallySorted(vec![1])),
),
(
vec!["c", "a"],
vec!["a", "b", "c"],
Some(PartiallySorted(vec![1])),
),
(vec!["c", "b", "a"], vec!["a"], Some(Sorted)),
(vec!["c", "b", "a"], vec!["b"], Some(Sorted)),
(vec!["c", "b", "a"], vec!["c"], Some(Sorted)),
(vec!["c", "b", "a"], vec!["a", "b"], Some(Sorted)),
(vec!["c", "b", "a"], vec!["b", "c"], Some(Sorted)),
(vec!["c", "b", "a"], vec!["a", "c"], Some(Sorted)),
(vec!["c", "b", "a"], vec!["a", "b", "c"], Some(Sorted)),
];
for (case_idx, test_case) in test_cases.iter().enumerate() {
let (partition_by_columns, order_by_params, expected) = &test_case;
let mut partition_by_exprs = vec![];
for col_name in partition_by_columns {
partition_by_exprs.push(col(col_name, &test_schema)?);
}
let mut order_by_exprs = LexOrdering::default();
for col_name in order_by_params {
let expr = col(col_name, &test_schema)?;
let options = SortOptions::default();
order_by_exprs.push(PhysicalSortExpr { expr, options });
}
let res = get_window_mode(
&partition_by_exprs,
order_by_exprs.as_ref(),
&exec_unbounded,
);
let res = res.map(|(_, mode)| mode);
assert_eq!(
res, *expected,
"Unexpected result for in unbounded test case#: {case_idx:?}, case: {test_case:?}"
);
}
Ok(())
}
#[tokio::test]
async fn test_get_window_mode() -> Result<()> {
let test_schema = create_test_schema3()?;
let sort_exprs = vec![
sort_expr("a", &test_schema),
sort_expr("b", &test_schema),
sort_expr("c", &test_schema),
sort_expr("d", &test_schema),
];
let exec_unbounded = streaming_table_exec(&test_schema, sort_exprs, true)?;
let test_cases = vec![
(vec!["a", "b"], vec![("c", false, false)], None),
(vec![], vec![("c", false, true)], None),
(vec!["b"], vec![("c", false, true)], None),
(vec!["a"], vec![("c", false, true)], None),
(
vec!["a", "b"],
vec![("c", false, true), ("e", false, true)],
None,
),
(vec!["a"], vec![("b", false, true)], Some((false, Sorted))),
(vec!["a"], vec![("a", false, true)], Some((false, Sorted))),
(vec!["a"], vec![("a", false, false)], Some((false, Sorted))),
(vec!["a"], vec![("a", true, true)], Some((false, Sorted))),
(vec!["a"], vec![("a", true, false)], Some((false, Sorted))),
(vec!["a"], vec![("b", false, false)], Some((false, Sorted))),
(vec!["a"], vec![("b", true, false)], Some((true, Sorted))),
(
vec!["a", "b"],
vec![("c", false, true)],
Some((false, Sorted)),
),
(
vec!["b", "a"],
vec![("c", false, true)],
Some((false, Sorted)),
),
(
vec!["a", "b"],
vec![("c", true, false)],
Some((true, Sorted)),
),
(
vec!["e"],
vec![("a", false, true)],
Some((false, Linear)),
),
(
vec!["b", "c"],
vec![("a", false, true), ("c", false, true)],
Some((false, Linear)),
),
(vec!["b"], vec![("a", false, true)], Some((false, Linear))),
(
vec!["a", "e"],
vec![("b", false, true)],
Some((false, PartiallySorted(vec![0]))),
),
(
vec!["a", "c"],
vec![("b", false, true)],
Some((false, PartiallySorted(vec![0]))),
),
(
vec!["c", "a"],
vec![("b", false, true)],
Some((false, PartiallySorted(vec![1]))),
),
(
vec!["d", "b", "a"],
vec![("c", false, true)],
Some((false, PartiallySorted(vec![2, 1]))),
),
(
vec!["e", "b", "a"],
vec![("c", false, true)],
Some((false, PartiallySorted(vec![2, 1]))),
),
(
vec!["d", "a"],
vec![("b", false, true)],
Some((false, PartiallySorted(vec![1]))),
),
(
vec!["a"],
vec![("b", false, true), ("a", false, true)],
Some((false, Sorted)),
),
(vec![], vec![("b", false, true), ("a", false, true)], None),
];
for (case_idx, test_case) in test_cases.iter().enumerate() {
let (partition_by_columns, order_by_params, expected) = &test_case;
let mut partition_by_exprs = vec![];
for col_name in partition_by_columns {
partition_by_exprs.push(col(col_name, &test_schema)?);
}
let mut order_by_exprs = LexOrdering::default();
for (col_name, descending, nulls_first) in order_by_params {
let expr = col(col_name, &test_schema)?;
let options = SortOptions {
descending: *descending,
nulls_first: *nulls_first,
};
order_by_exprs.push(PhysicalSortExpr { expr, options });
}
assert_eq!(
get_window_mode(&partition_by_exprs, order_by_exprs.as_ref(), &exec_unbounded),
*expected,
"Unexpected result for in unbounded test case#: {case_idx:?}, case: {test_case:?}"
);
}
Ok(())
}
}