use arrow::array::{ArrayRef, StructArray};
use arrow::datatypes::{DataType, Field, Fields};
use datafusion_common::{exec_err, Result};
use datafusion_expr::ColumnarValue;
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
use std::any::Any;
use std::sync::Arc;
fn array_struct(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.is_empty() {
return exec_err!("struct requires at least one argument");
}
let vec: Vec<_> = args
.iter()
.enumerate()
.map(|(i, arg)| {
let field_name = format!("c{i}");
Ok((
Arc::new(Field::new(
field_name.as_str(),
arg.data_type().clone(),
true,
)),
Arc::clone(arg),
))
})
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(StructArray::from(vec)))
}
/// Evaluates `struct(...)`: expands every argument (scalar or array) to an
/// array, then packs them into a single struct-typed array value.
fn struct_expr(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    let columns = ColumnarValue::values_to_arrays(args)?;
    array_struct(&columns).map(ColumnarValue::Array)
}
/// Scalar UDF implementing `struct(expr1, expr2, ...)`, which packs its
/// arguments into a single struct value.
#[derive(Debug)]
pub struct StructFunc {
    /// Variadic, any-type signature with immutable volatility.
    signature: Signature,
}

impl StructFunc {
    /// Creates the function with a variadic, any-type, immutable signature.
    pub fn new() -> Self {
        let signature = Signature::variadic_any(Volatility::Immutable);
        Self { signature }
    }
}

impl Default for StructFunc {
    /// Equivalent to [`StructFunc::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl ScalarUDFImpl for StructFunc {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"struct"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
let return_fields = arg_types
.iter()
.enumerate()
.map(|(pos, dt)| Field::new(format!("c{pos}"), dt.clone(), true))
.collect::<Vec<Field>>();
Ok(DataType::Struct(Fields::from(return_fields)))
}
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
struct_expr(args)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::Int64Array;
    use datafusion_common::cast::as_struct_array;
    use datafusion_common::ScalarValue;

    #[test]
    fn test_struct() {
        // Three Int64 scalars 1, 2, 3 -> a one-row struct with fields c0, c1, c2.
        let args: Vec<ColumnarValue> = (1..=3)
            .map(|v| ColumnarValue::Scalar(ScalarValue::Int64(Some(v))))
            .collect();
        let struc = struct_expr(&args)
            .expect("failed to initialize function struct")
            .into_array(1)
            .expect("Failed to convert to array");
        let result =
            as_struct_array(&struc).expect("failed to initialize function struct");

        for (name, expected) in [("c0", 1_i64), ("c1", 2), ("c2", 3)] {
            let column = Arc::clone(result.column_by_name(name).unwrap());
            let actual = column.as_any().downcast_ref::<Int64Array>().unwrap();
            assert_eq!(&Int64Array::from(vec![expected]), actual);
        }
    }
}