use std::any::Any;
use arrow::datatypes::{
DataType, Field, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
};
use datafusion_common::{exec_err, not_impl_err, Result};
use crate::type_coercion::aggregates::{avg_return_type, coerce_avg_type, NUMERICS};
use crate::Volatility::Immutable;
use crate::{
expr::AggregateFunction,
function::{AccumulatorArgs, StateFieldsArgs},
utils::AggregateOrderSensitivity,
Accumulator, AggregateUDFImpl, Expr, GroupsAccumulator, ReversedUDAF, Signature,
Volatility,
};
// Generates, for the UDAF type `$UDAF`, a lazily-initialised static holding an
// `Arc<AggregateUDF>` plus a public accessor function named `$AGGREGATE_UDF_FN`
// that hands out clones of that `Arc`.
macro_rules! create_func {
    ($UDAF:ty, $AGGREGATE_UDF_FN:ident) => {
        paste::paste! {
            // The static's name is pasted together from `STATIC_` and the type
            // name, hence the lint allowance for its casing.
            #[allow(non_upper_case_globals)]
            static [< STATIC_ $UDAF >]: std::sync::OnceLock<std::sync::Arc<crate::AggregateUDF>> =
                std::sync::OnceLock::new();

            #[doc = concat!("AggregateFunction that returns a [AggregateUDF](crate::AggregateUDF) for [`", stringify!($UDAF), "`]")]
            pub fn $AGGREGATE_UDF_FN() -> std::sync::Arc<crate::AggregateUDF> {
                // Build the UDAF from `$UDAF::default()` on first use only;
                // later calls reuse the stored instance.
                let shared = [< STATIC_ $UDAF >].get_or_init(|| {
                    let udaf = crate::AggregateUDF::from(<$UDAF>::default());
                    std::sync::Arc::new(udaf)
                });
                std::sync::Arc::clone(shared)
            }
        }
    };
}
create_func!(Sum, sum_udaf);

/// Wraps `expr` in an [`Expr::AggregateFunction`] that calls the shared
/// [`sum_udaf`] instance with `expr` as its only argument.
///
/// The remaining `new_udf` arguments are passed as `false, None, None, None`
/// (presumably "not distinct", no filter, no ordering and no null treatment —
/// confirm against `AggregateFunction::new_udf`).
pub fn sum(expr: Expr) -> Expr {
    let udaf = sum_udaf();
    let args = vec![expr];
    let aggregate = AggregateFunction::new_udf(udaf, args, false, None, None, None);
    Expr::AggregateFunction(aggregate)
}
create_func!(Count, count_udaf);

/// Wraps `expr` in an [`Expr::AggregateFunction`] that calls the shared
/// [`count_udaf`] instance with `expr` as its only argument.
///
/// The remaining `new_udf` arguments are passed as `false, None, None, None`
/// (presumably "not distinct", no filter, no ordering and no null treatment —
/// confirm against `AggregateFunction::new_udf`).
pub fn count(expr: Expr) -> Expr {
    let udaf = count_udaf();
    let args = vec![expr];
    let aggregate = AggregateFunction::new_udf(udaf, args, false, None, None, None);
    Expr::AggregateFunction(aggregate)
}
create_func!(Avg, avg_udaf);

/// Wraps `expr` in an [`Expr::AggregateFunction`] that calls the shared
/// [`avg_udaf`] instance with `expr` as its only argument.
///
/// The remaining `new_udf` arguments are passed as `false, None, None, None`
/// (presumably "not distinct", no filter, no ordering and no null treatment —
/// confirm against `AggregateFunction::new_udf`).
pub fn avg(expr: Expr) -> Expr {
    let udaf = avg_udaf();
    let args = vec![expr];
    let aggregate = AggregateFunction::new_udf(udaf, args, false, None, None, None);
    Expr::AggregateFunction(aggregate)
}
/// Stub `sum` aggregate: it carries only a [`Signature`] and provides no real
/// accumulation logic.
#[derive(Debug)]
pub struct Sum {
    /// Signature handed out by the `AggregateUDFImpl::signature` implementation.
    signature: Signature,
}

impl Sum {
    /// Creates the stub with a user-defined signature of immutable volatility.
    pub fn new() -> Self {
        let signature = Signature::user_defined(Volatility::Immutable);
        Self { signature }
    }
}

impl Default for Sum {
    /// Same as [`Sum::new`].
    fn default() -> Self {
        Sum::new()
    }
}
impl AggregateUDFImpl for Sum {
    /// Exposes the concrete type for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Function name: `"sum"`.
    fn name(&self) -> &str {
        "sum"
    }

    /// Signature stored at construction time.
    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Coerces the single argument type to the type `SUM` operates on.
    ///
    /// Dictionary types are unwrapped to their value type, decimals are kept
    /// as they are, signed integers become `Int64`, unsigned integers become
    /// `UInt64` and floating point types become `Float64`.
    ///
    /// # Errors
    /// Returns an execution error when `arg_types` does not contain exactly
    /// one element, or when the type is not one of the categories above.
    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
        // Recursive so that nested dictionaries resolve to their innermost value type.
        fn coerced_type(data_type: &DataType) -> Result<DataType> {
            match data_type {
                DataType::Dictionary(_, v) => coerced_type(v),
                DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
                    Ok(data_type.clone())
                }
                dt if dt.is_signed_integer() => Ok(DataType::Int64),
                dt if dt.is_unsigned_integer() => Ok(DataType::UInt64),
                dt if dt.is_floating() => Ok(DataType::Float64),
                _ => exec_err!("Sum not supported for {}", data_type),
            }
        }

        // The slice pattern checks the arity and binds the argument in one
        // step, so no separate index expression is needed.
        let [arg_type] = arg_types else {
            return exec_err!("SUM expects exactly one argument");
        };
        Ok(vec![coerced_type(arg_type)?])
    }

    /// Computes the result type from the (already coerced) first argument type.
    ///
    /// `Int64`, `UInt64` and `Float64` map to themselves. Decimals keep their
    /// scale and gain 10 digits of precision, capped at the maximum precision
    /// of the respective decimal width.
    ///
    /// # Errors
    /// Returns an execution error when `arg_types` is empty or when the first
    /// type is not one of the types listed above.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        // Report an error rather than panicking on an out-of-bounds index
        // when the function is called without argument types.
        let Some(arg_type) = arg_types.first() else {
            return exec_err!("[return_type] SUM expects one argument but received none");
        };
        match arg_type {
            DataType::Int64 => Ok(DataType::Int64),
            DataType::UInt64 => Ok(DataType::UInt64),
            DataType::Float64 => Ok(DataType::Float64),
            DataType::Decimal128(precision, scale) => {
                // Saturate so an out-of-range precision cannot overflow `u8`
                // before the cap is applied.
                let new_precision =
                    DECIMAL128_MAX_PRECISION.min(precision.saturating_add(10));
                Ok(DataType::Decimal128(new_precision, *scale))
            }
            DataType::Decimal256(precision, scale) => {
                let new_precision =
                    DECIMAL256_MAX_PRECISION.min(precision.saturating_add(10));
                Ok(DataType::Decimal256(new_precision, *scale))
            }
            other => {
                exec_err!("[return_type] SUM not supported for {}", other)
            }
        }
    }

    /// Not available on the stub.
    ///
    /// # Panics
    /// Always panics via `unreachable!`.
    fn accumulator(&self, _args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
        unreachable!("stub should not have accumulate()")
    }

    /// Not available on the stub.
    ///
    /// # Panics
    /// Always panics via `unreachable!`.
    fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<Field>> {
        unreachable!("stub should not have state_fields()")
    }

    /// The stub has no aliases.
    fn aliases(&self) -> &[String] {
        &[]
    }

    /// Always `false`: the stub offers no groups accumulator.
    fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
        false
    }

    /// Not available on the stub.
    ///
    /// # Panics
    /// Always panics via `unreachable!`.
    fn create_groups_accumulator(
        &self,
        _args: AccumulatorArgs,
    ) -> Result<Box<dyn GroupsAccumulator>> {
        unreachable!("stub should not have accumulate()")
    }

    /// Reports [`ReversedUDAF::Identical`].
    fn reverse_expr(&self) -> ReversedUDAF {
        ReversedUDAF::Identical
    }

    /// Reports [`AggregateOrderSensitivity::Insensitive`].
    fn order_sensitivity(&self) -> AggregateOrderSensitivity {
        AggregateOrderSensitivity::Insensitive
    }
}
/// Stub `COUNT` aggregate: it carries a [`Signature`] and an alias list and
/// provides no real accumulation logic.
pub struct Count {
    /// Signature handed out by the `AggregateUDFImpl::signature` implementation.
    signature: Signature,
    /// Alternative names returned by the `AggregateUDFImpl::aliases` implementation.
    aliases: Vec<String>,
}

impl Count {
    /// Creates the stub with the single alias `"count"` and a variadic-any
    /// signature of immutable volatility.
    pub fn new() -> Self {
        let aliases = vec![String::from("count")];
        let signature = Signature::variadic_any(Volatility::Immutable);
        Self { aliases, signature }
    }
}

impl Default for Count {
    /// Same as [`Count::new`].
    fn default() -> Self {
        Count::new()
    }
}

impl std::fmt::Debug for Count {
    /// Formats the function name and signature; the alias list is not included.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let mut out = f.debug_struct("Count");
        out.field("name", &self.name());
        out.field("signature", &self.signature);
        out.finish()
    }
}
impl AggregateUDFImpl for Count {
    /// Exposes the concrete type for downcasting.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Function name: `"COUNT"`.
    fn name(&self) -> &str {
        "COUNT"
    }

    /// Alias list stored at construction time.
    fn aliases(&self) -> &[String] {
        self.aliases.as_slice()
    }

    /// Signature stored at construction time.
    fn signature(&self) -> &Signature {
        let Self { signature, .. } = self;
        signature
    }

    /// Always reports `Int64`, whatever the argument types are.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        let output = DataType::Int64;
        Ok(output)
    }

    /// Not available on the stub; always returns a "not implemented" error.
    fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
        not_impl_err!("no impl for stub")
    }

    /// Not available on the stub; always returns a "not implemented" error.
    fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<Field>> {
        not_impl_err!("no impl for stub")
    }

    /// Not available on the stub; always returns a "not implemented" error.
    fn create_groups_accumulator(
        &self,
        _args: AccumulatorArgs,
    ) -> Result<Box<dyn GroupsAccumulator>> {
        not_impl_err!("no impl for stub")
    }

    /// Reports [`ReversedUDAF::Identical`].
    fn reverse_expr(&self) -> ReversedUDAF {
        ReversedUDAF::Identical
    }
}
create_func!(Min, min_udaf);

/// Wraps `expr` in an [`Expr::AggregateFunction`] that calls the shared
/// [`min_udaf`] instance with `expr` as its only argument.
///
/// The remaining `new_udf` arguments are passed as `false, None, None, None`
/// (presumably "not distinct", no filter, no ordering and no null treatment —
/// confirm against `AggregateFunction::new_udf`).
pub fn min(expr: Expr) -> Expr {
    let udaf = min_udaf();
    let args = vec![expr];
    let aggregate = AggregateFunction::new_udf(udaf, args, false, None, None, None);
    Expr::AggregateFunction(aggregate)
}
/// Stub `min` aggregate: it carries only a [`Signature`] and provides no real
/// accumulation logic.
pub struct Min {
    /// Signature handed out by the `AggregateUDFImpl::signature` implementation.
    signature: Signature,
}

impl Min {
    /// Creates the stub with a variadic-any signature of immutable volatility.
    pub fn new() -> Self {
        let signature = Signature::variadic_any(Volatility::Immutable);
        Self { signature }
    }
}

impl Default for Min {
    /// Same as [`Min::new`].
    fn default() -> Self {
        Min::new()
    }
}

impl std::fmt::Debug for Min {
    /// Formats the function name and signature.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let mut out = f.debug_struct("Min");
        out.field("name", &self.name());
        out.field("signature", &self.signature);
        out.finish()
    }
}
impl AggregateUDFImpl for Min {
    /// Exposes the concrete type for downcasting.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Function name: `"min"`.
    fn name(&self) -> &str {
        "min"
    }

    /// The stub has no aliases.
    fn aliases(&self) -> &[String] {
        &[]
    }

    /// Signature stored at construction time.
    fn signature(&self) -> &Signature {
        let Self { signature } = self;
        signature
    }

    /// Always reports `Int64`, whatever the argument types are.
    ///
    /// NOTE(review): presumably a stub simplification — a real `min` would
    /// likely mirror its input type; confirm before relying on this.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        let output = DataType::Int64;
        Ok(output)
    }

    /// Not available on the stub; always returns a "not implemented" error.
    fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
        not_impl_err!("no impl for stub")
    }

    /// Not available on the stub; always returns a "not implemented" error.
    fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<Field>> {
        not_impl_err!("no impl for stub")
    }

    /// Not available on the stub; always returns a "not implemented" error.
    fn create_groups_accumulator(
        &self,
        _args: AccumulatorArgs,
    ) -> Result<Box<dyn GroupsAccumulator>> {
        not_impl_err!("no impl for stub")
    }

    /// Reports [`ReversedUDAF::Identical`].
    fn reverse_expr(&self) -> ReversedUDAF {
        ReversedUDAF::Identical
    }

    /// Returns `Some(false)`.
    ///
    /// NOTE(review): presumably marks `min` as the non-descending extreme —
    /// confirm against the `AggregateUDFImpl::is_descending` documentation.
    fn is_descending(&self) -> Option<bool> {
        let descending = false;
        Some(descending)
    }
}
create_func!(Max, max_udaf);

/// Wraps `expr` in an [`Expr::AggregateFunction`] that calls the shared
/// [`max_udaf`] instance with `expr` as its only argument.
///
/// The remaining `new_udf` arguments are passed as `false, None, None, None`
/// (presumably "not distinct", no filter, no ordering and no null treatment —
/// confirm against `AggregateFunction::new_udf`).
pub fn max(expr: Expr) -> Expr {
    let udaf = max_udaf();
    let args = vec![expr];
    let aggregate = AggregateFunction::new_udf(udaf, args, false, None, None, None);
    Expr::AggregateFunction(aggregate)
}
/// Stub `max` aggregate: it carries only a [`Signature`] and provides no real
/// accumulation logic.
pub struct Max {
    /// Signature handed out by the `AggregateUDFImpl::signature` implementation.
    signature: Signature,
}

impl Max {
    /// Creates the stub with a variadic-any signature of immutable volatility.
    pub fn new() -> Self {
        let signature = Signature::variadic_any(Volatility::Immutable);
        Self { signature }
    }
}

impl Default for Max {
    /// Same as [`Max::new`].
    fn default() -> Self {
        Max::new()
    }
}

impl std::fmt::Debug for Max {
    /// Formats the function name and signature.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let mut out = f.debug_struct("Max");
        out.field("name", &self.name());
        out.field("signature", &self.signature);
        out.finish()
    }
}
impl AggregateUDFImpl for Max {
    /// Exposes the concrete type for downcasting.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Function name: `"max"`.
    fn name(&self) -> &str {
        "max"
    }

    /// The stub has no aliases.
    fn aliases(&self) -> &[String] {
        &[]
    }

    /// Signature stored at construction time.
    fn signature(&self) -> &Signature {
        let Self { signature } = self;
        signature
    }

    /// Always reports `Int64`, whatever the argument types are.
    ///
    /// NOTE(review): presumably a stub simplification — a real `max` would
    /// likely mirror its input type; confirm before relying on this.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        let output = DataType::Int64;
        Ok(output)
    }

    /// Not available on the stub; always returns a "not implemented" error.
    fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
        not_impl_err!("no impl for stub")
    }

    /// Not available on the stub; always returns a "not implemented" error.
    fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<Field>> {
        not_impl_err!("no impl for stub")
    }

    /// Not available on the stub; always returns a "not implemented" error.
    fn create_groups_accumulator(
        &self,
        _args: AccumulatorArgs,
    ) -> Result<Box<dyn GroupsAccumulator>> {
        not_impl_err!("no impl for stub")
    }

    /// Reports [`ReversedUDAF::Identical`].
    fn reverse_expr(&self) -> ReversedUDAF {
        ReversedUDAF::Identical
    }

    /// Returns `Some(true)`.
    ///
    /// NOTE(review): presumably marks `max` as the descending extreme —
    /// confirm against the `AggregateUDFImpl::is_descending` documentation.
    fn is_descending(&self) -> Option<bool> {
        let descending = true;
        Some(descending)
    }
}
/// Stub `avg` aggregate: it carries a [`Signature`] and an alias list and
/// provides no real accumulation logic.
#[derive(Debug)]
pub struct Avg {
    /// Signature handed out by the `AggregateUDFImpl::signature` implementation.
    signature: Signature,
    /// Alternative names returned by the `AggregateUDFImpl::aliases` implementation.
    aliases: Vec<String>,
}

impl Avg {
    /// Creates the stub with the single alias `"mean"` and a uniform
    /// one-argument signature over the `NUMERICS` types, with immutable
    /// volatility.
    pub fn new() -> Self {
        let aliases = vec!["mean".to_string()];
        let signature = Signature::uniform(1, NUMERICS.to_vec(), Immutable);
        Self { aliases, signature }
    }
}

impl Default for Avg {
    /// Same as [`Avg::new`].
    fn default() -> Self {
        Avg::new()
    }
}
impl AggregateUDFImpl for Avg {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"avg"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
avg_return_type(self.name(), &arg_types[0])
}
fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
not_impl_err!("no impl for stub")
}
fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<Field>> {
not_impl_err!("no impl for stub")
}
fn aliases(&self) -> &[String] {
&self.aliases
}
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
coerce_avg_type(self.name(), arg_types)
}
}