use std::any::Any;
use std::fmt;
use arrow::datatypes::DataType;
use arrow::datatypes::Field;
use datafusion_common::{not_impl_err, Result};
use datafusion_expr::function::AccumulatorArgs;
use datafusion_expr::function::StateFieldsArgs;
use datafusion_expr::utils::format_state_name;
use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility};
make_udaf_expr_and_func!(
Grouping,
grouping,
expression,
"Returns 1 if the data is aggregated across the specified column or 0 for not aggregated in the result set.",
grouping_udaf
);
pub struct Grouping {
signature: Signature,
}
impl fmt::Debug for Grouping {
fn fmt(&self, f: &mut std::fmt::Formatter) -> fmt::Result {
f.debug_struct("Grouping")
.field("name", &self.name())
.field("signature", &self.signature)
.finish()
}
}
impl Default for Grouping {
fn default() -> Self {
Self::new()
}
}
impl Grouping {
pub fn new() -> Self {
Self {
signature: Signature::any(1, Volatility::Immutable),
}
}
}
impl AggregateUDFImpl for Grouping {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"grouping"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Int32)
}
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
Ok(vec![Field::new(
format_state_name(args.name, "grouping"),
DataType::Int32,
true,
)])
}
fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
not_impl_err!(
"physical plan is not yet implemented for GROUPING aggregate function"
)
}
}