mod binary_array;
use crate::types::*;
use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow_data::ArrayData;
use arrow_schema::{DataType, IntervalUnit, TimeUnit};
use std::any::Any;
use std::sync::Arc;
pub use binary_array::*;
mod boolean_array;
pub use boolean_array::*;
mod byte_array;
pub use byte_array::*;
mod dictionary_array;
pub use dictionary_array::*;
mod fixed_size_binary_array;
pub use fixed_size_binary_array::*;
mod fixed_size_list_array;
pub use fixed_size_list_array::*;
mod list_array;
pub use list_array::*;
mod map_array;
pub use map_array::*;
mod null_array;
pub use null_array::*;
mod primitive_array;
pub use primitive_array::*;
mod string_array;
pub use string_array::*;
mod struct_array;
pub use struct_array::*;
mod union_array;
pub use union_array::*;
mod run_array;
pub use run_array::*;
pub trait Array: std::fmt::Debug + Send + Sync {
fn as_any(&self) -> &dyn Any;
fn to_data(&self) -> ArrayData;
fn into_data(self) -> ArrayData;
fn data_type(&self) -> &DataType;
fn slice(&self, offset: usize, length: usize) -> ArrayRef;
fn len(&self) -> usize;
fn is_empty(&self) -> bool;
fn offset(&self) -> usize;
fn nulls(&self) -> Option<&NullBuffer>;
fn is_null(&self, index: usize) -> bool {
self.nulls().map(|n| n.is_null(index)).unwrap_or_default()
}
fn is_valid(&self, index: usize) -> bool {
!self.is_null(index)
}
fn null_count(&self) -> usize {
self.nulls().map(|n| n.null_count()).unwrap_or_default()
}
fn get_buffer_memory_size(&self) -> usize;
fn get_array_memory_size(&self) -> usize;
}
pub type ArrayRef = Arc<dyn Array>;
impl Array for ArrayRef {
fn as_any(&self) -> &dyn Any {
self.as_ref().as_any()
}
fn to_data(&self) -> ArrayData {
self.as_ref().to_data()
}
fn into_data(self) -> ArrayData {
self.to_data()
}
fn data_type(&self) -> &DataType {
self.as_ref().data_type()
}
fn slice(&self, offset: usize, length: usize) -> ArrayRef {
self.as_ref().slice(offset, length)
}
fn len(&self) -> usize {
self.as_ref().len()
}
fn is_empty(&self) -> bool {
self.as_ref().is_empty()
}
fn offset(&self) -> usize {
self.as_ref().offset()
}
fn nulls(&self) -> Option<&NullBuffer> {
self.as_ref().nulls()
}
fn is_null(&self, index: usize) -> bool {
self.as_ref().is_null(index)
}
fn is_valid(&self, index: usize) -> bool {
self.as_ref().is_valid(index)
}
fn null_count(&self) -> usize {
self.as_ref().null_count()
}
fn get_buffer_memory_size(&self) -> usize {
self.as_ref().get_buffer_memory_size()
}
fn get_array_memory_size(&self) -> usize {
self.as_ref().get_array_memory_size()
}
}
impl<'a, T: Array> Array for &'a T {
fn as_any(&self) -> &dyn Any {
T::as_any(self)
}
fn to_data(&self) -> ArrayData {
T::to_data(self)
}
fn into_data(self) -> ArrayData {
self.to_data()
}
fn data_type(&self) -> &DataType {
T::data_type(self)
}
fn slice(&self, offset: usize, length: usize) -> ArrayRef {
T::slice(self, offset, length)
}
fn len(&self) -> usize {
T::len(self)
}
fn is_empty(&self) -> bool {
T::is_empty(self)
}
fn offset(&self) -> usize {
T::offset(self)
}
fn nulls(&self) -> Option<&NullBuffer> {
T::nulls(self)
}
fn is_null(&self, index: usize) -> bool {
T::is_null(self, index)
}
fn is_valid(&self, index: usize) -> bool {
T::is_valid(self, index)
}
fn null_count(&self) -> usize {
T::null_count(self)
}
fn get_buffer_memory_size(&self) -> usize {
T::get_buffer_memory_size(self)
}
fn get_array_memory_size(&self) -> usize {
T::get_array_memory_size(self)
}
}
pub trait ArrayAccessor: Array {
type Item: Send + Sync;
fn value(&self, index: usize) -> Self::Item;
unsafe fn value_unchecked(&self, index: usize) -> Self::Item;
}
impl PartialEq for dyn Array + '_ {
fn eq(&self, other: &Self) -> bool {
self.to_data().eq(&other.to_data())
}
}
impl<T: Array> PartialEq<T> for dyn Array + '_ {
fn eq(&self, other: &T) -> bool {
self.to_data().eq(&other.to_data())
}
}
impl PartialEq for NullArray {
fn eq(&self, other: &NullArray) -> bool {
self.to_data().eq(&other.to_data())
}
}
impl<T: ArrowPrimitiveType> PartialEq for PrimitiveArray<T> {
fn eq(&self, other: &PrimitiveArray<T>) -> bool {
self.to_data().eq(&other.to_data())
}
}
impl<K: ArrowDictionaryKeyType> PartialEq for DictionaryArray<K> {
fn eq(&self, other: &Self) -> bool {
self.to_data().eq(&other.to_data())
}
}
impl PartialEq for BooleanArray {
fn eq(&self, other: &BooleanArray) -> bool {
self.to_data().eq(&other.to_data())
}
}
impl<OffsetSize: OffsetSizeTrait> PartialEq for GenericStringArray<OffsetSize> {
fn eq(&self, other: &Self) -> bool {
self.to_data().eq(&other.to_data())
}
}
impl<OffsetSize: OffsetSizeTrait> PartialEq for GenericBinaryArray<OffsetSize> {
fn eq(&self, other: &Self) -> bool {
self.to_data().eq(&other.to_data())
}
}
impl PartialEq for FixedSizeBinaryArray {
fn eq(&self, other: &Self) -> bool {
self.to_data().eq(&other.to_data())
}
}
impl<OffsetSize: OffsetSizeTrait> PartialEq for GenericListArray<OffsetSize> {
fn eq(&self, other: &Self) -> bool {
self.to_data().eq(&other.to_data())
}
}
impl PartialEq for MapArray {
fn eq(&self, other: &Self) -> bool {
self.to_data().eq(&other.to_data())
}
}
impl PartialEq for FixedSizeListArray {
fn eq(&self, other: &Self) -> bool {
self.to_data().eq(&other.to_data())
}
}
impl PartialEq for StructArray {
fn eq(&self, other: &Self) -> bool {
self.to_data().eq(&other.to_data())
}
}
pub fn make_array(data: ArrayData) -> ArrayRef {
match data.data_type() {
DataType::Boolean => Arc::new(BooleanArray::from(data)) as ArrayRef,
DataType::Int8 => Arc::new(Int8Array::from(data)) as ArrayRef,
DataType::Int16 => Arc::new(Int16Array::from(data)) as ArrayRef,
DataType::Int32 => Arc::new(Int32Array::from(data)) as ArrayRef,
DataType::Int64 => Arc::new(Int64Array::from(data)) as ArrayRef,
DataType::UInt8 => Arc::new(UInt8Array::from(data)) as ArrayRef,
DataType::UInt16 => Arc::new(UInt16Array::from(data)) as ArrayRef,
DataType::UInt32 => Arc::new(UInt32Array::from(data)) as ArrayRef,
DataType::UInt64 => Arc::new(UInt64Array::from(data)) as ArrayRef,
DataType::Float16 => Arc::new(Float16Array::from(data)) as ArrayRef,
DataType::Float32 => Arc::new(Float32Array::from(data)) as ArrayRef,
DataType::Float64 => Arc::new(Float64Array::from(data)) as ArrayRef,
DataType::Date32 => Arc::new(Date32Array::from(data)) as ArrayRef,
DataType::Date64 => Arc::new(Date64Array::from(data)) as ArrayRef,
DataType::Time32(TimeUnit::Second) => {
Arc::new(Time32SecondArray::from(data)) as ArrayRef
}
DataType::Time32(TimeUnit::Millisecond) => {
Arc::new(Time32MillisecondArray::from(data)) as ArrayRef
}
DataType::Time64(TimeUnit::Microsecond) => {
Arc::new(Time64MicrosecondArray::from(data)) as ArrayRef
}
DataType::Time64(TimeUnit::Nanosecond) => {
Arc::new(Time64NanosecondArray::from(data)) as ArrayRef
}
DataType::Timestamp(TimeUnit::Second, _) => {
Arc::new(TimestampSecondArray::from(data)) as ArrayRef
}
DataType::Timestamp(TimeUnit::Millisecond, _) => {
Arc::new(TimestampMillisecondArray::from(data)) as ArrayRef
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
Arc::new(TimestampMicrosecondArray::from(data)) as ArrayRef
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
Arc::new(TimestampNanosecondArray::from(data)) as ArrayRef
}
DataType::Interval(IntervalUnit::YearMonth) => {
Arc::new(IntervalYearMonthArray::from(data)) as ArrayRef
}
DataType::Interval(IntervalUnit::DayTime) => {
Arc::new(IntervalDayTimeArray::from(data)) as ArrayRef
}
DataType::Interval(IntervalUnit::MonthDayNano) => {
Arc::new(IntervalMonthDayNanoArray::from(data)) as ArrayRef
}
DataType::Duration(TimeUnit::Second) => {
Arc::new(DurationSecondArray::from(data)) as ArrayRef
}
DataType::Duration(TimeUnit::Millisecond) => {
Arc::new(DurationMillisecondArray::from(data)) as ArrayRef
}
DataType::Duration(TimeUnit::Microsecond) => {
Arc::new(DurationMicrosecondArray::from(data)) as ArrayRef
}
DataType::Duration(TimeUnit::Nanosecond) => {
Arc::new(DurationNanosecondArray::from(data)) as ArrayRef
}
DataType::Binary => Arc::new(BinaryArray::from(data)) as ArrayRef,
DataType::LargeBinary => Arc::new(LargeBinaryArray::from(data)) as ArrayRef,
DataType::FixedSizeBinary(_) => {
Arc::new(FixedSizeBinaryArray::from(data)) as ArrayRef
}
DataType::Utf8 => Arc::new(StringArray::from(data)) as ArrayRef,
DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data)) as ArrayRef,
DataType::List(_) => Arc::new(ListArray::from(data)) as ArrayRef,
DataType::LargeList(_) => Arc::new(LargeListArray::from(data)) as ArrayRef,
DataType::Struct(_) => Arc::new(StructArray::from(data)) as ArrayRef,
DataType::Map(_, _) => Arc::new(MapArray::from(data)) as ArrayRef,
DataType::Union(_, _) => Arc::new(UnionArray::from(data)) as ArrayRef,
DataType::FixedSizeList(_, _) => {
Arc::new(FixedSizeListArray::from(data)) as ArrayRef
}
DataType::Dictionary(ref key_type, _) => match key_type.as_ref() {
DataType::Int8 => {
Arc::new(DictionaryArray::<Int8Type>::from(data)) as ArrayRef
}
DataType::Int16 => {
Arc::new(DictionaryArray::<Int16Type>::from(data)) as ArrayRef
}
DataType::Int32 => {
Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef
}
DataType::Int64 => {
Arc::new(DictionaryArray::<Int64Type>::from(data)) as ArrayRef
}
DataType::UInt8 => {
Arc::new(DictionaryArray::<UInt8Type>::from(data)) as ArrayRef
}
DataType::UInt16 => {
Arc::new(DictionaryArray::<UInt16Type>::from(data)) as ArrayRef
}
DataType::UInt32 => {
Arc::new(DictionaryArray::<UInt32Type>::from(data)) as ArrayRef
}
DataType::UInt64 => {
Arc::new(DictionaryArray::<UInt64Type>::from(data)) as ArrayRef
}
dt => panic!("Unexpected dictionary key type {dt:?}"),
},
DataType::RunEndEncoded(ref run_ends_type, _) => {
match run_ends_type.data_type() {
DataType::Int16 => {
Arc::new(RunArray::<Int16Type>::from(data)) as ArrayRef
}
DataType::Int32 => {
Arc::new(RunArray::<Int32Type>::from(data)) as ArrayRef
}
DataType::Int64 => {
Arc::new(RunArray::<Int64Type>::from(data)) as ArrayRef
}
dt => panic!("Unexpected data type for run_ends array {dt:?}"),
}
}
DataType::Null => Arc::new(NullArray::from(data)) as ArrayRef,
DataType::Decimal128(_, _) => Arc::new(Decimal128Array::from(data)) as ArrayRef,
DataType::Decimal256(_, _) => Arc::new(Decimal256Array::from(data)) as ArrayRef,
dt => panic!("Unexpected data type {dt:?}"),
}
}
pub fn new_empty_array(data_type: &DataType) -> ArrayRef {
let data = ArrayData::new_empty(data_type);
make_array(data)
}
pub fn new_null_array(data_type: &DataType, length: usize) -> ArrayRef {
make_array(ArrayData::new_null(data_type, length))
}
unsafe fn get_offsets<O: ArrowNativeType>(data: &ArrayData) -> OffsetBuffer<O> {
match data.is_empty() && data.buffers()[0].is_empty() {
true => OffsetBuffer::new_empty(),
false => {
let buffer = ScalarBuffer::new(
data.buffers()[0].clone(),
data.offset(),
data.len() + 1,
);
unsafe { OffsetBuffer::new_unchecked(buffer) }
}
}
}
fn print_long_array<A, F>(
array: &A,
f: &mut std::fmt::Formatter,
print_item: F,
) -> std::fmt::Result
where
A: Array,
F: Fn(&A, usize, &mut std::fmt::Formatter) -> std::fmt::Result,
{
let head = std::cmp::min(10, array.len());
for i in 0..head {
if array.is_null(i) {
writeln!(f, " null,")?;
} else {
write!(f, " ")?;
print_item(array, i, f)?;
writeln!(f, ",")?;
}
}
if array.len() > 10 {
if array.len() > 20 {
writeln!(f, " ...{} elements...,", array.len() - 20)?;
}
let tail = std::cmp::max(head, array.len() - 10);
for i in tail..array.len() {
if array.is_null(i) {
writeln!(f, " null,")?;
} else {
write!(f, " ")?;
print_item(array, i, f)?;
writeln!(f, ",")?;
}
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cast::{as_union_array, downcast_array};
use crate::downcast_run_array;
use arrow_buffer::MutableBuffer;
use arrow_schema::{Field, Fields, UnionFields, UnionMode};
#[test]
fn test_empty_primitive() {
let array = new_empty_array(&DataType::Int32);
let a = array.as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(a.len(), 0);
let expected: &[i32] = &[];
assert_eq!(a.values(), expected);
}
#[test]
fn test_empty_variable_sized() {
let array = new_empty_array(&DataType::Utf8);
let a = array.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(a.len(), 0);
assert_eq!(a.value_offsets()[0], 0i32);
}
#[test]
fn test_empty_list_primitive() {
let data_type =
DataType::List(Arc::new(Field::new("item", DataType::Int32, false)));
let array = new_empty_array(&data_type);
let a = array.as_any().downcast_ref::<ListArray>().unwrap();
assert_eq!(a.len(), 0);
assert_eq!(a.value_offsets()[0], 0i32);
}
#[test]
fn test_null_boolean() {
let array = new_null_array(&DataType::Boolean, 9);
let a = array.as_any().downcast_ref::<BooleanArray>().unwrap();
assert_eq!(a.len(), 9);
for i in 0..9 {
assert!(a.is_null(i));
}
}
#[test]
fn test_null_primitive() {
let array = new_null_array(&DataType::Int32, 9);
let a = array.as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(a.len(), 9);
for i in 0..9 {
assert!(a.is_null(i));
}
}
#[test]
fn test_null_struct() {
let struct_type =
DataType::Struct(vec![Field::new("data", DataType::Int64, false)].into());
let array = new_null_array(&struct_type, 9);
let a = array.as_any().downcast_ref::<StructArray>().unwrap();
assert_eq!(a.len(), 9);
assert_eq!(a.column(0).len(), 9);
for i in 0..9 {
assert!(a.is_null(i));
}
a.slice(0, 5);
}
#[test]
fn test_null_variable_sized() {
let array = new_null_array(&DataType::Utf8, 9);
let a = array.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(a.len(), 9);
assert_eq!(a.value_offsets()[9], 0i32);
for i in 0..9 {
assert!(a.is_null(i));
}
}
#[test]
fn test_null_list_primitive() {
let data_type =
DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
let array = new_null_array(&data_type, 9);
let a = array.as_any().downcast_ref::<ListArray>().unwrap();
assert_eq!(a.len(), 9);
assert_eq!(a.value_offsets()[9], 0i32);
for i in 0..9 {
assert!(a.is_null(i));
}
}
#[test]
fn test_null_map() {
let data_type = DataType::Map(
Arc::new(Field::new(
"entry",
DataType::Struct(Fields::from(vec![
Field::new("key", DataType::Utf8, false),
Field::new("value", DataType::Int32, true),
])),
false,
)),
false,
);
let array = new_null_array(&data_type, 9);
let a = array.as_any().downcast_ref::<MapArray>().unwrap();
assert_eq!(a.len(), 9);
assert_eq!(a.value_offsets()[9], 0i32);
for i in 0..9 {
assert!(a.is_null(i));
}
}
#[test]
fn test_null_dictionary() {
let values = vec![None, None, None, None, None, None, None, None, None]
as Vec<Option<&str>>;
let array: DictionaryArray<Int8Type> = values.into_iter().collect();
let array = Arc::new(array) as ArrayRef;
let null_array = new_null_array(array.data_type(), 9);
assert_eq!(&array, &null_array);
assert_eq!(
array.to_data().buffers()[0].len(),
null_array.to_data().buffers()[0].len()
);
}
#[test]
fn test_null_union() {
for mode in [UnionMode::Sparse, UnionMode::Dense] {
let data_type = DataType::Union(
UnionFields::new(
vec![2, 1],
vec![
Field::new("foo", DataType::Int32, true),
Field::new("bar", DataType::Int64, true),
],
),
mode,
);
let array = new_null_array(&data_type, 4);
let array = as_union_array(array.as_ref());
assert_eq!(array.len(), 4);
assert_eq!(array.null_count(), 0);
for i in 0..4 {
let a = array.value(i);
assert_eq!(a.len(), 1);
assert_eq!(a.null_count(), 1);
assert!(a.is_null(0))
}
}
}
#[test]
#[allow(unused_parens)]
fn test_null_runs() {
for r in [DataType::Int16, DataType::Int32, DataType::Int64] {
let data_type = DataType::RunEndEncoded(
Arc::new(Field::new("run_ends", r, false)),
Arc::new(Field::new("values", DataType::Utf8, true)),
);
let array = new_null_array(&data_type, 4);
let array = array.as_ref();
downcast_run_array! {
array => {
assert_eq!(array.len(), 4);
assert_eq!(array.null_count(), 0);
assert_eq!(array.values().len(), 1);
assert_eq!(array.values().null_count(), 1);
assert_eq!(array.run_ends().len(), 4);
assert_eq!(array.run_ends().values(), &[4]);
let idx = array.get_physical_indices(&[0, 1, 2, 3]).unwrap();
assert_eq!(idx, &[0,0,0,0]);
}
d => unreachable!("{d}")
}
}
}
#[test]
fn test_null_fixed_size_binary() {
for size in [1, 2, 7] {
let array = new_null_array(&DataType::FixedSizeBinary(size), 6);
let array = array
.as_ref()
.as_any()
.downcast_ref::<FixedSizeBinaryArray>()
.unwrap();
assert_eq!(array.len(), 6);
assert_eq!(array.null_count(), 6);
array.iter().for_each(|x| assert!(x.is_none()));
}
}
#[test]
fn test_memory_size_null() {
let null_arr = NullArray::new(32);
assert_eq!(0, null_arr.get_buffer_memory_size());
assert_eq!(
std::mem::size_of::<usize>(),
null_arr.get_array_memory_size()
);
}
#[test]
fn test_memory_size_primitive() {
let arr = PrimitiveArray::<Int64Type>::from_iter_values(0..128);
let empty =
PrimitiveArray::<Int64Type>::from(ArrayData::new_empty(arr.data_type()));
assert_eq!(
arr.get_array_memory_size() - empty.get_array_memory_size(),
128 * std::mem::size_of::<i64>()
);
}
#[test]
fn test_memory_size_primitive_nullable() {
let arr: PrimitiveArray<Int64Type> = (0..128)
.map(|i| if i % 20 == 0 { Some(i) } else { None })
.collect();
let empty_with_bitmap = PrimitiveArray::<Int64Type>::from(
ArrayData::builder(arr.data_type().clone())
.add_buffer(MutableBuffer::new(0).into())
.null_bit_buffer(Some(MutableBuffer::new_null(0).into()))
.build()
.unwrap(),
);
assert_eq!(
std::mem::size_of::<PrimitiveArray<Int64Type>>(),
empty_with_bitmap.get_array_memory_size()
);
assert_eq!(
arr.get_array_memory_size() - empty_with_bitmap.get_array_memory_size(),
128 * std::mem::size_of::<i64>() + 64
);
}
#[test]
fn test_memory_size_dictionary() {
let values = PrimitiveArray::<Int64Type>::from_iter_values(0..16);
let keys = PrimitiveArray::<Int16Type>::from_iter_values(
(0..256).map(|i| (i % values.len()) as i16),
);
let dict_data_type = DataType::Dictionary(
Box::new(keys.data_type().clone()),
Box::new(values.data_type().clone()),
);
let dict_data = keys
.into_data()
.into_builder()
.data_type(dict_data_type)
.child_data(vec![values.into_data()])
.build()
.unwrap();
let empty_data = ArrayData::new_empty(&DataType::Dictionary(
Box::new(DataType::Int16),
Box::new(DataType::Int64),
));
let arr = DictionaryArray::<Int16Type>::from(dict_data);
let empty = DictionaryArray::<Int16Type>::from(empty_data);
let expected_keys_size = 256 * std::mem::size_of::<i16>();
assert_eq!(
arr.keys().get_array_memory_size() - empty.keys().get_array_memory_size(),
expected_keys_size
);
let expected_values_size = 16 * std::mem::size_of::<i64>();
assert_eq!(
arr.values().get_array_memory_size() - empty.values().get_array_memory_size(),
expected_values_size
);
let expected_size = expected_keys_size + expected_values_size;
assert_eq!(
arr.get_array_memory_size() - empty.get_array_memory_size(),
expected_size
);
}
fn compute_my_thing(arr: &dyn Array) -> bool {
!arr.is_empty()
}
#[test]
fn test_array_ref_as_array() {
let arr: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
assert!(compute_my_thing(&arr));
let arr: ArrayRef = Arc::new(arr);
assert!(compute_my_thing(&arr));
assert!(compute_my_thing(arr.as_ref()));
}
#[test]
fn test_downcast_array() {
let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
let boxed: ArrayRef = Arc::new(array);
let array: Int32Array = downcast_array(&boxed);
let expected: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
assert_eq!(array, expected);
}
}