use std::any::Any;
use std::sync::Arc;
use arrow::array::builder::PrimitiveBuilder;
use arrow::array::cast::AsArray;
use arrow::array::types::{Date32Type, Int32Type};
use arrow::array::PrimitiveArray;
use arrow::datatypes::DataType;
use arrow::datatypes::DataType::{Date32, Int32, Int64, UInt32, UInt64, Utf8, Utf8View};
use chrono::prelude::*;
use datafusion_common::{exec_err, Result, ScalarValue};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
#[derive(Debug)]
pub struct MakeDateFunc {
signature: Signature,
}
impl Default for MakeDateFunc {
fn default() -> Self {
Self::new()
}
}
impl MakeDateFunc {
pub fn new() -> Self {
Self {
signature: Signature::uniform(
3,
vec![Int32, Int64, UInt32, UInt64, Utf8, Utf8View],
Volatility::Immutable,
),
}
}
}
impl ScalarUDFImpl for MakeDateFunc {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"make_date"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(Date32)
}
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 3 {
return exec_err!(
"make_date function requires 3 arguments, got {}",
args.len()
);
}
let len = args
.iter()
.fold(Option::<usize>::None, |acc, arg| match arg {
ColumnarValue::Scalar(_) => acc,
ColumnarValue::Array(a) => Some(a.len()),
});
let years = args[0].cast_to(&DataType::Int32, None)?;
let months = args[1].cast_to(&DataType::Int32, None)?;
let days = args[2].cast_to(&DataType::Int32, None)?;
let scalar_value_fn = |col: &ColumnarValue| -> Result<i32> {
let ColumnarValue::Scalar(s) = col else {
return exec_err!("Expected scalar value");
};
let ScalarValue::Int32(Some(i)) = s else {
return exec_err!("Unable to parse date from null/empty value");
};
Ok(*i)
};
let value = if let Some(array_size) = len {
let to_primitive_array_fn =
|col: &ColumnarValue| -> PrimitiveArray<Int32Type> {
match col {
ColumnarValue::Array(a) => {
a.as_primitive::<Int32Type>().to_owned()
}
_ => {
let v = scalar_value_fn(col).unwrap();
PrimitiveArray::<Int32Type>::from_value(v, array_size)
}
}
};
let years = to_primitive_array_fn(&years);
let months = to_primitive_array_fn(&months);
let days = to_primitive_array_fn(&days);
let mut builder: PrimitiveBuilder<Date32Type> =
PrimitiveArray::builder(array_size);
for i in 0..array_size {
make_date_inner(
years.value(i),
months.value(i),
days.value(i),
|days: i32| builder.append_value(days),
)?;
}
let arr = builder.finish();
ColumnarValue::Array(Arc::new(arr))
} else {
let mut value = 0;
make_date_inner(
scalar_value_fn(&years)?,
scalar_value_fn(&months)?,
scalar_value_fn(&days)?,
|days: i32| value = days,
)?;
ColumnarValue::Scalar(ScalarValue::Date32(Some(value)))
};
Ok(value)
}
}
fn make_date_inner<F: FnMut(i32)>(
year: i32,
month: i32,
day: i32,
mut date_consumer_fn: F,
) -> Result<()> {
let Ok(m) = u32::try_from(month) else {
return exec_err!("Month value '{month:?}' is out of range");
};
let Ok(d) = u32::try_from(day) else {
return exec_err!("Day value '{day:?}' is out of range");
};
if let Some(date) = NaiveDate::from_ymd_opt(year, m, d) {
const UNIX_DAYS_FROM_CE: i32 = 719_163;
date_consumer_fn(date.num_days_from_ce() - UNIX_DAYS_FROM_CE);
Ok(())
} else {
exec_err!("Unable to parse date from {year}, {month}, {day}")
}
}
#[cfg(test)]
mod tests {
use crate::datetime::make_date::MakeDateFunc;
use arrow::array::{Array, Date32Array, Int32Array, Int64Array, UInt32Array};
use datafusion_common::ScalarValue;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
use std::sync::Arc;
#[test]
fn test_make_date() {
let res = MakeDateFunc::new()
.invoke(&[
ColumnarValue::Scalar(ScalarValue::Int32(Some(2024))),
ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
ColumnarValue::Scalar(ScalarValue::UInt32(Some(14))),
])
.expect("that make_date parsed values without error");
if let ColumnarValue::Scalar(ScalarValue::Date32(date)) = res {
assert_eq!(19736, date.unwrap());
} else {
panic!("Expected a scalar value")
}
let res = MakeDateFunc::new()
.invoke(&[
ColumnarValue::Scalar(ScalarValue::Int64(Some(2024))),
ColumnarValue::Scalar(ScalarValue::UInt64(Some(1))),
ColumnarValue::Scalar(ScalarValue::UInt32(Some(14))),
])
.expect("that make_date parsed values without error");
if let ColumnarValue::Scalar(ScalarValue::Date32(date)) = res {
assert_eq!(19736, date.unwrap());
} else {
panic!("Expected a scalar value")
}
let res = MakeDateFunc::new()
.invoke(&[
ColumnarValue::Scalar(ScalarValue::Utf8(Some("2024".to_string()))),
ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some("1".to_string()))),
ColumnarValue::Scalar(ScalarValue::Utf8(Some("14".to_string()))),
])
.expect("that make_date parsed values without error");
if let ColumnarValue::Scalar(ScalarValue::Date32(date)) = res {
assert_eq!(19736, date.unwrap());
} else {
panic!("Expected a scalar value")
}
let years = Arc::new((2021..2025).map(Some).collect::<Int64Array>());
let months = Arc::new((1..5).map(Some).collect::<Int32Array>());
let days = Arc::new((11..15).map(Some).collect::<UInt32Array>());
let res = MakeDateFunc::new()
.invoke(&[
ColumnarValue::Array(years),
ColumnarValue::Array(months),
ColumnarValue::Array(days),
])
.expect("that make_date parsed values without error");
if let ColumnarValue::Array(array) = res {
assert_eq!(array.len(), 4);
let mut builder = Date32Array::builder(4);
builder.append_value(18_638);
builder.append_value(19_035);
builder.append_value(19_429);
builder.append_value(19_827);
assert_eq!(&builder.finish() as &dyn Array, array.as_ref());
} else {
panic!("Expected a columnar array")
}
let res = MakeDateFunc::new()
.invoke(&[ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))]);
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Execution error: make_date function requires 3 arguments, got 1"
);
let res = MakeDateFunc::new().invoke(&[
ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
]);
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Arrow error: Cast error: Casting from Interval(YearMonth) to Int32 not supported"
);
let res = MakeDateFunc::new().invoke(&[
ColumnarValue::Scalar(ScalarValue::Int32(Some(2023))),
ColumnarValue::Scalar(ScalarValue::UInt64(Some(u64::MAX))),
ColumnarValue::Scalar(ScalarValue::Int32(Some(22))),
]);
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Arrow error: Cast error: Can't cast value 18446744073709551615 to type Int32"
);
let res = MakeDateFunc::new().invoke(&[
ColumnarValue::Scalar(ScalarValue::Int32(Some(2023))),
ColumnarValue::Scalar(ScalarValue::Int32(Some(22))),
ColumnarValue::Scalar(ScalarValue::UInt32(Some(u32::MAX))),
]);
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Arrow error: Cast error: Can't cast value 4294967295 to type Int32"
);
}
}