datafusion_functions_aggregate_common/aggregate/count_distinct/
bytes.rsuse arrow::array::{ArrayRef, OffsetSizeTrait};
use datafusion_common::cast::as_list_array;
use datafusion_common::utils::array_into_list_array_nullable;
use datafusion_common::ScalarValue;
use datafusion_expr_common::accumulator::Accumulator;
use datafusion_physical_expr_common::binary_map::{ArrowBytesSet, OutputType};
use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewSet;
use std::fmt::Debug;
use std::mem::size_of_val;
use std::sync::Arc;
#[derive(Debug)]
pub struct BytesDistinctCountAccumulator<O: OffsetSizeTrait>(ArrowBytesSet<O>);
impl<O: OffsetSizeTrait> BytesDistinctCountAccumulator<O> {
pub fn new(output_type: OutputType) -> Self {
Self(ArrowBytesSet::new(output_type))
}
}
impl<O: OffsetSizeTrait> Accumulator for BytesDistinctCountAccumulator<O> {
fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
let set = self.0.take();
let arr = set.into_state();
let list = Arc::new(array_into_list_array_nullable(arr));
Ok(vec![ScalarValue::List(list)])
}
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
if values.is_empty() {
return Ok(());
}
self.0.insert(&values[0]);
Ok(())
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
if states.is_empty() {
return Ok(());
}
assert_eq!(
states.len(),
1,
"count_distinct states must be single array"
);
let arr = as_list_array(&states[0])?;
arr.iter().try_for_each(|maybe_list| {
if let Some(list) = maybe_list {
self.0.insert(&list);
};
Ok(())
})
}
fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64)))
}
fn size(&self) -> usize {
size_of_val(self) + self.0.size()
}
}
#[derive(Debug)]
pub struct BytesViewDistinctCountAccumulator(ArrowBytesViewSet);
impl BytesViewDistinctCountAccumulator {
pub fn new(output_type: OutputType) -> Self {
Self(ArrowBytesViewSet::new(output_type))
}
}
impl Accumulator for BytesViewDistinctCountAccumulator {
fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
let set = self.0.take();
let arr = set.into_state();
let list = Arc::new(array_into_list_array_nullable(arr));
Ok(vec![ScalarValue::List(list)])
}
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
if values.is_empty() {
return Ok(());
}
self.0.insert(&values[0]);
Ok(())
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
if states.is_empty() {
return Ok(());
}
assert_eq!(
states.len(),
1,
"count_distinct states must be single array"
);
let arr = as_list_array(&states[0])?;
arr.iter().try_for_each(|maybe_list| {
if let Some(list) = maybe_list {
self.0.insert(&list);
};
Ok(())
})
}
fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64)))
}
fn size(&self) -> usize {
size_of_val(self) + self.0.size()
}
}