use std::any::Any;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::sync::Arc;
use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
use crate::expressions::format_state_name;
use crate::{
reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr,
};
use arrow::datatypes::{DataType, Field};
use arrow_array::cast::AsArray;
use arrow_array::{new_empty_array, Array, ArrayRef, StructArray};
use arrow_schema::Fields;
use datafusion_common::utils::{array_into_list_array, get_row_at_idx};
use datafusion_common::{exec_err, Result, ScalarValue};
use datafusion_expr::utils::AggregateOrderSensitivity;
use datafusion_expr::Accumulator;
use datafusion_physical_expr_common::aggregate::merge_arrays::merge_ordered_arrays;
#[derive(Debug)]
pub struct OrderSensitiveArrayAgg {
name: String,
input_data_type: DataType,
expr: Arc<dyn PhysicalExpr>,
nullable: bool,
order_by_data_types: Vec<DataType>,
ordering_req: LexOrdering,
reverse: bool,
}
impl OrderSensitiveArrayAgg {
pub fn new(
expr: Arc<dyn PhysicalExpr>,
name: impl Into<String>,
input_data_type: DataType,
nullable: bool,
order_by_data_types: Vec<DataType>,
ordering_req: LexOrdering,
) -> Self {
Self {
name: name.into(),
input_data_type,
expr,
nullable,
order_by_data_types,
ordering_req,
reverse: false,
}
}
}
impl AggregateExpr for OrderSensitiveArrayAgg {
fn as_any(&self) -> &dyn Any {
self
}
fn field(&self) -> Result<Field> {
Ok(Field::new_list(
&self.name,
Field::new("item", self.input_data_type.clone(), self.nullable),
false,
))
}
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
OrderSensitiveArrayAggAccumulator::try_new(
&self.input_data_type,
&self.order_by_data_types,
self.ordering_req.clone(),
self.reverse,
self.nullable,
)
.map(|acc| Box::new(acc) as _)
}
fn state_fields(&self) -> Result<Vec<Field>> {
let mut fields = vec![Field::new_list(
format_state_name(&self.name, "array_agg"),
Field::new("item", self.input_data_type.clone(), self.nullable),
false, )];
let orderings = ordering_fields(&self.ordering_req, &self.order_by_data_types);
fields.push(Field::new_list(
format_state_name(&self.name, "array_agg_orderings"),
Field::new(
"item",
DataType::Struct(Fields::from(orderings)),
self.nullable,
),
false,
));
Ok(fields)
}
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![Arc::clone(&self.expr)]
}
fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
(!self.ordering_req.is_empty()).then_some(&self.ordering_req)
}
fn order_sensitivity(&self) -> AggregateOrderSensitivity {
AggregateOrderSensitivity::HardRequirement
}
fn name(&self) -> &str {
&self.name
}
fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
Some(Arc::new(Self {
name: self.name.to_string(),
input_data_type: self.input_data_type.clone(),
expr: Arc::clone(&self.expr),
nullable: self.nullable,
order_by_data_types: self.order_by_data_types.clone(),
ordering_req: reverse_order_bys(&self.ordering_req),
reverse: !self.reverse,
}))
}
}
impl PartialEq<dyn Any> for OrderSensitiveArrayAgg {
fn eq(&self, other: &dyn Any) -> bool {
down_cast_any_ref(other)
.downcast_ref::<Self>()
.map(|x| {
self.name == x.name
&& self.input_data_type == x.input_data_type
&& self.order_by_data_types == x.order_by_data_types
&& self.expr.eq(&x.expr)
})
.unwrap_or(false)
}
}
#[derive(Debug)]
pub(crate) struct OrderSensitiveArrayAggAccumulator {
values: Vec<ScalarValue>,
ordering_values: Vec<Vec<ScalarValue>>,
datatypes: Vec<DataType>,
ordering_req: LexOrdering,
reverse: bool,
nullable: bool,
}
impl OrderSensitiveArrayAggAccumulator {
pub fn try_new(
datatype: &DataType,
ordering_dtypes: &[DataType],
ordering_req: LexOrdering,
reverse: bool,
nullable: bool,
) -> Result<Self> {
let mut datatypes = vec![datatype.clone()];
datatypes.extend(ordering_dtypes.iter().cloned());
Ok(Self {
values: vec![],
ordering_values: vec![],
datatypes,
ordering_req,
reverse,
nullable,
})
}
}
impl Accumulator for OrderSensitiveArrayAggAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
let n_row = values[0].len();
for index in 0..n_row {
let row = get_row_at_idx(values, index)?;
self.values.push(row[0].clone());
self.ordering_values.push(row[1..].to_vec());
}
Ok(())
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
if states.is_empty() {
return Ok(());
}
let [array_agg_values, agg_orderings, ..] = &states else {
return exec_err!("State should have two elements");
};
let Some(agg_orderings) = agg_orderings.as_list_opt::<i32>() else {
return exec_err!("Expects to receive a list array");
};
let mut partition_values = vec![];
let mut partition_ordering_values = vec![];
partition_values.push(self.values.clone().into());
partition_ordering_values.push(self.ordering_values.clone().into());
let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?;
for v in array_agg_res.into_iter() {
partition_values.push(v.into());
}
let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;
for partition_ordering_rows in orderings.into_iter() {
let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| {
if let ScalarValue::Struct(s) = ordering_row {
let mut ordering_columns_per_row = vec![];
for column in s.columns() {
let sv = ScalarValue::try_from_array(column, 0)?;
ordering_columns_per_row.push(sv);
}
Ok(ordering_columns_per_row)
} else {
exec_err!(
"Expects to receive ScalarValue::Struct(Arc<StructArray>) but got:{:?}",
ordering_row.data_type()
)
}
}).collect::<Result<VecDeque<_>>>()?;
partition_ordering_values.push(ordering_value);
}
let sort_options = self
.ordering_req
.iter()
.map(|sort_expr| sort_expr.options)
.collect::<Vec<_>>();
(self.values, self.ordering_values) = merge_ordered_arrays(
&mut partition_values,
&mut partition_ordering_values,
&sort_options,
)?;
Ok(())
}
fn state(&mut self) -> Result<Vec<ScalarValue>> {
let mut result = vec![self.evaluate()?];
result.push(self.evaluate_orderings()?);
Ok(result)
}
fn evaluate(&mut self) -> Result<ScalarValue> {
let values = self.values.clone();
let array = if self.reverse {
ScalarValue::new_list_from_iter(
values.into_iter().rev(),
&self.datatypes[0],
self.nullable,
)
} else {
ScalarValue::new_list_from_iter(
values.into_iter(),
&self.datatypes[0],
self.nullable,
)
};
Ok(ScalarValue::List(array))
}
fn size(&self) -> usize {
let mut total = std::mem::size_of_val(self)
+ ScalarValue::size_of_vec(&self.values)
- std::mem::size_of_val(&self.values);
total +=
std::mem::size_of::<Vec<ScalarValue>>() * self.ordering_values.capacity();
for row in &self.ordering_values {
total += ScalarValue::size_of_vec(row) - std::mem::size_of_val(row);
}
total += std::mem::size_of::<DataType>() * self.datatypes.capacity();
for dtype in &self.datatypes {
total += dtype.size() - std::mem::size_of_val(dtype);
}
total += std::mem::size_of::<PhysicalSortExpr>() * self.ordering_req.capacity();
total
}
}
impl OrderSensitiveArrayAggAccumulator {
fn evaluate_orderings(&self) -> Result<ScalarValue> {
let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
let num_columns = fields.len();
let struct_field = Fields::from(fields.clone());
let mut column_wise_ordering_values = vec![];
for i in 0..num_columns {
let column_values = self
.ordering_values
.iter()
.map(|x| x[i].clone())
.collect::<Vec<_>>();
let array = if column_values.is_empty() {
new_empty_array(fields[i].data_type())
} else {
ScalarValue::iter_to_array(column_values.into_iter())?
};
column_wise_ordering_values.push(array);
}
let ordering_array = StructArray::try_new(
struct_field.clone(),
column_wise_ordering_values,
None,
)?;
Ok(ScalarValue::List(Arc::new(array_into_list_array(
Arc::new(ordering_array),
self.nullable,
))))
}
}
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use std::sync::Arc;
use crate::aggregate::array_agg_ordered::merge_ordered_arrays;
use arrow_array::{Array, ArrayRef, Int64Array};
use arrow_schema::SortOptions;
use datafusion_common::utils::get_row_at_idx;
use datafusion_common::{Result, ScalarValue};
#[test]
fn test_merge_asc() -> Result<()> {
let lhs_arrays: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
];
let n_row = lhs_arrays[0].len();
let lhs_orderings = (0..n_row)
.map(|idx| get_row_at_idx(&lhs_arrays, idx))
.collect::<Result<VecDeque<_>>>()?;
let rhs_arrays: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
];
let n_row = rhs_arrays[0].len();
let rhs_orderings = (0..n_row)
.map(|idx| get_row_at_idx(&rhs_arrays, idx))
.collect::<Result<VecDeque<_>>>()?;
let sort_options = vec![
SortOptions {
descending: false,
nulls_first: false,
},
SortOptions {
descending: false,
nulls_first: false,
},
];
let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef;
let lhs_vals = (0..lhs_vals_arr.len())
.map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
.collect::<Result<VecDeque<_>>>()?;
let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef;
let rhs_vals = (0..rhs_vals_arr.len())
.map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
.collect::<Result<VecDeque<_>>>()?;
let expected =
Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef;
let expected_ts = vec![
Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as ArrayRef,
Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef,
];
let (merged_vals, merged_ts) = merge_ordered_arrays(
&mut [lhs_vals, rhs_vals],
&mut [lhs_orderings, rhs_orderings],
&sort_options,
)?;
let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
let merged_ts = (0..merged_ts[0].len())
.map(|col_idx| {
ScalarValue::iter_to_array(
(0..merged_ts.len())
.map(|row_idx| merged_ts[row_idx][col_idx].clone()),
)
})
.collect::<Result<Vec<_>>>()?;
assert_eq!(&merged_vals, &expected);
assert_eq!(&merged_ts, &expected_ts);
Ok(())
}
#[test]
fn test_merge_desc() -> Result<()> {
let lhs_arrays: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
];
let n_row = lhs_arrays[0].len();
let lhs_orderings = (0..n_row)
.map(|idx| get_row_at_idx(&lhs_arrays, idx))
.collect::<Result<VecDeque<_>>>()?;
let rhs_arrays: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
];
let n_row = rhs_arrays[0].len();
let rhs_orderings = (0..n_row)
.map(|idx| get_row_at_idx(&rhs_arrays, idx))
.collect::<Result<VecDeque<_>>>()?;
let sort_options = vec![
SortOptions {
descending: true,
nulls_first: false,
},
SortOptions {
descending: true,
nulls_first: false,
},
];
let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
let lhs_vals = (0..lhs_vals_arr.len())
.map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
.collect::<Result<VecDeque<_>>>()?;
let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
let rhs_vals = (0..rhs_vals_arr.len())
.map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
.collect::<Result<VecDeque<_>>>()?;
let expected =
Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as ArrayRef;
let expected_ts = vec![
Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as ArrayRef,
Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as ArrayRef,
];
let (merged_vals, merged_ts) = merge_ordered_arrays(
&mut [lhs_vals, rhs_vals],
&mut [lhs_orderings, rhs_orderings],
&sort_options,
)?;
let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
let merged_ts = (0..merged_ts[0].len())
.map(|col_idx| {
ScalarValue::iter_to_array(
(0..merged_ts.len())
.map(|row_idx| merged_ts[row_idx][col_idx].clone()),
)
})
.collect::<Result<Vec<_>>>()?;
assert_eq!(&merged_vals, &expected);
assert_eq!(&merged_ts, &expected_ts);
Ok(())
}
}