use arrow::array::Array;
use arrow::array::ArrayRef;
use arrow::array::ArrowNativeTypeOp;
use arrow::array::Capacities;
use arrow::array::GenericListArray;
use arrow::array::Int64Array;
use arrow::array::MutableArrayData;
use arrow::array::OffsetSizeTrait;
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::DataType;
use arrow_schema::DataType::{FixedSizeList, LargeList, List};
use arrow_schema::Field;
use datafusion_common::cast::as_int64_array;
use datafusion_common::cast::as_large_list_array;
use datafusion_common::cast::as_list_array;
use datafusion_common::{
exec_err, internal_datafusion_err, plan_err, DataFusionError, Result,
};
use datafusion_expr::scalar_doc_sections::DOC_SECTION_ARRAY;
use datafusion_expr::Expr;
use datafusion_expr::{
ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
};
use std::any::Any;
use std::sync::{Arc, OnceLock};
use crate::utils::make_scalar_function;
// UDF registration for `array_element`. The arguments are: the UDF struct, the
// function name, the argument names (`array` and `element`), a doc string and
// the name of the UDF accessor.
// NOTE(review): the macro is defined elsewhere in the crate; presumably it
// generates both the `Expr` builder and the `array_element_udf()` accessor —
// confirm against the macro definition.
make_udf_expr_and_func!(
ArrayElement,
array_element,
array element,
"extracts the element with the index n from the array.",
array_element_udf
);
// `array_slice` only gets its UDF accessor from a macro; its `Expr` builder is
// written by hand further down in this file (`array_slice`) because it takes
// an optional `stride` argument.
create_func!(ArraySlice, array_slice_udf);
// UDF registration for `array_pop_front` (single `array` argument).
make_udf_expr_and_func!(
ArrayPopFront,
array_pop_front,
array,
"returns the array without the first element.",
array_pop_front_udf
);
// UDF registration for `array_pop_back` (single `array` argument).
make_udf_expr_and_func!(
ArrayPopBack,
array_pop_back,
array,
"returns the array without the last element.",
array_pop_back_udf
);
// UDF registration for `array_any_value` (single `array` argument).
make_udf_expr_and_func!(
ArrayAnyValue,
array_any_value,
array,
"returns the first non-null element in the array.",
array_any_value_udf
);
/// Scalar UDF behind `array_element(array, index)`.
///
/// Also reachable through the aliases `array_extract`, `list_element` and
/// `list_extract`.
#[derive(Debug)]
pub(super) struct ArrayElement {
    signature: Signature,
    aliases: Vec<String>,
}

impl ArrayElement {
    /// Creates the UDF with an immutable `(array, index)` signature and its
    /// list of aliases.
    pub fn new() -> Self {
        let aliases = ["array_extract", "list_element", "list_extract"]
            .iter()
            .copied()
            .map(String::from)
            .collect();
        Self {
            signature: Signature::array_and_index(Volatility::Immutable),
            aliases,
        }
    }
}

impl ScalarUDFImpl for ArrayElement {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "array_element"
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Renders the call as `array[index]`; errors unless exactly two
    /// arguments are supplied.
    fn display_name(&self, args: &[Expr]) -> Result<String> {
        match args {
            [array, index] => Ok(format!("{array}[{index}]")),
            _ => exec_err!("expect 2 args, got {}", args.len()),
        }
    }

    /// Same shape as [`Self::display_name`], but built from each argument's
    /// schema name.
    fn schema_name(&self, args: &[Expr]) -> Result<String> {
        match args {
            [array, index] => Ok(format!(
                "{}[{}]",
                array.schema_name(),
                index.schema_name()
            )),
            _ => exec_err!("expect 2 args, got {}", args.len()),
        }
    }

    /// The result type is the element type of the list passed as the first
    /// argument.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        let element_field = match &arg_types[0] {
            List(field) | LargeList(field) | FixedSizeList(field, _) => field,
            _ => {
                return plan_err!(
                    "ArrayElement can only accept List, LargeList or FixedSizeList as the first argument"
                )
            }
        };
        Ok(element_field.data_type().clone())
    }

    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        make_scalar_function(array_element_inner)(args)
    }

    fn documentation(&self) -> Option<&Documentation> {
        Some(get_array_element_doc())
    }
}
/// Write-once cell holding a lazily built [`Documentation`].
///
/// `OnceLock` runs the initializer only for the first `get_or_init` call and
/// hands the stored value to every later caller.
static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();

/// Returns the documentation stored in [`DOCUMENTATION`], initialising it with
/// the `array_element` documentation if the cell is still empty.
fn get_array_element_doc() -> &'static Documentation {
    DOCUMENTATION.get_or_init(build_array_element_doc)
}

/// Assembles the user-facing documentation for `array_element`.
fn build_array_element_doc() -> Documentation {
    let builder = Documentation::builder()
        .with_doc_section(DOC_SECTION_ARRAY)
        .with_description("Extracts the element with the index n from the array.")
        .with_syntax_example("array_element(array, index)")
        .with_sql_example(
            r#"```sql
> select array_element([1, 2, 3, 4], 3);
+-----------------------------------------+
| array_element(List([1,2,3,4]),Int64(3)) |
+-----------------------------------------+
| 3 |
+-----------------------------------------+
```"#,
        );
    let builder = builder
        .with_argument(
            "array",
            "Array expression. Can be a constant, column, or function, and any combination of array operators.",
        )
        .with_argument("index", "Index to extract the element from the array.");
    builder.build().unwrap()
}
/// Entry point for `array_element` on materialised arrays.
///
/// Expects exactly two arrays: a `List`/`LargeList` array and an `Int64` index
/// array. Dispatches to [`general_array_element`] with the matching offset
/// width; any other first-argument type is an execution error.
fn array_element_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
    let [list, index] = args else {
        return exec_err!("array_element needs two arguments");
    };

    match list.data_type() {
        List(_) => {
            let list = as_list_array(list)?;
            let indexes = as_int64_array(index)?;
            general_array_element::<i32>(list, indexes)
        }
        LargeList(_) => {
            let list = as_large_list_array(list)?;
            let indexes = as_int64_array(index)?;
            general_array_element::<i64>(list, indexes)
        }
        other => exec_err!("array_element does not support type: {:?}", other),
    }
}
/// Extracts one element per row from `array`, using the 1-based position in
/// `indexes` (negative positions count back from the end of the row's list).
///
/// The output has exactly one slot per input row. A slot is null when:
/// * the list row is null or empty,
/// * the index for that row is null,
/// * the index falls outside the row's list (including index `0`).
///
/// # Errors
/// Returns an execution error when an index does not fit into the list's
/// offset type `O`.
fn general_array_element<O: OffsetSizeTrait>(
    array: &GenericListArray<O>,
    indexes: &Int64Array,
) -> Result<ArrayRef>
where
    i64: TryInto<O>,
{
    let values = array.values();
    let original_data = values.to_data();
    // One output slot is produced per input row, so size the buffer by the
    // number of rows rather than by the number of child values.
    let capacity = Capacities::Array(array.len());

    // `use_nulls = true`: rows without a matching element are emitted as nulls.
    let mut mutable =
        MutableArrayData::with_capacities(vec![&original_data], true, capacity);

    /// Converts a 1-based (or negative, from-the-end) `index` into a 0-based
    /// position inside a list of length `len`; `None` when out of bounds.
    fn adjusted_array_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
    where
        i64: TryInto<O>,
    {
        let index: O = index.try_into().map_err(|_| {
            DataFusionError::Execution(format!(
                "array_element got invalid index: {}",
                index
            ))
        })?;
        // 0 ~ len - 1
        let adjusted_zero_index = if index < O::usize_as(0) {
            index + len
        } else {
            index - O::usize_as(1)
        };

        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
            Ok(Some(adjusted_zero_index))
        } else {
            Ok(None)
        }
    }

    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
        let start = offset_window[0];
        let end = offset_window[1];
        let len = end - start;

        // A null list slot may still span a non-empty child range, and the
        // value stored in a null index slot is arbitrary, so both must be
        // checked through the validity bitmaps rather than inferred from the
        // offsets or from `value()`.
        if len == O::usize_as(0)
            || array.is_null(row_index)
            || indexes.is_null(row_index)
        {
            mutable.extend_nulls(1);
            continue;
        }

        let index = adjusted_array_index::<O>(indexes.value(row_index), len)?;

        if let Some(index) = index {
            let start = start.as_usize() + index.as_usize();
            mutable.extend(0, start, start + 1_usize);
        } else {
            // Index out of bounds
            mutable.extend_nulls(1);
        }
    }

    let data = mutable.freeze();
    Ok(arrow::array::make_array(data))
}
/// Builds an `array_slice(array, begin, end[, stride])` expression.
///
/// `stride` is only appended to the call's arguments when it is `Some`.
pub fn array_slice(array: Expr, begin: Expr, end: Expr, stride: Option<Expr>) -> Expr {
    let mut args = vec![array, begin, end];
    args.extend(stride);
    array_slice_udf().call(args)
}
/// Scalar UDF behind `array_slice(array, begin, end[, stride])`, also
/// reachable as `list_slice`.
#[derive(Debug)]
pub(super) struct ArraySlice {
    signature: Signature,
    aliases: Vec<String>,
}

impl ArraySlice {
    /// Creates the UDF with an immutable, variadic-any signature.
    pub fn new() -> Self {
        Self {
            signature: Signature::variadic_any(Volatility::Immutable),
            aliases: vec!["list_slice".to_string()],
        }
    }
}

impl ScalarUDFImpl for ArraySlice {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "array_slice"
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Renders the call as `array[a:b:c]`, joining every argument after the
    /// first with `:`. Errors when no argument is given.
    fn display_name(&self, args: &[Expr]) -> Result<String> {
        let Some((array, bounds)) = args.split_first() else {
            return exec_err!("no argument");
        };
        let bounds: Vec<String> = bounds.iter().map(ToString::to_string).collect();
        Ok(format!("{array}[{}]", bounds.join(":")))
    }

    /// Same shape as [`Self::display_name`], but built from each argument's
    /// schema name.
    fn schema_name(&self, args: &[Expr]) -> Result<String> {
        let Some((array, bounds)) = args.split_first() else {
            return exec_err!("no argument");
        };
        let bounds: Vec<String> = bounds
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();
        Ok(format!("{}[{}]", array.schema_name(), bounds.join(":")))
    }

    /// A slice has the same type as the array it was taken from.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        let list_type = &arg_types[0];
        Ok(list_type.clone())
    }

    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        make_scalar_function(array_slice_inner)(args)
    }

    fn documentation(&self) -> Option<&Documentation> {
        Some(get_array_slice_doc())
    }
}
fn get_array_slice_doc() -> &'static Documentation {
DOCUMENTATION.get_or_init(|| {
Documentation::builder()
.with_doc_section(DOC_SECTION_ARRAY)
.with_description(
"Returns a slice of the array based on 1-indexed start and end positions.",
)
.with_syntax_example("array_slice(array, begin, end)")
.with_sql_example(
r#"```sql
> select array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6);
+--------------------------------------------------------+
| array_slice(List([1,2,3,4,5,6,7,8]),Int64(3),Int64(6)) |
+--------------------------------------------------------+
| [3, 4, 5, 6] |
+--------------------------------------------------------+
```"#,
)
.with_argument(
"array",
"Array expression. Can be a constant, column, or function, and any combination of array operators.",
)
.with_argument(
"begin",
"Index of the first element. If negative, it counts backward from the end of the array.",
)
.with_argument(
"end",
"Index of the last element. If negative, it counts backward from the end of the array.",
)
.with_argument(
"stride",
"Stride of the array slice. The default is 1.",
)
.build()
.unwrap()
})
}
/// Entry point for `array_slice` on materialised arrays.
///
/// Accepts three or four arrays: the list, `begin`, `end` and an optional
/// `stride`. The three numeric arguments must be `Int64` arrays; they are
/// cast once, before the list type is inspected, and shared by both the
/// `List` and `LargeList` paths.
///
/// # Errors
/// Returns an execution error for a wrong argument count or an unsupported
/// list type, and propagates cast errors for the numeric arguments.
fn array_slice_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
    let (array, from, to, stride) = match args {
        [array, from, to] => (array, from, to, None),
        [array, from, to, stride] => (array, from, to, Some(stride)),
        _ => return exec_err!("array_slice needs three or four arguments"),
    };

    let stride = stride.map(|s| as_int64_array(s)).transpose()?;
    let from_array = as_int64_array(from)?;
    let to_array = as_int64_array(to)?;

    match array.data_type() {
        List(_) => {
            let array = as_list_array(array)?;
            general_array_slice::<i32>(array, from_array, to_array, stride)
        }
        LargeList(_) => {
            let array = as_large_list_array(array)?;
            general_array_slice::<i64>(array, from_array, to_array, stride)
        }
        other => exec_err!("array_slice does not support type: {:?}", other),
    }
}
/// Slices every list in `array` between the per-row positions in `from_array`
/// and `to_array` (both 1-based and inclusive; negative values count back from
/// the end of the list), optionally stepping by `stride`.
///
/// Per-row behaviour:
/// * an empty list yields an empty list;
/// * a null `from` means "from the first element", a null `to` means "to the
///   last element", and a missing or null `stride` means `1`;
/// * bounds that fall outside the list, or a stride whose sign points away
///   from `to`, yield an empty list;
/// * a negative stride with `from > to` walks the list backwards.
///
/// The result is built without a validity bitmap, so every output row is a
/// (possibly empty) list.
///
/// # Errors
/// Returns an execution error when a bound does not fit the offset type `O`
/// or when the stride is `0`, and an internal error when the stride does not
/// fit `O`.
fn general_array_slice<O: OffsetSizeTrait>(
    array: &GenericListArray<O>,
    from_array: &Int64Array,
    to_array: &Int64Array,
    stride: Option<&Int64Array>,
) -> Result<ArrayRef>
where
    i64: TryInto<O>,
{
    let values = array.values();
    let original_data = values.to_data();
    let capacity = Capacities::Array(original_data.len());

    // `use_nulls = false`: this function only copies existing child values and
    // never appends nulls of its own.
    let mut mutable =
        MutableArrayData::with_capacities(vec![&original_data], false, capacity);

    /// Maps a 1-based (or negative) `from` position to a 0-based index.
    /// Non-negative inputs are clamped so that `0` behaves like `1`; `None`
    /// when the result is outside `0..len`.
    fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
    where
        i64: TryInto<O>,
    {
        // 0 ~ len - 1
        let adjusted_zero_index = if index < 0 {
            if let Ok(index) = index.try_into() {
                index + len
            } else {
                return exec_err!("array_slice got invalid index: {}", index);
            }
        } else {
            // array_slice(arr, 1, to) is the same as array_slice(arr, 0, to)
            if let Ok(index) = index.try_into() {
                std::cmp::max(index - O::usize_as(1), O::usize_as(0))
            } else {
                return exec_err!("array_slice got invalid index: {}", index);
            }
        };

        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
            Ok(Some(adjusted_zero_index))
        } else {
            // Out of bounds
            Ok(None)
        }
    }

    /// Maps a 1-based (or negative) `to` position to a 0-based inclusive
    /// index. Non-negative inputs are clamped to the last element; `None`
    /// when the result is outside `0..len`.
    fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
    where
        i64: TryInto<O>,
    {
        // 0 ~ len - 1
        let adjusted_zero_index = if index < 0 {
            // array_slice in duckdb with negative to_index is python-like, so index itself is exclusive
            if let Ok(index) = index.try_into() {
                index + len
            } else {
                return exec_err!("array_slice got invalid index: {}", index);
            }
        } else {
            // array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len)
            if let Ok(index) = index.try_into() {
                std::cmp::min(index - O::usize_as(1), len - O::usize_as(1))
            } else {
                return exec_err!("array_slice got invalid index: {}", index);
            }
        };

        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
            Ok(Some(adjusted_zero_index))
        } else {
            // Out of bounds
            Ok(None)
        }
    }

    let mut offsets = vec![O::usize_as(0)];

    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
        let start = offset_window[0];
        let end = offset_window[1];
        let len = end - start;

        // len 0 indicate array is null, return empty array in this row.
        if len == O::usize_as(0) {
            offsets.push(offsets[row_index]);
            continue;
        }

        // If index is null, we consider it as the minimum / maximum index of the array.
        let from_index = if from_array.is_null(row_index) {
            Some(O::usize_as(0))
        } else {
            adjusted_from_index::<O>(from_array.value(row_index), len)?
        };

        let to_index = if to_array.is_null(row_index) {
            Some(len - O::usize_as(1))
        } else {
            adjusted_to_index::<O>(to_array.value(row_index), len)?
        };

        let (Some(from), Some(to)) = (from_index, to_index) else {
            // A bound is out of range: emit an empty list.
            offsets.push(offsets[row_index]);
            continue;
        };

        // The value stored in a null slot is arbitrary, so a null stride is
        // treated like a missing one (default 1) instead of being read.
        let stride = match stride {
            Some(s) if !s.is_null(row_index) => s.value(row_index),
            _ => 1,
        };

        if stride.is_zero() {
            return exec_err!(
                "array_slice got invalid stride: {:?}, it cannot be 0",
                stride
            );
        } else if (from <= to && stride.is_negative())
            || (from > to && stride.is_positive())
        {
            // return empty array
            offsets.push(offsets[row_index]);
            continue;
        }

        // Keep rejecting strides that do not fit the offset type, as before.
        let _: O = stride.try_into().map_err(|_| {
            internal_datafusion_err!("array_slice got invalid stride: {}", stride)
        })?;

        // Step in `usize` through range iterators: advancing an offset-typed
        // cursor with `index += stride` can overflow for large strides.
        let step = usize::try_from(stride.unsigned_abs()).map_err(|_| {
            internal_datafusion_err!("array_slice got invalid stride: {}", stride)
        })?;
        let first = (start + from).as_usize();
        let last = (start + to).as_usize();

        if from <= to {
            assert!(start + to <= end);
            if step == 1 {
                // stride is default to 1: copy the contiguous range in one go
                mutable.extend(0, first, last + 1);
                offsets.push(offsets[row_index] + (to - from + O::usize_as(1)));
                continue;
            }
            let mut cnt = 0;
            for index in (first..=last).step_by(step) {
                mutable.extend(0, index, index + 1);
                cnt += 1;
            }
            offsets.push(offsets[row_index] + O::usize_as(cnt));
        } else {
            // Negative stride: walk backwards from `from` down to `to`.
            let mut cnt = 0;
            for index in (last..=first).rev().step_by(step) {
                mutable.extend(0, index, index + 1);
                cnt += 1;
            }
            offsets.push(offsets[row_index] + O::usize_as(cnt));
        }
    }

    let data = mutable.freeze();

    Ok(Arc::new(GenericListArray::<O>::try_new(
        Arc::new(Field::new("item", array.value_type(), true)),
        OffsetBuffer::<O>::new(offsets.into()),
        arrow_array::make_array(data),
        None,
    )?))
}
/// Scalar UDF behind `array_pop_front(array)`, also reachable as
/// `list_pop_front`.
#[derive(Debug)]
pub(super) struct ArrayPopFront {
    signature: Signature,
    aliases: Vec<String>,
}

impl ArrayPopFront {
    /// Creates the UDF with an immutable single-array signature.
    pub fn new() -> Self {
        Self {
            signature: Signature::array(Volatility::Immutable),
            aliases: vec!["list_pop_front".to_string()],
        }
    }
}

impl ScalarUDFImpl for ArrayPopFront {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "array_pop_front"
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// The result has the same type as the input array.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        let list_type = &arg_types[0];
        Ok(list_type.clone())
    }

    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        make_scalar_function(array_pop_front_inner)(args)
    }

    fn documentation(&self) -> Option<&Documentation> {
        Some(get_array_pop_front_doc())
    }
}
fn get_array_pop_front_doc() -> &'static Documentation {
DOCUMENTATION.get_or_init(|| {
Documentation::builder()
.with_doc_section(DOC_SECTION_ARRAY)
.with_description(
"Returns the array without the first element.",
)
.with_syntax_example("array_pop_front(array)")
.with_sql_example(
r#"```sql
> select array_pop_front([1, 2, 3]);
+-------------------------------+
| array_pop_front(List([1,2,3])) |
+-------------------------------+
| [2, 3] |
+-------------------------------+
```"#,
)
.with_argument(
"array",
"Array expression. Can be a constant, column, or function, and any combination of array operators.",
)
.build()
.unwrap()
})
}
/// Entry point for `array_pop_front` on materialised arrays.
///
/// Expects exactly one `List` or `LargeList` array and dispatches to
/// [`general_pop_front_list`] with the matching offset width.
///
/// # Errors
/// Returns an execution error when the argument count is not one (instead of
/// panicking on an empty slice) or when the array is not a supported list
/// type.
fn array_pop_front_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
    let [array] = args else {
        return exec_err!("array_pop_front needs one argument");
    };

    let array_data_type = array.data_type();
    match array_data_type {
        List(_) => {
            let array = as_list_array(array)?;
            general_pop_front_list::<i32>(array)
        }
        LargeList(_) => {
            let array = as_large_list_array(array)?;
            general_pop_front_list::<i64>(array)
        }
        _ => exec_err!(
            "array_pop_front does not support type: {:?}",
            array_data_type
        ),
    }
}
/// Drops the first element of every list by slicing each row from position
/// `2` to the row's length (1-based, inclusive). Null rows use `0` as their
/// end position.
fn general_pop_front_list<O: OffsetSizeTrait>(
    array: &GenericListArray<O>,
) -> Result<ArrayRef>
where
    i64: TryInto<O>,
{
    let row_count = array.len();
    let from_array = Int64Array::from(vec![2_i64; row_count]);

    let last_positions: Vec<i64> = array
        .iter()
        .map(|row| match row {
            Some(elements) => elements.len() as i64,
            None => 0,
        })
        .collect();
    let to_array = Int64Array::from(last_positions);

    general_array_slice::<O>(array, &from_array, &to_array, None)
}
/// Scalar UDF behind `array_pop_back(array)`, also reachable as
/// `list_pop_back`.
#[derive(Debug)]
pub(super) struct ArrayPopBack {
    signature: Signature,
    aliases: Vec<String>,
}

impl ArrayPopBack {
    /// Creates the UDF with an immutable single-array signature.
    pub fn new() -> Self {
        Self {
            signature: Signature::array(Volatility::Immutable),
            aliases: vec!["list_pop_back".to_string()],
        }
    }
}

impl ScalarUDFImpl for ArrayPopBack {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "array_pop_back"
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// The result has the same type as the input array.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        let list_type = &arg_types[0];
        Ok(list_type.clone())
    }

    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        make_scalar_function(array_pop_back_inner)(args)
    }

    fn documentation(&self) -> Option<&Documentation> {
        Some(get_array_pop_back_doc())
    }
}
fn get_array_pop_back_doc() -> &'static Documentation {
DOCUMENTATION.get_or_init(|| {
Documentation::builder()
.with_doc_section(DOC_SECTION_ARRAY)
.with_description(
"Returns the array without the last element.",
)
.with_syntax_example("array_pop_back(array)")
.with_sql_example(
r#"```sql
> select array_pop_back([1, 2, 3]);
+-------------------------------+
| array_pop_back(List([1,2,3])) |
+-------------------------------+
| [1, 2] |
+-------------------------------+
```"#,
)
.with_argument(
"array",
"Array expression. Can be a constant, column, or function, and any combination of array operators.",
)
.build()
.unwrap()
})
}
/// Entry point for `array_pop_back` on materialised arrays.
///
/// Expects exactly one `List` or `LargeList` array and dispatches to
/// [`general_pop_back_list`] with the matching offset width; anything else is
/// an execution error.
fn array_pop_back_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
    let [array] = args else {
        return exec_err!("array_pop_back needs one argument");
    };

    match array.data_type() {
        List(_) => {
            let array = as_list_array(array)?;
            general_pop_back_list::<i32>(array)
        }
        LargeList(_) => {
            let array = as_large_list_array(array)?;
            general_pop_back_list::<i64>(array)
        }
        other => exec_err!("array_pop_back does not support type: {:?}", other),
    }
}
/// Drops the last element of every list by slicing each row from position `1`
/// to the row's length minus one (1-based, inclusive). Null rows use `0` as
/// their end position.
fn general_pop_back_list<O: OffsetSizeTrait>(
    array: &GenericListArray<O>,
) -> Result<ArrayRef>
where
    i64: TryInto<O>,
{
    let row_count = array.len();
    let from_array = Int64Array::from(vec![1_i64; row_count]);

    let last_positions: Vec<i64> = array
        .iter()
        .map(|row| match row {
            Some(elements) => elements.len() as i64 - 1,
            None => 0,
        })
        .collect();
    let to_array = Int64Array::from(last_positions);

    general_array_slice::<O>(array, &from_array, &to_array, None)
}
/// Scalar UDF behind `array_any_value(array)`, also reachable as
/// `list_any_value`.
#[derive(Debug)]
pub(super) struct ArrayAnyValue {
    signature: Signature,
    aliases: Vec<String>,
}

impl ArrayAnyValue {
    /// Creates the UDF with an immutable single-array signature.
    pub fn new() -> Self {
        Self {
            signature: Signature::array(Volatility::Immutable),
            aliases: vec!["list_any_value".to_string()],
        }
    }
}

impl ScalarUDFImpl for ArrayAnyValue {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "array_any_value"
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// The result type is the element type of the list argument.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        let element_field = match &arg_types[0] {
            List(field) | LargeList(field) | FixedSizeList(field, _) => field,
            _ => {
                return plan_err!(
                    "array_any_value can only accept List, LargeList or FixedSizeList as the argument"
                )
            }
        };
        Ok(element_field.data_type().clone())
    }

    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        make_scalar_function(array_any_value_inner)(args)
    }

    fn documentation(&self) -> Option<&Documentation> {
        Some(get_array_any_value_doc())
    }
}
fn get_array_any_value_doc() -> &'static Documentation {
DOCUMENTATION.get_or_init(|| {
Documentation::builder()
.with_doc_section(DOC_SECTION_ARRAY)
.with_description(
"Returns the first non-null element in the array.",
)
.with_syntax_example("array_any_value(array)")
.with_sql_example(
r#"```sql
> select array_any_value([NULL, 1, 2, 3]);
+-------------------------------+
| array_any_value(List([NULL,1,2,3])) |
+-------------------------------------+
| 1 |
+-------------------------------------+
```"#,
)
.with_argument(
"array",
"Array expression. Can be a constant, column, or function, and any combination of array operators.",
)
.build()
.unwrap()
})
}
/// Entry point for `array_any_value` on materialised arrays.
///
/// Expects exactly one `List` or `LargeList` array and dispatches to
/// [`general_array_any_value`] with the matching offset width; anything else
/// is an execution error.
fn array_any_value_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
    let [array] = args else {
        return exec_err!("array_any_value expects one argument");
    };

    match array.data_type() {
        List(_) => {
            let array = as_list_array(array)?;
            general_array_any_value::<i32>(array)
        }
        LargeList(_) => {
            let array = as_large_list_array(array)?;
            general_array_any_value::<i64>(array)
        }
        other => exec_err!("array_any_value does not support type: {:?}", other),
    }
}
/// Returns, for every row of `array`, the first non-null element of that
/// row's list.
///
/// The output has one slot per input row. A slot is null when the list row is
/// null or empty, or when every element of the list is null.
fn general_array_any_value<O: OffsetSizeTrait>(
    array: &GenericListArray<O>,
) -> Result<ArrayRef>
where
    i64: TryInto<O>,
{
    let values = array.values();
    let original_data = values.to_data();
    let capacity = Capacities::Array(array.len());

    // `use_nulls = true`: rows without a usable element are emitted as nulls.
    let mut mutable =
        MutableArrayData::with_capacities(vec![&original_data], true, capacity);

    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
        let start = offset_window[0];
        let end = offset_window[1];
        let len = end - start;

        // A null list slot may still span a non-empty child range, so the
        // validity bitmap is checked as well as the length.
        if len == O::usize_as(0) || array.is_null(row_index) {
            mutable.extend_nulls(1);
            continue;
        }

        let row_value = array.value(row_index);
        // Position of the first valid element, relative to the start of this
        // row. Without a null buffer the very first element is valid.
        let first_valid = match row_value.nulls() {
            Some(row_nulls_buffer) => row_nulls_buffer.valid_indices().next(),
            None => Some(0),
        };

        match first_valid {
            Some(first_non_null_index) => {
                let index = start.as_usize() + first_non_null_index;
                mutable.extend(0, index, index + 1);
            }
            // Every element of the row is null.
            None => mutable.extend_nulls(1),
        }
    }

    let data = mutable.freeze();
    Ok(arrow::array::make_array(data))
}