use std::borrow::Borrow;
use std::sync::Arc;
use crate::{
expressions::{
cume_dist, dense_rank, lag, lead, percent_rank, rank, Literal, NthValue, Ntile,
PhysicalSortExpr, RowNumber,
},
ExecutionPlan, ExecutionPlanProperties, InputOrderMode, PhysicalExpr,
};
use arrow::datatypes::Schema;
use arrow_schema::{DataType, Field, SchemaRef};
use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::{col, Expr, SortExpr};
use datafusion_expr::{
BuiltInWindowFunction, PartitionEvaluator, WindowFrame, WindowFunctionDefinition,
WindowUDF,
};
use datafusion_physical_expr::equivalence::collapse_lex_req;
use datafusion_physical_expr::{
reverse_order_bys,
window::{BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr},
AggregateExpr, ConstExpr, EquivalenceProperties, LexOrdering,
PhysicalSortRequirement,
};
use datafusion_physical_expr_common::aggregate::AggregateExprBuilder;
use itertools::Itertools;
mod bounded_window_agg_exec;
mod window_agg_exec;
pub use bounded_window_agg_exec::BoundedWindowAggExec;
pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
};
pub use window_agg_exec::WindowAggExec;
/// Build the output schema for a window function: the input schema plus, for
/// non-aggregate window functions, one extra field holding the function's
/// result.
///
/// # Arguments
/// * `args` - physical argument expressions of the window function
/// * `schema` - input schema the arguments are resolved against
/// * `window_fn` - the window function definition
/// * `fn_name` - name used for the appended result field
///
/// # Errors
/// Returns an error if an argument's type or nullability cannot be resolved
/// against `schema`, or if the function's return type cannot be computed.
pub fn schema_add_window_field(
    args: &[Arc<dyn PhysicalExpr>],
    schema: &Schema,
    window_fn: &WindowFunctionDefinition,
    fn_name: &str,
) -> Result<Arc<Schema>> {
    let data_types = args
        .iter()
        .map(|e| e.data_type(schema))
        .collect::<Result<Vec<_>>>()?;
    let nullability = args
        .iter()
        .map(|e| e.nullable(schema))
        .collect::<Result<Vec<_>>>()?;
    // Resolve the return type up front so invalid signatures error out even
    // on the branch below that does not append a field.
    let window_expr_return_type = window_fn.return_type(&data_types, &nullability)?;
    let mut window_fields = schema
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect_vec();
    // Aggregate UDF window functions do not add a dedicated field here.
    if let WindowFunctionDefinition::AggregateUDF(_) = window_fn {
        Ok(Arc::new(Schema::new(window_fields)))
    } else {
        window_fields.push(Field::new(fn_name, window_expr_return_type, false));
        Ok(Arc::new(Schema::new(window_fields)))
    }
}
/// Create a physical window expression for the given window function
/// definition, bound to `args`, `partition_by`, `order_by` and
/// `window_frame` against `input_schema`.
///
/// # Errors
/// Returns an error if the underlying built-in, aggregate, or UDF window
/// expression cannot be constructed.
#[allow(clippy::too_many_arguments)]
pub fn create_window_expr(
    fun: &WindowFunctionDefinition,
    name: String,
    args: &[Arc<dyn PhysicalExpr>],
    _logical_args: &[Expr],
    partition_by: &[Arc<dyn PhysicalExpr>],
    order_by: &[PhysicalSortExpr],
    window_frame: Arc<WindowFrame>,
    input_schema: &Schema,
    ignore_nulls: bool,
) -> Result<Arc<dyn WindowExpr>> {
    Ok(match fun {
        // Built-in window functions (ROW_NUMBER, RANK, LAG, ...) evaluate
        // through a dedicated `BuiltInWindowExpr`.
        WindowFunctionDefinition::BuiltInWindowFunction(fun) => {
            Arc::new(BuiltInWindowExpr::new(
                create_built_in_window_expr(fun, args, input_schema, name, ignore_nulls)?,
                partition_by,
                order_by,
                window_frame,
            ))
        }
        WindowFunctionDefinition::AggregateUDF(fun) => {
            // Rebuild logical sort expressions from the physical ORDER BY:
            // `expr.to_string()` renders as "name@index", so strip everything
            // from '@' to recover the plain column name.
            let sort_exprs = order_by
                .iter()
                .map(|PhysicalSortExpr { expr, options }| {
                    let field_name = expr.to_string();
                    let field_name = field_name.split('@').next().unwrap_or(&field_name);
                    Expr::Sort(SortExpr {
                        expr: Box::new(col(field_name)),
                        asc: !options.descending,
                        nulls_first: options.nulls_first,
                    })
                })
                .collect::<Vec<_>>();
            let aggregate = AggregateExprBuilder::new(Arc::clone(fun), args.to_vec())
                .schema(Arc::new(input_schema.clone()))
                .alias(name)
                .order_by(order_by.to_vec())
                .sort_exprs(sort_exprs)
                .with_ignore_nulls(ignore_nulls)
                .build()?;
            // The frame bounds decide between plain and sliding aggregation.
            window_expr_from_aggregate_expr(
                partition_by,
                order_by,
                window_frame,
                aggregate,
            )
        }
        // User-defined window functions also evaluate via `BuiltInWindowExpr`.
        WindowFunctionDefinition::WindowUDF(fun) => Arc::new(BuiltInWindowExpr::new(
            create_udwf_window_expr(fun, args, input_schema, name)?,
            partition_by,
            order_by,
            window_frame,
        )),
    })
}
/// Wrap an aggregate expression in the appropriate window expression,
/// chosen from the window frame's start bound.
fn window_expr_from_aggregate_expr(
    partition_by: &[Arc<dyn PhysicalExpr>],
    order_by: &[PhysicalSortExpr],
    window_frame: Arc<WindowFrame>,
    aggregate: Arc<dyn AggregateExpr>,
) -> Arc<dyn WindowExpr> {
    // An unbounded start allows the plain (accumulate-only) strategy;
    // otherwise the frame slides and the expression must retract values.
    if window_frame.start_bound.is_unbounded() {
        Arc::new(PlainAggregateWindowExpr::new(
            aggregate,
            partition_by,
            order_by,
            window_frame,
        ))
    } else {
        Arc::new(SlidingAggregateWindowExpr::new(
            aggregate,
            partition_by,
            order_by,
            window_frame,
        ))
    }
}
/// Extract the literal scalar value of the argument at `index`, if present.
///
/// Returns `Ok(None)` when there is no argument at `index`, and an error
/// when the argument exists but is not a `Literal`.
fn get_scalar_value_from_args(
    args: &[Arc<dyn PhysicalExpr>],
    index: usize,
) -> Result<Option<ScalarValue>> {
    let Some(field) = args.get(index) else {
        return Ok(None);
    };
    let literal = field.as_any().downcast_ref::<Literal>().ok_or_else(|| {
        DataFusionError::NotImplemented(format!(
            "There is only support Literal types for field at idx: {index} in Window Function"
        ))
    })?;
    Ok(Some(literal.value().clone()))
}
/// Convert an integer-typed `ScalarValue` to `i64`; errors on any
/// non-integer input.
fn get_signed_integer(value: ScalarValue) -> Result<i64> {
    if value.data_type().is_integer() {
        value.cast_to(&DataType::Int64)?.try_into()
    } else {
        Err(DataFusionError::Execution(
            "Expected an integer value".to_string(),
        ))
    }
}
/// Convert an integer-typed `ScalarValue` to `u64`; errors on any
/// non-integer input.
fn get_unsigned_integer(value: ScalarValue) -> Result<u64> {
    if value.data_type().is_integer() {
        value.cast_to(&DataType::UInt64)?.try_into()
    } else {
        Err(DataFusionError::Execution(
            "Expected an integer value".to_string(),
        ))
    }
}
/// Cast `default_value` to `dtype`, falling back to the typed NULL value of
/// `dtype` when no (non-null) default was provided.
fn get_casted_value(
    default_value: Option<ScalarValue>,
    dtype: &DataType,
) -> Result<ScalarValue> {
    if let Some(value) = default_value {
        if !value.data_type().is_null() {
            return value.cast_to(dtype);
        }
    }
    // No usable default: produce a NULL of the requested type.
    ScalarValue::try_from(dtype)
}
/// Create a built-in window function expression (e.g. `ROW_NUMBER`, `NTILE`,
/// `LAG`, `NTH_VALUE`) bound to `args` and `input_schema`.
///
/// The output data type is read from the field named `name` that the planner
/// has already added to `input_schema`.
///
/// # Errors
/// Returns an error when a required argument is missing, is not a literal,
/// or has an invalid value (e.g. non-positive `NTILE`).
fn create_built_in_window_expr(
    fun: &BuiltInWindowFunction,
    args: &[Arc<dyn PhysicalExpr>],
    input_schema: &Schema,
    name: String,
    ignore_nulls: bool,
) -> Result<Arc<dyn BuiltInWindowFunctionExpr>> {
    let out_data_type: &DataType = input_schema.field_with_name(&name)?.data_type();
    Ok(match fun {
        BuiltInWindowFunction::RowNumber => Arc::new(RowNumber::new(name, out_data_type)),
        BuiltInWindowFunction::Rank => Arc::new(rank(name, out_data_type)),
        BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name, out_data_type)),
        BuiltInWindowFunction::PercentRank => Arc::new(percent_rank(name, out_data_type)),
        BuiltInWindowFunction::CumeDist => Arc::new(cume_dist(name, out_data_type)),
        BuiltInWindowFunction::Ntile => {
            let n = get_scalar_value_from_args(args, 0)?.ok_or_else(|| {
                DataFusionError::Execution(
                    "NTILE requires a positive integer".to_string(),
                )
            })?;
            if n.is_null() {
                return exec_err!("NTILE requires a positive integer, but finds NULL");
            }
            if n.is_unsigned() {
                // Unsigned values are positive by construction.
                let n = get_unsigned_integer(n)?;
                Arc::new(Ntile::new(name, n, out_data_type))
            } else {
                let n: i64 = get_signed_integer(n)?;
                if n <= 0 {
                    return exec_err!("NTILE requires a positive integer");
                }
                Arc::new(Ntile::new(name, n as u64, out_data_type))
            }
        }
        BuiltInWindowFunction::Lag => {
            let arg = Arc::clone(&args[0]);
            // Optional second argument: shift offset.
            let shift_offset = get_scalar_value_from_args(args, 1)?
                .map(get_signed_integer)
                .map_or(Ok(None), |v| v.map(Some))?;
            // Optional third argument: default value, cast to the output type.
            let default_value =
                get_casted_value(get_scalar_value_from_args(args, 2)?, out_data_type)?;
            Arc::new(lag(
                name,
                out_data_type.clone(),
                arg,
                shift_offset,
                default_value,
                ignore_nulls,
            ))
        }
        BuiltInWindowFunction::Lead => {
            let arg = Arc::clone(&args[0]);
            // Optional second argument: shift offset.
            let shift_offset = get_scalar_value_from_args(args, 1)?
                .map(get_signed_integer)
                .map_or(Ok(None), |v| v.map(Some))?;
            // Optional third argument: default value, cast to the output type.
            let default_value =
                get_casted_value(get_scalar_value_from_args(args, 2)?, out_data_type)?;
            Arc::new(lead(
                name,
                out_data_type.clone(),
                arg,
                shift_offset,
                default_value,
                ignore_nulls,
            ))
        }
        BuiltInWindowFunction::NthValue => {
            let arg = Arc::clone(&args[0]);
            // Return an error (rather than panicking on a failed downcast)
            // when the second argument is missing or not a literal, matching
            // the error handling of the other branches.
            let n = get_scalar_value_from_args(args, 1)?.ok_or_else(|| {
                DataFusionError::Execution(
                    "NTH_VALUE requires a literal argument for n".to_string(),
                )
            })?;
            let n = get_signed_integer(n)?;
            Arc::new(NthValue::nth(
                name,
                arg,
                out_data_type.clone(),
                n,
                ignore_nulls,
            )?)
        }
        BuiltInWindowFunction::FirstValue => {
            let arg = Arc::clone(&args[0]);
            Arc::new(NthValue::first(
                name,
                arg,
                out_data_type.clone(),
                ignore_nulls,
            ))
        }
        BuiltInWindowFunction::LastValue => {
            let arg = Arc::clone(&args[0]);
            Arc::new(NthValue::last(
                name,
                arg,
                out_data_type.clone(),
                ignore_nulls,
            ))
        }
    })
}
/// Create a `BuiltInWindowFunctionExpr` for a user-defined window function.
fn create_udwf_window_expr(
    fun: &Arc<WindowUDF>,
    args: &[Arc<dyn PhysicalExpr>],
    input_schema: &Schema,
    name: String,
) -> Result<Arc<dyn BuiltInWindowFunctionExpr>> {
    // Resolve each argument's type against the input schema so the UDWF can
    // compute its return type.
    let mut input_types = Vec::with_capacity(args.len());
    for arg in args {
        input_types.push(arg.data_type(input_schema)?);
    }
    let data_type = fun.return_type(&input_types)?;
    Ok(Arc::new(WindowUDFExpr {
        fun: Arc::clone(fun),
        args: args.to_vec(),
        name,
        data_type,
    }))
}
/// Adapter that implements [`BuiltInWindowFunctionExpr`] for a
/// user-defined [`WindowUDF`].
#[derive(Clone, Debug)]
struct WindowUDFExpr {
    // The user-defined window function itself
    fun: Arc<WindowUDF>,
    // Physical argument expressions passed to the function
    args: Vec<Arc<dyn PhysicalExpr>>,
    // Display name; also used as the output field name
    name: String,
    // Result type resolved from the argument types
    data_type: DataType,
}
impl BuiltInWindowFunctionExpr for WindowUDFExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Output field of this expression; UDWF results are always declared
    /// nullable here.
    fn field(&self) -> Result<Field> {
        let nullable = true;
        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
    }

    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        self.args.clone()
    }

    // Delegate evaluator construction to the UDWF's factory.
    fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
        self.fun.partition_evaluator_factory()
    }

    fn name(&self) -> &str {
        &self.name
    }

    // Reversed evaluation is not supported for UDWFs.
    fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
        None
    }
}
/// Compute the sort requirements implied by PARTITION BY and ORDER BY
/// expressions: partition keys first (direction-free), then each ORDER BY
/// expression not already covered by a partition key. Returns `None` when
/// there are no requirements at all.
pub(crate) fn calc_requirements<
    T: Borrow<Arc<dyn PhysicalExpr>>,
    S: Borrow<PhysicalSortExpr>,
>(
    partition_by_exprs: impl IntoIterator<Item = T>,
    orderby_sort_exprs: impl IntoIterator<Item = S>,
) -> Option<Vec<PhysicalSortRequirement>> {
    let mut sort_reqs = Vec::new();
    // Partition keys impose grouping only, not a sort direction.
    for partition_by in partition_by_exprs {
        sort_reqs.push(PhysicalSortRequirement::new(
            Arc::clone(partition_by.borrow()),
            None,
        ));
    }
    for element in orderby_sort_exprs {
        let PhysicalSortExpr { expr, options } = element.borrow();
        // Skip ORDER BY expressions already present among the requirements.
        let already_present = sort_reqs.iter().any(|req| req.expr.eq(expr));
        if !already_present {
            sort_reqs.push(PhysicalSortRequirement::new(
                Arc::clone(expr),
                Some(*options),
            ));
        }
    }
    if sort_reqs.is_empty() {
        None
    } else {
        Some(sort_reqs)
    }
}
/// Return the indices of the PARTITION BY expressions that are already
/// ordered in the input, per its equivalence properties.
pub fn get_ordered_partition_by_indices(
    partition_by_exprs: &[Arc<dyn PhysicalExpr>],
    input: &Arc<dyn ExecutionPlan>,
) -> Vec<usize> {
    // Only the indices of the longest ordered permutation are needed here.
    input
        .equivalence_properties()
        .find_longest_permutation(partition_by_exprs)
        .1
}
/// Build the ordered PARTITION BY sort expressions for the indices selected
/// by [`get_ordered_partition_by_indices`].
///
/// # Errors
/// Returns an error when the selected PARTITION BY expressions are not all
/// ordered in the input.
pub(crate) fn get_partition_by_sort_exprs(
    input: &Arc<dyn ExecutionPlan>,
    partition_by_exprs: &[Arc<dyn PhysicalExpr>],
    ordered_partition_by_indices: &[usize],
) -> Result<LexOrdering> {
    // Check the invariant *before* indexing into `partition_by_exprs` below,
    // so a violation fails on the assertion rather than an index panic.
    assert!(ordered_partition_by_indices.len() <= partition_by_exprs.len());
    let ordered_partition_exprs = ordered_partition_by_indices
        .iter()
        .map(|idx| Arc::clone(&partition_by_exprs[*idx]))
        .collect::<Vec<_>>();
    let (ordering, _) = input
        .equivalence_properties()
        .find_longest_permutation(&ordered_partition_exprs);
    if ordering.len() == ordered_partition_exprs.len() {
        Ok(ordering)
    } else {
        exec_err!("Expects PARTITION BY expression to be ordered")
    }
}
/// Derive the equivalence properties of a window operator: the input's
/// properties extended with any extra orderings introduced by built-in
/// window expressions.
pub(crate) fn window_equivalence_properties(
    schema: &SchemaRef,
    input: &Arc<dyn ExecutionPlan>,
    window_expr: &[Arc<dyn WindowExpr>],
) -> EquivalenceProperties {
    let mut window_eq_properties = EquivalenceProperties::new(Arc::clone(schema))
        .extend(input.equivalence_properties().clone());
    // Only built-in window expressions can contribute extra orderings.
    window_expr
        .iter()
        .filter_map(|expr| expr.as_any().downcast_ref::<BuiltInWindowExpr>())
        .for_each(|built_in| built_in.add_equal_orderings(&mut window_eq_properties));
    window_eq_properties
}
/// Choose the best-fitting window operator for the given expressions, or
/// `None` when the input's ordering cannot support any of them.
///
/// # Errors
/// Returns an error when the chosen operator cannot be constructed.
pub fn get_best_fitting_window(
    window_exprs: &[Arc<dyn WindowExpr>],
    input: &Arc<dyn ExecutionPlan>,
    physical_partition_keys: &[Arc<dyn PhysicalExpr>],
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    // All expressions in one window operator share PARTITION BY / ORDER BY,
    // so inspecting the first is sufficient.
    let partitionby_exprs = window_exprs[0].partition_by();
    let orderby_keys = window_exprs[0].order_by();
    let Some((should_reverse, input_order_mode)) =
        get_window_mode(partitionby_exprs, orderby_keys, input)
    else {
        // The input ordering cannot satisfy this window at all.
        return Ok(None);
    };
    let is_unbounded = input.execution_mode().is_unbounded();
    // For bounded inputs only the fully `Sorted` mode is taken here.
    if !is_unbounded && input_order_mode != InputOrderMode::Sorted {
        return Ok(None);
    };
    let window_expr = if should_reverse {
        // The mode was satisfied in reverse; every expression must be
        // reversible, otherwise give up.
        match window_exprs
            .iter()
            .map(|e| e.get_reverse_expr())
            .collect::<Option<Vec<_>>>()
        {
            Some(reversed) => reversed,
            None => return Ok(None),
        }
    } else {
        window_exprs.to_vec()
    };
    if window_expr.iter().all(|e| e.uses_bounded_memory()) {
        Ok(Some(Arc::new(BoundedWindowAggExec::try_new(
            window_expr,
            Arc::clone(input),
            physical_partition_keys.to_vec(),
            input_order_mode,
        )?) as _))
    } else if input_order_mode != InputOrderMode::Sorted {
        Ok(None)
    } else {
        Ok(Some(Arc::new(WindowAggExec::try_new(
            window_expr,
            Arc::clone(input),
            physical_partition_keys.to_vec(),
        )?) as _))
    }
}
/// Determine whether the input plan's existing ordering can satisfy a window
/// with the given PARTITION BY / ORDER BY, and if so in which
/// [`InputOrderMode`].
///
/// Returns `Some((should_swap, mode))`, where `should_swap` is `true` when
/// the ORDER BY is satisfied only in the reversed direction, or `None` when
/// the ordering cannot be satisfied at all.
pub fn get_window_mode(
    partitionby_exprs: &[Arc<dyn PhysicalExpr>],
    orderby_keys: &[PhysicalSortExpr],
    input: &Arc<dyn ExecutionPlan>,
) -> Option<(bool, InputOrderMode)> {
    let input_eqs = input.equivalence_properties().clone();
    let mut partition_by_reqs: Vec<PhysicalSortRequirement> = vec![];
    // Indices of partition-by expressions already ordered in the input.
    let (_, indices) = input_eqs.find_longest_permutation(partitionby_exprs);
    // PARTITION BY imposes grouping only, not a direction (options: None).
    partition_by_reqs.extend(indices.iter().map(|&idx| PhysicalSortRequirement {
        expr: Arc::clone(&partitionby_exprs[idx]),
        options: None,
    }));
    // Within a partition the partition keys are constant; treating them as
    // constants may let the ORDER BY requirement be satisfied transitively.
    let const_exprs = partitionby_exprs.iter().map(ConstExpr::from);
    let partition_by_eqs = input_eqs.add_constants(const_exprs);
    let order_by_reqs = PhysicalSortRequirement::from_sort_exprs(orderby_keys);
    let reverse_order_by_reqs =
        PhysicalSortRequirement::from_sort_exprs(&reverse_order_bys(orderby_keys));
    // Try the forward ORDER BY first, then the reversed one.
    for (should_swap, order_by_reqs) in
        [(false, order_by_reqs), (true, reverse_order_by_reqs)]
    {
        let req = [partition_by_reqs.clone(), order_by_reqs].concat();
        let req = collapse_lex_req(req);
        if partition_by_eqs.ordering_satisfy_requirement(&req) {
            // Mode depends on how many PARTITION BY columns are pre-ordered.
            let mode = if indices.len() == partitionby_exprs.len() {
                InputOrderMode::Sorted
            } else if indices.is_empty() {
                InputOrderMode::Linear
            } else {
                InputOrderMode::PartiallySorted(indices)
            };
            return Some((should_swap, mode));
        }
    }
    None
}
#[cfg(test)]
mod tests {
use super::*;
use crate::collect;
use crate::expressions::col;
use crate::streaming::StreamingTableExec;
use crate::test::assert_is_pending;
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
use arrow::compute::SortOptions;
use datafusion_execution::TaskContext;
use datafusion_functions_aggregate::count::count_udaf;
use futures::FutureExt;
use InputOrderMode::{Linear, PartiallySorted, Sorted};
/// Two-column test schema: a nullable and a non-nullable Int32 column.
fn create_test_schema() -> Result<SchemaRef> {
    let fields = vec![
        Field::new("nullable_col", DataType::Int32, true),
        Field::new("non_nullable_col", DataType::Int32, false),
    ];
    Ok(Arc::new(Schema::new(fields)))
}
fn create_test_schema2() -> Result<SchemaRef> {
let a = Field::new("a", DataType::Int32, true);
let b = Field::new("b", DataType::Int32, true);
let c = Field::new("c", DataType::Int32, true);
let d = Field::new("d", DataType::Int32, true);
let e = Field::new("e", DataType::Int32, true);
let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
Ok(schema)
}
/// Test schema with five Int32 columns a..e; only `a` and `c` are nullable.
fn create_test_schema3() -> Result<SchemaRef> {
    let fields = vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, false),
        Field::new("c", DataType::Int32, true),
        Field::new("d", DataType::Int32, false),
        Field::new("e", DataType::Int32, false),
    ];
    Ok(Arc::new(Schema::new(fields)))
}
/// Create a sort expression on column `name` using default sort options.
pub fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
    sort_expr_options(name, schema, SortOptions::default())
}
/// Create a sort expression on column `name` with explicit sort `options`.
pub fn sort_expr_options(
    name: &str,
    schema: &Schema,
    options: SortOptions,
) -> PhysicalSortExpr {
    // `name` is always a valid column in these tests, hence the unwrap.
    let expr = col(name, schema).unwrap();
    PhysicalSortExpr { expr, options }
}
/// Create a `StreamingTableExec` with the given output ordering, used as the
/// window operators' input in these tests; `infinite_source` controls
/// whether the source is unbounded.
pub fn streaming_table_exec(
    schema: &SchemaRef,
    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
    infinite_source: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
    let sort_exprs = sort_exprs.into_iter().collect();
    Ok(Arc::new(StreamingTableExec::try_new(
        Arc::clone(schema),
        vec![],
        None,
        Some(sort_exprs),
        infinite_source,
        None,
    )?))
}
#[tokio::test]
async fn test_calc_requirements() -> Result<()> {
    let schema = create_test_schema2()?;
    // Each case: (PARTITION BY columns,
    //             ORDER BY entries as (column, descending, nulls_first),
    //             expected requirements as (column, Option<(descending, nulls_first)>);
    //             `None` options means direction-free partition requirement).
    let test_data = vec![
        (
            vec!["a"],
            vec![("b", true, true)],
            vec![("a", None), ("b", Some((true, true)))],
        ),
        // ORDER BY on a partition key is absorbed by the partition requirement.
        (vec!["a"], vec![("a", true, true)], vec![("a", None)]),
        (
            vec!["a"],
            vec![("b", true, true), ("c", false, false)],
            vec![
                ("a", None),
                ("b", Some((true, true))),
                ("c", Some((false, false))),
            ],
        ),
        (
            vec!["a", "c"],
            vec![("b", true, true), ("c", false, false)],
            vec![("a", None), ("c", None), ("b", Some((true, true)))],
        ),
    ];
    for (pb_params, ob_params, expected_params) in test_data {
        let mut partitionbys = vec![];
        for col_name in pb_params {
            partitionbys.push(col(col_name, &schema)?);
        }
        let mut orderbys = vec![];
        for (col_name, descending, nulls_first) in ob_params {
            let expr = col(col_name, &schema)?;
            let options = SortOptions {
                descending,
                nulls_first,
            };
            orderbys.push(PhysicalSortExpr { expr, options });
        }
        // Build the expected requirement list (stays `None` when empty).
        let mut expected: Option<Vec<PhysicalSortRequirement>> = None;
        for (col_name, reqs) in expected_params {
            let options = reqs.map(|(descending, nulls_first)| SortOptions {
                descending,
                nulls_first,
            });
            let expr = col(col_name, &schema)?;
            let res = PhysicalSortRequirement::new(expr, options);
            if let Some(expected) = &mut expected {
                expected.push(res);
            } else {
                expected = Some(vec![res]);
            }
        }
        assert_eq!(calc_requirements(partitionbys, orderbys), expected);
    }
    Ok(())
}
// Dropping a pending window aggregation future must release all strong
// references to the input plan (no leaked tasks or Arcs).
#[tokio::test]
async fn test_drop_cancel() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let schema =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
    // An input that never produces data, so collection stays pending.
    let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
    let refs = blocking_exec.refs();
    let window_agg_exec = Arc::new(WindowAggExec::try_new(
        vec![create_window_expr(
            &WindowFunctionDefinition::AggregateUDF(count_udaf()),
            "count".to_owned(),
            &[col("a", &schema)?],
            &[],
            &[],
            &[],
            Arc::new(WindowFrame::new(None)),
            schema.as_ref(),
            false,
        )?],
        blocking_exec,
        vec![],
    )?);
    let fut = collect(window_agg_exec, task_ctx);
    let mut fut = fut.boxed();
    // Must still be pending before the drop...
    assert_is_pending(&mut fut);
    // ...and dropping it must let the input's refcount converge to zero.
    drop(fut);
    assert_strong_count_converges_to_zero(refs).await;
    Ok(())
}
#[tokio::test]
async fn test_satisfy_nullable() -> Result<()> {
    let schema = create_test_schema()?;
    // Each case: ((physical descending, physical nulls_first),
    //             (required descending, required nulls_first),
    //             expected satisfaction) for a *nullable* column.
    let params = vec![
        ((true, true), (false, false), false),
        ((true, true), (false, true), false),
        ((true, true), (true, false), false),
        ((true, false), (false, true), false),
        ((true, false), (false, false), false),
        ((true, false), (true, true), false),
        // Only an exact match satisfies the requirement on a nullable column.
        ((true, false), (true, false), true),
    ];
    for (
        (physical_desc, physical_nulls_first),
        (req_desc, req_nulls_first),
        expected,
    ) in params
    {
        let physical_ordering = PhysicalSortExpr {
            expr: col("nullable_col", &schema)?,
            options: SortOptions {
                descending: physical_desc,
                nulls_first: physical_nulls_first,
            },
        };
        let required_ordering = PhysicalSortExpr {
            expr: col("nullable_col", &schema)?,
            options: SortOptions {
                descending: req_desc,
                nulls_first: req_nulls_first,
            },
        };
        let res = physical_ordering.satisfy(&required_ordering.into(), &schema);
        assert_eq!(res, expected);
    }
    Ok(())
}
#[tokio::test]
async fn test_satisfy_non_nullable() -> Result<()> {
    let schema = create_test_schema()?;
    // Each case: ((physical descending, physical nulls_first),
    //             (required descending, required nulls_first),
    //             expected satisfaction) for a *non-nullable* column, where
    //             nulls_first differences are irrelevant (no nulls exist).
    let params = vec![
        ((true, true), (false, false), false),
        ((true, true), (false, true), false),
        ((true, true), (true, false), true),
        ((true, false), (false, true), false),
        ((true, false), (false, false), false),
        ((true, false), (true, true), true),
        ((true, false), (true, false), true),
    ];
    for (
        (physical_desc, physical_nulls_first),
        (req_desc, req_nulls_first),
        expected,
    ) in params
    {
        let physical_ordering = PhysicalSortExpr {
            expr: col("non_nullable_col", &schema)?,
            options: SortOptions {
                descending: physical_desc,
                nulls_first: physical_nulls_first,
            },
        };
        let required_ordering = PhysicalSortExpr {
            expr: col("non_nullable_col", &schema)?,
            options: SortOptions {
                descending: req_desc,
                nulls_first: req_nulls_first,
            },
        };
        let res = physical_ordering.satisfy(&required_ordering.into(), &schema);
        assert_eq!(res, expected);
    }
    Ok(())
}
#[tokio::test]
async fn test_get_window_mode_exhaustive() -> Result<()> {
    let test_schema = create_test_schema3()?;
    // Input is ordered by a, b, c, d (default ASC NULLS LAST options).
    let sort_exprs = vec![
        sort_expr("a", &test_schema),
        sort_expr("b", &test_schema),
        sort_expr("c", &test_schema),
        sort_expr("d", &test_schema),
    ];
    let exec_unbounded = streaming_table_exec(&test_schema, sort_exprs, true)?;
    // Each case: (PARTITION BY columns, ORDER BY columns (default options),
    //             expected InputOrderMode, or None when unsatisfiable).
    let test_cases = vec![
        (vec!["a"], vec!["a"], Some(Sorted)),
        (vec!["a"], vec!["b"], Some(Sorted)),
        (vec!["a"], vec!["c"], None),
        (vec!["a"], vec!["a", "b"], Some(Sorted)),
        (vec!["a"], vec!["b", "c"], Some(Sorted)),
        (vec!["a"], vec!["a", "c"], None),
        (vec!["a"], vec!["a", "b", "c"], Some(Sorted)),
        (vec!["b"], vec!["a"], Some(Linear)),
        (vec!["b"], vec!["b"], Some(Linear)),
        (vec!["b"], vec!["c"], None),
        (vec!["b"], vec!["a", "b"], Some(Linear)),
        (vec!["b"], vec!["b", "c"], None),
        (vec!["b"], vec!["a", "c"], Some(Linear)),
        (vec!["b"], vec!["a", "b", "c"], Some(Linear)),
        (vec!["c"], vec!["a"], Some(Linear)),
        (vec!["c"], vec!["b"], None),
        (vec!["c"], vec!["c"], Some(Linear)),
        (vec!["c"], vec!["a", "b"], Some(Linear)),
        (vec!["c"], vec!["b", "c"], None),
        (vec!["c"], vec!["a", "c"], Some(Linear)),
        (vec!["c"], vec!["a", "b", "c"], Some(Linear)),
        (vec!["b", "a"], vec!["a"], Some(Sorted)),
        (vec!["b", "a"], vec!["b"], Some(Sorted)),
        (vec!["b", "a"], vec!["c"], Some(Sorted)),
        (vec!["b", "a"], vec!["a", "b"], Some(Sorted)),
        (vec!["b", "a"], vec!["b", "c"], Some(Sorted)),
        (vec!["b", "a"], vec!["a", "c"], Some(Sorted)),
        (vec!["b", "a"], vec!["a", "b", "c"], Some(Sorted)),
        (vec!["c", "b"], vec!["a"], Some(Linear)),
        (vec!["c", "b"], vec!["b"], Some(Linear)),
        (vec!["c", "b"], vec!["c"], Some(Linear)),
        (vec!["c", "b"], vec!["a", "b"], Some(Linear)),
        (vec!["c", "b"], vec!["b", "c"], Some(Linear)),
        (vec!["c", "b"], vec!["a", "c"], Some(Linear)),
        (vec!["c", "b"], vec!["a", "b", "c"], Some(Linear)),
        // PartiallySorted(vec![1]): only the partition key at index 1 ("a")
        // is pre-ordered in the input.
        (vec!["c", "a"], vec!["a"], Some(PartiallySorted(vec![1]))),
        (vec!["c", "a"], vec!["b"], Some(PartiallySorted(vec![1]))),
        (vec!["c", "a"], vec!["c"], Some(PartiallySorted(vec![1]))),
        (
            vec!["c", "a"],
            vec!["a", "b"],
            Some(PartiallySorted(vec![1])),
        ),
        (
            vec!["c", "a"],
            vec!["b", "c"],
            Some(PartiallySorted(vec![1])),
        ),
        (
            vec!["c", "a"],
            vec!["a", "c"],
            Some(PartiallySorted(vec![1])),
        ),
        (
            vec!["c", "a"],
            vec!["a", "b", "c"],
            Some(PartiallySorted(vec![1])),
        ),
        (vec!["c", "b", "a"], vec!["a"], Some(Sorted)),
        (vec!["c", "b", "a"], vec!["b"], Some(Sorted)),
        (vec!["c", "b", "a"], vec!["c"], Some(Sorted)),
        (vec!["c", "b", "a"], vec!["a", "b"], Some(Sorted)),
        (vec!["c", "b", "a"], vec!["b", "c"], Some(Sorted)),
        (vec!["c", "b", "a"], vec!["a", "c"], Some(Sorted)),
        (vec!["c", "b", "a"], vec!["a", "b", "c"], Some(Sorted)),
    ];
    for (case_idx, test_case) in test_cases.iter().enumerate() {
        let (partition_by_columns, order_by_params, expected) = &test_case;
        let mut partition_by_exprs = vec![];
        for col_name in partition_by_columns {
            partition_by_exprs.push(col(col_name, &test_schema)?);
        }
        let mut order_by_exprs = vec![];
        for col_name in order_by_params {
            let expr = col(col_name, &test_schema)?;
            let options = SortOptions::default();
            order_by_exprs.push(PhysicalSortExpr { expr, options });
        }
        let res =
            get_window_mode(&partition_by_exprs, &order_by_exprs, &exec_unbounded);
        // Only the mode is checked here; the reverse flag is exercised in
        // `test_get_window_mode`.
        let res = res.map(|(_, mode)| mode);
        assert_eq!(
            res, *expected,
            "Unexpected result for in unbounded test case#: {case_idx:?}, case: {test_case:?}"
        );
    }
    Ok(())
}
#[tokio::test]
async fn test_get_window_mode() -> Result<()> {
    let test_schema = create_test_schema3()?;
    // Input is ordered by a, b, c, d (default ASC NULLS LAST options).
    let sort_exprs = vec![
        sort_expr("a", &test_schema),
        sort_expr("b", &test_schema),
        sort_expr("c", &test_schema),
        sort_expr("d", &test_schema),
    ];
    let exec_unbounded = streaming_table_exec(&test_schema, sort_exprs, true)?;
    // Each case: (PARTITION BY columns,
    //             ORDER BY entries as (column, descending, nulls_first),
    //             expected Option<(should_reverse, InputOrderMode)>).
    let test_cases = vec![
        (vec!["a", "b"], vec![("c", false, false)], None),
        (vec![], vec![("c", false, true)], None),
        (vec!["b"], vec![("c", false, true)], None),
        (vec!["a"], vec![("c", false, true)], None),
        (
            vec!["a", "b"],
            vec![("c", false, true), ("e", false, true)],
            None,
        ),
        (vec!["a"], vec![("b", false, true)], Some((false, Sorted))),
        (vec!["a"], vec![("a", false, true)], Some((false, Sorted))),
        (vec!["a"], vec![("a", false, false)], Some((false, Sorted))),
        (vec!["a"], vec![("a", true, true)], Some((false, Sorted))),
        (vec!["a"], vec![("a", true, false)], Some((false, Sorted))),
        (vec!["a"], vec![("b", false, false)], Some((false, Sorted))),
        // (true, ...) cases: ORDER BY is satisfied in the reverse direction.
        (vec!["a"], vec![("b", true, false)], Some((true, Sorted))),
        (
            vec!["a", "b"],
            vec![("c", false, true)],
            Some((false, Sorted)),
        ),
        (
            vec!["b", "a"],
            vec![("c", false, true)],
            Some((false, Sorted)),
        ),
        (
            vec!["a", "b"],
            vec![("c", true, false)],
            Some((true, Sorted)),
        ),
        (
            vec!["e"],
            vec![("a", false, true)],
            Some((false, Linear)),
        ),
        (
            vec!["b", "c"],
            vec![("a", false, true), ("c", false, true)],
            Some((false, Linear)),
        ),
        (vec!["b"], vec![("a", false, true)], Some((false, Linear))),
        (
            vec!["a", "e"],
            vec![("b", false, true)],
            Some((false, PartiallySorted(vec![0]))),
        ),
        (
            vec!["a", "c"],
            vec![("b", false, true)],
            Some((false, PartiallySorted(vec![0]))),
        ),
        (
            vec!["c", "a"],
            vec![("b", false, true)],
            Some((false, PartiallySorted(vec![1]))),
        ),
        (
            vec!["d", "b", "a"],
            vec![("c", false, true)],
            Some((false, PartiallySorted(vec![2, 1]))),
        ),
        (
            vec!["e", "b", "a"],
            vec![("c", false, true)],
            Some((false, PartiallySorted(vec![2, 1]))),
        ),
        (
            vec!["d", "a"],
            vec![("b", false, true)],
            Some((false, PartiallySorted(vec![1]))),
        ),
        (
            vec!["a"],
            vec![("b", false, true), ("a", false, true)],
            Some((false, Sorted)),
        ),
        (vec![], vec![("b", false, true), ("a", false, true)], None),
    ];
    for (case_idx, test_case) in test_cases.iter().enumerate() {
        let (partition_by_columns, order_by_params, expected) = &test_case;
        let mut partition_by_exprs = vec![];
        for col_name in partition_by_columns {
            partition_by_exprs.push(col(col_name, &test_schema)?);
        }
        let mut order_by_exprs = vec![];
        for (col_name, descending, nulls_first) in order_by_params {
            let expr = col(col_name, &test_schema)?;
            let options = SortOptions {
                descending: *descending,
                nulls_first: *nulls_first,
            };
            order_by_exprs.push(PhysicalSortExpr { expr, options });
        }
        assert_eq!(
            get_window_mode(&partition_by_exprs, &order_by_exprs, &exec_unbounded),
            *expected,
            "Unexpected result for in unbounded test case#: {case_idx:?}, case: {test_case:?}"
        );
    }
    Ok(())
}
}