use crate::utils::make_scalar_function;
use arrow::array::{Array, ArrayRef, Int64Array, ListArray};
use arrow::datatypes::{DataType, Field};
use arrow_array::types::{Date32Type, IntervalMonthDayNanoType};
use arrow_array::{Date32Array, NullArray};
use arrow_buffer::{
BooleanBufferBuilder, IntervalMonthDayNano, NullBuffer, OffsetBuffer,
};
use arrow_schema::DataType::{Date32, Int64, Interval, List};
use arrow_schema::IntervalUnit::MonthDayNano;
use datafusion_common::cast::{as_date32_array, as_int64_array, as_interval_mdn_array};
use datafusion_common::{exec_err, not_impl_datafusion_err, Result};
use datafusion_expr::{
ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility,
};
use std::any::Any;
use std::sync::Arc;
make_udf_expr_and_func!(
Range,
range,
start stop step,
"create a list of values in the range between start and stop",
range_udf
);
#[derive(Debug)]
pub(super) struct Range {
signature: Signature,
aliases: Vec<String>,
}
impl Range {
pub fn new() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Exact(vec![Int64]),
TypeSignature::Exact(vec![Int64, Int64]),
TypeSignature::Exact(vec![Int64, Int64, Int64]),
TypeSignature::Exact(vec![Date32, Date32, Interval(MonthDayNano)]),
TypeSignature::Any(3),
],
Volatility::Immutable,
),
aliases: vec![],
}
}
}
impl ScalarUDFImpl for Range {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"range"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
if arg_types.iter().any(|t| t.eq(&DataType::Null)) {
Ok(DataType::Null)
} else {
Ok(List(Arc::new(Field::new(
"item",
arg_types[0].clone(),
true,
))))
}
}
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.iter().any(|arg| arg.data_type() == DataType::Null) {
return Ok(ColumnarValue::Array(Arc::new(NullArray::new(1))));
}
match args[0].data_type() {
Int64 => make_scalar_function(|args| gen_range_inner(args, false))(args),
Date32 => make_scalar_function(|args| gen_range_date(args, false))(args),
_ => {
exec_err!("unsupported type for range")
}
}
}
fn aliases(&self) -> &[String] {
&self.aliases
}
}
make_udf_expr_and_func!(
GenSeries,
gen_series,
start stop step,
"create a list of values in the range between start and stop, include upper bound",
gen_series_udf
);
#[derive(Debug)]
pub(super) struct GenSeries {
signature: Signature,
aliases: Vec<String>,
}
impl GenSeries {
pub fn new() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Exact(vec![Int64]),
TypeSignature::Exact(vec![Int64, Int64]),
TypeSignature::Exact(vec![Int64, Int64, Int64]),
TypeSignature::Exact(vec![Date32, Date32, Interval(MonthDayNano)]),
TypeSignature::Any(3),
],
Volatility::Immutable,
),
aliases: vec![],
}
}
}
impl ScalarUDFImpl for GenSeries {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"generate_series"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
if arg_types.iter().any(|t| t.eq(&DataType::Null)) {
Ok(DataType::Null)
} else {
Ok(List(Arc::new(Field::new(
"item",
arg_types[0].clone(),
true,
))))
}
}
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.iter().any(|arg| arg.data_type() == DataType::Null) {
return Ok(ColumnarValue::Array(Arc::new(NullArray::new(1))));
}
match args[0].data_type() {
Int64 => make_scalar_function(|args| gen_range_inner(args, true))(args),
Date32 => make_scalar_function(|args| gen_range_date(args, true))(args),
_ => {
exec_err!("unsupported type for range")
}
}
}
fn aliases(&self) -> &[String] {
&self.aliases
}
}
pub(super) fn gen_range_inner(
args: &[ArrayRef],
include_upper: bool,
) -> Result<ArrayRef> {
let (start_array, stop_array, step_array) = match args.len() {
1 => (None, as_int64_array(&args[0])?, None),
2 => (
Some(as_int64_array(&args[0])?),
as_int64_array(&args[1])?,
None,
),
3 => (
Some(as_int64_array(&args[0])?),
as_int64_array(&args[1])?,
Some(as_int64_array(&args[2])?),
),
_ => return exec_err!("gen_range expects 1 to 3 arguments"),
};
let mut values = vec![];
let mut offsets = vec![0];
let mut valid = BooleanBufferBuilder::new(stop_array.len());
for (idx, stop) in stop_array.iter().enumerate() {
match retrieve_range_args(start_array, stop, step_array, idx) {
Some((_, _, 0)) => {
return exec_err!(
"step can't be 0 for function {}(start [, stop, step])",
if include_upper {
"generate_series"
} else {
"range"
}
);
}
Some((start, stop, step)) => {
let step_abs = usize::try_from(step.unsigned_abs()).map_err(|_| {
not_impl_datafusion_err!("step {} can't fit into usize", step)
})?;
values.extend(
gen_range_iter(start, stop, step < 0, include_upper)
.step_by(step_abs),
);
offsets.push(values.len() as i32);
valid.append(true);
}
None => {
offsets.push(values.len() as i32);
valid.append(false);
}
};
}
let arr = Arc::new(ListArray::try_new(
Arc::new(Field::new("item", Int64, true)),
OffsetBuffer::new(offsets.into()),
Arc::new(Int64Array::from(values)),
Some(NullBuffer::new(valid.finish())),
)?);
Ok(arr)
}
fn retrieve_range_args(
start_array: Option<&Int64Array>,
stop: Option<i64>,
step_array: Option<&Int64Array>,
idx: usize,
) -> Option<(i64, i64, i64)> {
let start =
start_array.map_or(Some(0), |arr| arr.is_valid(idx).then(|| arr.value(idx)))?;
let stop = stop?;
let step =
step_array.map_or(Some(1), |arr| arr.is_valid(idx).then(|| arr.value(idx)))?;
Some((start, stop, step))
}
fn gen_range_iter(
start: i64,
stop: i64,
decreasing: bool,
include_upper: bool,
) -> Box<dyn Iterator<Item = i64>> {
match (decreasing, include_upper) {
(true, true) => Box::new((stop..=start).rev()),
(true, false) => {
if stop == i64::MAX {
Box::new(std::iter::empty())
} else {
Box::new((stop + 1..=start).rev())
}
}
(false, true) => Box::new(start..=stop),
(false, false) => Box::new(start..stop),
}
}
fn gen_range_date(args: &[ArrayRef], include_upper: bool) -> Result<ArrayRef> {
if args.len() != 3 {
return exec_err!("arguments length does not match");
}
let (start_array, stop_array, step_array) = (
Some(as_date32_array(&args[0])?),
as_date32_array(&args[1])?,
Some(as_interval_mdn_array(&args[2])?),
);
let mut values = vec![];
let mut offsets = vec![0];
for (idx, stop) in stop_array.iter().enumerate() {
let mut stop = stop.unwrap_or(0);
let start = start_array.as_ref().map(|x| x.value(idx)).unwrap_or(0);
let step = step_array.as_ref().map(|arr| arr.value(idx)).unwrap_or(
IntervalMonthDayNano {
months: 0,
days: 0,
nanoseconds: 1,
},
);
let (months, days, _) = IntervalMonthDayNanoType::to_parts(step);
let neg = months < 0 || days < 0;
if !include_upper {
stop = Date32Type::subtract_month_day_nano(stop, step);
}
let mut new_date = start;
loop {
if neg && new_date < stop || !neg && new_date > stop {
break;
}
values.push(new_date);
new_date = Date32Type::add_month_day_nano(new_date, step);
}
offsets.push(values.len() as i32);
}
let arr = Arc::new(ListArray::try_new(
Arc::new(Field::new("item", Date32, true)),
OffsetBuffer::new(offsets.into()),
Arc::new(Date32Array::from(values)),
None,
)?);
Ok(arr)
}