use std::sync::Arc;
use arrow_array::builder::BufferBuilder;
use arrow_array::types::ByteArrayType;
use arrow_array::*;
use arrow_buffer::{ArrowNativeType, NullBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType};
/// Returns the elementwise concatenation of two [`GenericByteArray`]s.
///
/// Row `i` of the output holds the bytes of `left[i]` immediately followed by
/// the bytes of `right[i]`. The output validity is the union of both inputs'
/// null buffers, so a row is null when it is null in either input. The bytes
/// stored behind null rows are still copied; they are simply masked out.
///
/// # Errors
///
/// Returns [`ArrowError::ComputeError`] if the two arrays differ in length, or
/// if the concatenated values grow beyond what `T::Offset` can address.
pub fn concat_elements_bytes<T: ByteArrayType>(
    left: &GenericByteArray<T>,
    right: &GenericByteArray<T>,
) -> Result<GenericByteArray<T>, ArrowError> {
    if left.len() != right.len() {
        return Err(ArrowError::ComputeError(format!(
            "Arrays must have the same length: {} != {}",
            left.len(),
            right.len()
        )));
    }

    let nulls = NullBuffer::union(left.nulls(), right.nulls());

    let left_offsets = left.value_offsets();
    let right_offsets = right.value_offsets();

    let left_values = left.value_data();
    let right_values = right.value_data();

    // Capacity hint: everything in both value buffers except the bytes that
    // precede each array's first offset (non-zero for sliced arrays).
    let mut output_values = BufferBuilder::<u8>::new(
        left_values.len() + right_values.len()
            - left_offsets[0].as_usize()
            - right_offsets[0].as_usize(),
    );

    let mut output_offsets = BufferBuilder::<T::Offset>::new(left_offsets.len());
    output_offsets.append(T::Offset::usize_as(0));
    for (left_idx, right_idx) in left_offsets.windows(2).zip(right_offsets.windows(2)) {
        output_values.append_slice(&left_values[left_idx[0].as_usize()..left_idx[1].as_usize()]);
        output_values
            .append_slice(&right_values[right_idx[0].as_usize()..right_idx[1].as_usize()]);

        // Report an offset overflow as an error instead of panicking: the
        // combined values of two valid arrays may not fit in `T::Offset`.
        let end_offset = T::Offset::from_usize(output_values.len()).ok_or_else(|| {
            ArrowError::ComputeError(format!(
                "Concatenated values of {} bytes overflow the offsets of {}",
                output_values.len(),
                T::DATA_TYPE
            ))
        })?;
        output_offsets.append(end_offset);
    }

    let builder = ArrayDataBuilder::new(T::DATA_TYPE)
        .len(left.len())
        .add_buffer(output_offsets.finish())
        .add_buffer(output_values.finish())
        .nulls(nulls);

    // SAFETY: the offsets are the running length of `output_values`, so they
    // start at zero, never decrease and end at the values buffer length, and
    // exactly `left.len() + 1` of them are written. Every appended slice is a
    // whole value of an input array, so this relies on the inputs themselves
    // being well-formed (e.g. valid UTF-8 for string types).
    Ok(unsafe { builder.build_unchecked() }.into())
}
/// Returns the elementwise concatenation of two string arrays.
///
/// This is a typed convenience wrapper: all of the work, including input
/// validation, is done by [`concat_elements_bytes`].
///
/// # Errors
///
/// Forwards any error reported by [`concat_elements_bytes`].
pub fn concat_elements_utf8<Offset: OffsetSizeTrait>(
    left: &GenericStringArray<Offset>,
    right: &GenericStringArray<Offset>,
) -> Result<GenericStringArray<Offset>, ArrowError> {
    let concatenated = concat_elements_bytes(left, right)?;
    Ok(concatenated)
}
/// Returns the elementwise concatenation of two binary arrays.
///
/// This is a typed convenience wrapper: all of the work, including input
/// validation, is done by [`concat_elements_bytes`].
///
/// # Errors
///
/// Forwards any error reported by [`concat_elements_bytes`].
pub fn concat_element_binary<Offset: OffsetSizeTrait>(
    left: &GenericBinaryArray<Offset>,
    right: &GenericBinaryArray<Offset>,
) -> Result<GenericBinaryArray<Offset>, ArrowError> {
    let concatenated = concat_elements_bytes(left, right)?;
    Ok(concatenated)
}
/// Returns the elementwise concatenation of any number of string arrays.
///
/// Row `i` of the output is the values of row `i` of every input, joined in
/// the order the arrays are given. The output validity is the union of all
/// inputs' null buffers, so a row is null when it is null in any input.
///
/// # Errors
///
/// Returns [`ArrowError::ComputeError`] if `arrays` is empty, if the arrays do
/// not all have the same length, or if the concatenated values grow beyond
/// what `Offset` can address.
pub fn concat_elements_utf8_many<Offset: OffsetSizeTrait>(
    arrays: &[&GenericStringArray<Offset>],
) -> Result<GenericStringArray<Offset>, ArrowError> {
    if arrays.is_empty() {
        return Err(ArrowError::ComputeError(
            "concat requires input of at least one array".to_string(),
        ));
    }

    let size = arrays[0].len();
    if !arrays.iter().all(|array| array.len() == size) {
        return Err(ArrowError::ComputeError(format!(
            "Arrays must have the same length of {size}",
        )));
    }

    let nulls = arrays
        .iter()
        .fold(None, |acc, a| NullBuffer::union(acc.as_ref(), a.nulls()));

    let data_values = arrays
        .iter()
        .map(|array| array.value_data())
        .collect::<Vec<_>>();
    let value_offsets = arrays
        .iter()
        .map(|array| array.value_offsets())
        .collect::<Vec<_>>();

    // Capacity hint: each value buffer minus the bytes that precede the
    // array's first offset (non-zero for sliced arrays).
    let mut output_values = BufferBuilder::<u8>::new(
        data_values
            .iter()
            .zip(&value_offsets)
            .map(|(data, array_offsets)| data.len() - array_offsets[0].as_usize())
            .sum(),
    );

    let mut output_offsets = BufferBuilder::<Offset>::new(size + 1);
    output_offsets.append(Offset::zero());
    for row in 0..size {
        // Append this row's value from every input, in input order.
        for (values, array_offsets) in data_values.iter().zip(&value_offsets) {
            let index_start = array_offsets[row].as_usize();
            let index_end = array_offsets[row + 1].as_usize();
            output_values.append_slice(&values[index_start..index_end]);
        }

        // Report an offset overflow as an error instead of panicking: the
        // combined values of valid arrays may not fit in `Offset`.
        let end_offset = Offset::from_usize(output_values.len()).ok_or_else(|| {
            ArrowError::ComputeError(format!(
                "Concatenated values of {} bytes overflow the offsets of {}",
                output_values.len(),
                GenericStringArray::<Offset>::DATA_TYPE
            ))
        })?;
        output_offsets.append(end_offset);
    }

    let builder = ArrayDataBuilder::new(GenericStringArray::<Offset>::DATA_TYPE)
        .len(size)
        .add_buffer(output_offsets.finish())
        .add_buffer(output_values.finish())
        .nulls(nulls);

    // SAFETY: the offsets are the running length of `output_values`, so they
    // start at zero, never decrease and end at the values buffer length, and
    // exactly `size + 1` of them are written. Every appended slice is a whole
    // value of an input array, so this relies on the inputs themselves
    // holding valid UTF-8.
    Ok(unsafe { builder.build_unchecked() }.into())
}
/// Returns the elementwise concatenation of two dynamically typed arrays.
///
/// Both inputs must share the same data type, which must be one of `Utf8`,
/// `LargeUtf8`, `Binary` or `LargeBinary`. The call is dispatched to
/// [`concat_elements_utf8`] or [`concat_element_binary`] accordingly.
///
/// # Errors
///
/// * [`ArrowError::ComputeError`] if the data types of the inputs differ.
/// * [`ArrowError::NotYetImplemented`] if the shared data type is unsupported.
/// * Any error returned by the typed kernel, such as a length mismatch.
///
/// # Panics
///
/// Panics if an array reports a supported data type but is not the matching
/// concrete array type.
pub fn concat_elements_dyn(
    left: &dyn Array,
    right: &dyn Array,
) -> Result<ArrayRef, ArrowError> {
    if left.data_type() != right.data_type() {
        return Err(ArrowError::ComputeError(format!(
            "Cannot concat arrays of different types: {} != {}",
            left.data_type(),
            right.data_type()
        )));
    }

    // Both data types are equal at this point, so dispatching on one suffices.
    // Kernel errors are propagated with `?` rather than unwrapped, so a length
    // mismatch reaches the caller as an `Err` instead of a panic.
    match left.data_type() {
        DataType::Utf8 => {
            let left = left
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("Utf8 array must be a StringArray");
            let right = right
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("Utf8 array must be a StringArray");
            Ok(Arc::new(concat_elements_utf8(left, right)?))
        }
        DataType::LargeUtf8 => {
            let left = left
                .as_any()
                .downcast_ref::<LargeStringArray>()
                .expect("LargeUtf8 array must be a LargeStringArray");
            let right = right
                .as_any()
                .downcast_ref::<LargeStringArray>()
                .expect("LargeUtf8 array must be a LargeStringArray");
            Ok(Arc::new(concat_elements_utf8(left, right)?))
        }
        DataType::Binary => {
            let left = left
                .as_any()
                .downcast_ref::<BinaryArray>()
                .expect("Binary array must be a BinaryArray");
            let right = right
                .as_any()
                .downcast_ref::<BinaryArray>()
                .expect("Binary array must be a BinaryArray");
            Ok(Arc::new(concat_element_binary(left, right)?))
        }
        DataType::LargeBinary => {
            let left = left
                .as_any()
                .downcast_ref::<LargeBinaryArray>()
                .expect("LargeBinary array must be a LargeBinaryArray");
            let right = right
                .as_any()
                .downcast_ref::<LargeBinaryArray>()
                .expect("LargeBinary array must be a LargeBinaryArray");
            Ok(Arc::new(concat_element_binary(left, right)?))
        }
        other => Err(ArrowError::NotYetImplemented(format!(
            "concat not supported for {}",
            other
        ))),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::StringArray;

    /// A row that is null in either input is null in the output.
    #[test]
    fn test_string_concat() {
        let lhs = StringArray::from(vec![Some("foo"), Some("bar"), None]);
        let rhs = StringArray::from(vec![None, Some("yyy"), Some("zzz")]);

        let actual = concat_elements_utf8(&lhs, &rhs).unwrap();

        let want = StringArray::from(vec![None, Some("baryyy"), None]);
        assert_eq!(actual, want);
    }

    /// Empty strings are valid values and contribute no bytes.
    #[test]
    fn test_string_concat_empty_string() {
        let lhs = StringArray::from(vec![Some("foo"), Some(""), Some("bar")]);
        let rhs = StringArray::from(vec![Some("baz"), Some(""), Some("")]);

        let actual = concat_elements_utf8(&lhs, &rhs).unwrap();

        let want = StringArray::from(vec![Some("foobaz"), Some(""), Some("bar")]);
        assert_eq!(actual, want);
    }

    /// Inputs without any null buffer concatenate row by row.
    #[test]
    fn test_string_concat_no_null() {
        let lhs = StringArray::from(vec!["foo", "bar"]);
        let rhs = StringArray::from(vec!["bar", "baz"]);

        let actual = concat_elements_utf8(&lhs, &rhs).unwrap();

        let want = StringArray::from(vec!["foobar", "barbaz"]);
        assert_eq!(actual, want);
    }

    /// Inputs of different lengths are rejected with a compute error.
    #[test]
    fn test_string_concat_error() {
        let lhs = StringArray::from(vec!["foo", "bar"]);
        let rhs = StringArray::from(vec!["baz"]);

        let err = concat_elements_utf8(&lhs, &rhs).unwrap_err();

        assert_eq!(
            err.to_string(),
            "Compute error: Arrays must have the same length: 2 != 1"
        );
    }

    /// Sliced inputs (non-zero starting offsets) are concatenated correctly.
    #[test]
    fn test_string_concat_slice() {
        let left = StringArray::from(vec![None, Some("foo"), Some("bar"), Some("baz")]);
        let right = StringArray::from(vec![Some("boo"), None, Some("far"), Some("faz")]);

        // Slices both inputs as `(offset, length)` and concatenates the slices.
        let concat_slices = |l: (usize, usize), r: (usize, usize)| {
            let lhs = left.slice(l.0, l.1);
            let rhs = right.slice(r.0, r.1);
            concat_elements_utf8(
                lhs.as_any().downcast_ref::<StringArray>().unwrap(),
                rhs.as_any().downcast_ref::<StringArray>().unwrap(),
            )
            .unwrap()
        };

        let actual = concat_slices((0, 3), (1, 3));
        let want = StringArray::from(vec![None, Some("foofar"), Some("barfaz")]);
        assert_eq!(actual, want);

        let actual = concat_slices((2, 2), (1, 2));
        let want = StringArray::from(vec![None, Some("bazfar")]);
        assert_eq!(actual, want);
    }

    /// Concatenating zero arrays is an error.
    #[test]
    fn test_string_concat_error_empty() {
        let err = concat_elements_utf8_many::<i32>(&[]).unwrap_err();

        assert_eq!(
            err.to_string(),
            "Compute error: concat requires input of at least one array"
        );
    }

    /// Concatenating a single array yields an equal array.
    #[test]
    fn test_string_concat_one() {
        let only = StringArray::from(vec![None, Some("baryyy"), None]);

        let actual = concat_elements_utf8_many(&[&only]).unwrap();

        assert_eq!(actual, only);
    }

    /// With many inputs a row is null as soon as any one input is null there.
    #[test]
    fn test_string_concat_many() {
        let foo = StringArray::from(vec![Some("f"), Some("o"), Some("o"), None]);
        let bar = StringArray::from(vec![None, Some("b"), Some("a"), Some("r")]);
        let baz = StringArray::from(vec![Some("b"), None, Some("a"), Some("z")]);

        let actual = concat_elements_utf8_many(&[&foo, &bar, &baz]).unwrap();

        let want = StringArray::from(vec![None, None, Some("oaa"), None]);
        assert_eq!(actual, want);
    }

    /// The dynamic entry point handles every supported string and binary type.
    #[test]
    fn test_concat_dyn_same_type() {
        // Utf8
        {
            let lhs = StringArray::from(vec![Some("foo"), Some("bar"), None]);
            let rhs = StringArray::from(vec![None, Some("yyy"), Some("zzz")]);
            let actual: StringArray = concat_elements_dyn(&lhs, &rhs)
                .unwrap()
                .into_data()
                .into();
            let want = StringArray::from(vec![None, Some("baryyy"), None]);
            assert_eq!(actual, want);
        }

        // LargeUtf8
        {
            let lhs = LargeStringArray::from(vec![Some("foo"), Some("bar"), None]);
            let rhs = LargeStringArray::from(vec![None, Some("yyy"), Some("zzz")]);
            let actual: LargeStringArray = concat_elements_dyn(&lhs, &rhs)
                .unwrap()
                .into_data()
                .into();
            let want = LargeStringArray::from(vec![None, Some("baryyy"), None]);
            assert_eq!(actual, want);
        }

        // Binary
        {
            let lhs = BinaryArray::from_opt_vec(vec![Some(b"foo"), Some(b"bar"), None]);
            let rhs = BinaryArray::from_opt_vec(vec![None, Some(b"yyy"), Some(b"zzz")]);
            let actual: BinaryArray = concat_elements_dyn(&lhs, &rhs)
                .unwrap()
                .into_data()
                .into();
            let want = BinaryArray::from_opt_vec(vec![None, Some(b"baryyy"), None]);
            assert_eq!(actual, want);
        }

        // LargeBinary
        {
            let lhs = LargeBinaryArray::from_opt_vec(vec![Some(b"foo"), Some(b"bar"), None]);
            let rhs = LargeBinaryArray::from_opt_vec(vec![None, Some(b"yyy"), Some(b"zzz")]);
            let actual: LargeBinaryArray = concat_elements_dyn(&lhs, &rhs)
                .unwrap()
                .into_data()
                .into();
            let want = LargeBinaryArray::from_opt_vec(vec![None, Some(b"baryyy"), None]);
            assert_eq!(actual, want);
        }
    }

    /// Inputs whose data types differ are rejected with a compute error.
    #[test]
    fn test_concat_dyn_different_type() {
        let lhs = StringArray::from(vec![Some("foo"), Some("bar"), None]);
        let rhs = LargeStringArray::from(vec![None, Some("1"), Some("2")]);

        let err = concat_elements_dyn(&lhs, &rhs).unwrap_err();

        assert_eq!(
            err.to_string(),
            "Compute error: Cannot concat arrays of different types: Utf8 != LargeUtf8"
        );
    }
}