use crate::builder::{ArrayBuilder, BufferBuilder};
use crate::types::*;
use crate::{ArrayRef, ArrowPrimitiveType, PrimitiveArray};
use arrow_buffer::NullBufferBuilder;
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType};
use std::any::Any;
use std::sync::Arc;
/// A signed 8-bit integer array builder.
pub type Int8Builder = PrimitiveBuilder<Int8Type>;
/// A signed 16-bit integer array builder.
pub type Int16Builder = PrimitiveBuilder<Int16Type>;
/// A signed 32-bit integer array builder.
pub type Int32Builder = PrimitiveBuilder<Int32Type>;
/// A signed 64-bit integer array builder.
pub type Int64Builder = PrimitiveBuilder<Int64Type>;
/// An unsigned 8-bit integer array builder.
pub type UInt8Builder = PrimitiveBuilder<UInt8Type>;
/// An unsigned 16-bit integer array builder.
pub type UInt16Builder = PrimitiveBuilder<UInt16Type>;
/// An unsigned 32-bit integer array builder.
pub type UInt32Builder = PrimitiveBuilder<UInt32Type>;
/// An unsigned 64-bit integer array builder.
pub type UInt64Builder = PrimitiveBuilder<UInt64Type>;
/// A 16-bit floating point array builder.
pub type Float16Builder = PrimitiveBuilder<Float16Type>;
/// A 32-bit floating point array builder.
pub type Float32Builder = PrimitiveBuilder<Float32Type>;
/// A 64-bit floating point array builder.
pub type Float64Builder = PrimitiveBuilder<Float64Type>;
/// A timestamp array builder with second resolution.
pub type TimestampSecondBuilder = PrimitiveBuilder<TimestampSecondType>;
/// A timestamp array builder with millisecond resolution.
pub type TimestampMillisecondBuilder = PrimitiveBuilder<TimestampMillisecondType>;
/// A timestamp array builder with microsecond resolution.
pub type TimestampMicrosecondBuilder = PrimitiveBuilder<TimestampMicrosecondType>;
/// A timestamp array builder with nanosecond resolution.
pub type TimestampNanosecondBuilder = PrimitiveBuilder<TimestampNanosecondType>;
/// A 32-bit date array builder.
pub type Date32Builder = PrimitiveBuilder<Date32Type>;
/// A 64-bit date array builder.
pub type Date64Builder = PrimitiveBuilder<Date64Type>;
/// A 32-bit time-of-day array builder with second resolution.
pub type Time32SecondBuilder = PrimitiveBuilder<Time32SecondType>;
/// A 32-bit time-of-day array builder with millisecond resolution.
pub type Time32MillisecondBuilder = PrimitiveBuilder<Time32MillisecondType>;
/// A 64-bit time-of-day array builder with microsecond resolution.
pub type Time64MicrosecondBuilder = PrimitiveBuilder<Time64MicrosecondType>;
/// A 64-bit time-of-day array builder with nanosecond resolution.
pub type Time64NanosecondBuilder = PrimitiveBuilder<Time64NanosecondType>;
/// A year-month interval array builder.
pub type IntervalYearMonthBuilder = PrimitiveBuilder<IntervalYearMonthType>;
/// A day-time interval array builder.
pub type IntervalDayTimeBuilder = PrimitiveBuilder<IntervalDayTimeType>;
/// A month-day-nanosecond interval array builder.
pub type IntervalMonthDayNanoBuilder = PrimitiveBuilder<IntervalMonthDayNanoType>;
/// A duration array builder with second resolution.
pub type DurationSecondBuilder = PrimitiveBuilder<DurationSecondType>;
/// A duration array builder with millisecond resolution.
pub type DurationMillisecondBuilder = PrimitiveBuilder<DurationMillisecondType>;
/// A duration array builder with microsecond resolution.
pub type DurationMicrosecondBuilder = PrimitiveBuilder<DurationMicrosecondType>;
/// A duration array builder with nanosecond resolution.
pub type DurationNanosecondBuilder = PrimitiveBuilder<DurationNanosecondType>;
/// A 128-bit decimal array builder.
pub type Decimal128Builder = PrimitiveBuilder<Decimal128Type>;
/// A 256-bit decimal array builder.
pub type Decimal256Builder = PrimitiveBuilder<Decimal256Type>;
/// Builder for [`PrimitiveArray`] values of the primitive type `T`.
///
/// Values and validity are accumulated in two separate builders that are
/// combined into an array by `finish` / `finish_cloned`.
#[derive(Debug)]
pub struct PrimitiveBuilder<T: ArrowPrimitiveType> {
// Accumulates the native values, one slot per appended element (null slots included).
values_builder: BufferBuilder<T::Native>,
// Accumulates the validity (null / non-null) information for each slot.
null_buffer_builder: NullBufferBuilder,
// Data type stamped on the finished array; the constructors initialise it to `T::DATA_TYPE`.
data_type: DataType,
}
impl<T: ArrowPrimitiveType> ArrayBuilder for PrimitiveBuilder<T> {
fn as_any(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
self
}
fn len(&self) -> usize {
self.values_builder.len()
}
fn finish(&mut self) -> ArrayRef {
Arc::new(self.finish())
}
fn finish_cloned(&self) -> ArrayRef {
Arc::new(self.finish_cloned())
}
}
impl<T: ArrowPrimitiveType> Default for PrimitiveBuilder<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
    /// Creates a new primitive array builder with an initial capacity of 1024 elements.
    pub fn new() -> Self {
        const DEFAULT_CAPACITY: usize = 1024;
        Self::with_capacity(DEFAULT_CAPACITY)
    }

    /// Creates a new primitive array builder with room for `capacity` elements.
    ///
    /// The data type of the produced array defaults to `T::DATA_TYPE`.
    pub fn with_capacity(capacity: usize) -> Self {
        let values_builder = BufferBuilder::<T::Native>::new(capacity);
        let null_buffer_builder = NullBufferBuilder::new(capacity);
        Self {
            values_builder,
            null_buffer_builder,
            data_type: T::DATA_TYPE,
        }
    }

    /// Creates a new primitive array builder from existing buffers.
    ///
    /// `values_buffer` supplies the already-written values. `null_buffer`,
    /// when present, supplies the matching validity bitmap; when absent the
    /// null builder is simply initialised with the length of the values builder.
    pub fn new_from_buffer(
        values_buffer: MutableBuffer,
        null_buffer: Option<MutableBuffer>,
    ) -> Self {
        let values_builder = BufferBuilder::<T::Native>::new_from_buffer(values_buffer);
        // Both builders must agree on the number of slots already present.
        let existing_len = values_builder.len();
        let null_buffer_builder = match null_buffer {
            Some(bitmap) => NullBufferBuilder::new_from_buffer(bitmap, existing_len),
            None => NullBufferBuilder::new_with_len(existing_len),
        };
        Self {
            values_builder,
            null_buffer_builder,
            data_type: T::DATA_TYPE,
        }
    }

    /// Overrides the data type of the array produced by this builder.
    ///
    /// By default the builder uses `T::DATA_TYPE`. This allows substituting
    /// another data type that is compatible with `T`.
    ///
    /// # Panics
    ///
    /// Panics if `data_type` is rejected by `PrimitiveArray::<T>::is_compatible`.
    pub fn with_data_type(self, data_type: DataType) -> Self {
        let compatible = PrimitiveArray::<T>::is_compatible(&data_type);
        assert!(
            compatible,
            "incompatible data type for builder, expected {} got {}",
            T::DATA_TYPE,
            data_type
        );
        let mut builder = self;
        builder.data_type = data_type;
        builder
    }

    /// Returns the capacity of the underlying values builder, in number of elements.
    pub fn capacity(&self) -> usize {
        let values = &self.values_builder;
        values.capacity()
    }

    /// Appends a single non-null value of type `T::Native`.
    #[inline]
    pub fn append_value(&mut self, v: T::Native) {
        // Validity and values are always advanced together, one slot each.
        self.null_buffer_builder.append_non_null();
        self.values_builder.append(v);
    }

    /// Appends a single null slot.
    #[inline]
    pub fn append_null(&mut self) {
        self.null_buffer_builder.append_null();
        // Keep the values buffer aligned with the validity bitmap by
        // advancing it one slot for the null entry.
        self.values_builder.advance(1);
    }

    /// Appends `n` null slots.
    #[inline]
    pub fn append_nulls(&mut self, n: usize) {
        self.null_buffer_builder.append_n_nulls(n);
        // Advance the values buffer by the same number of slots.
        self.values_builder.advance(n);
    }

    /// Appends an optional value: `Some(v)` as a value, `None` as a null.
    #[inline]
    pub fn append_option(&mut self, v: Option<T::Native>) {
        if let Some(value) = v {
            self.append_value(value)
        } else {
            self.append_null()
        }
    }

    /// Appends every element of `v` as a non-null value.
    #[inline]
    pub fn append_slice(&mut self, v: &[T::Native]) {
        let count = v.len();
        self.null_buffer_builder.append_n_non_nulls(count);
        self.values_builder.append_slice(v);
    }

    /// Appends `values` together with their validity flags.
    ///
    /// `is_valid[i]` states whether `values[i]` is valid (`true`) or null (`false`).
    ///
    /// # Panics
    ///
    /// Panics if `values` and `is_valid` have different lengths.
    #[inline]
    pub fn append_values(&mut self, values: &[T::Native], is_valid: &[bool]) {
        let (value_count, validity_count) = (values.len(), is_valid.len());
        assert_eq!(
            value_count, validity_count,
            "Value and validity lengths must be equal"
        );
        self.null_buffer_builder.append_slice(is_valid);
        self.values_builder.append_slice(values);
    }

    /// Appends all values yielded by `iter` as non-null values.
    ///
    /// # Panics
    ///
    /// Panics if the iterator does not report an upper bound in `size_hint`.
    ///
    /// # Safety
    ///
    /// The upper bound reported by the iterator's `size_hint` is used as the
    /// number of validity entries to add, so the iterator must yield exactly
    /// that many items. Otherwise the values and validity buffers fall out of sync.
    #[inline]
    pub unsafe fn append_trusted_len_iter(&mut self, iter: impl IntoIterator<Item = T::Native>) {
        let iter = iter.into_iter();
        let (_, upper_bound) = iter.size_hint();
        let len = upper_bound.expect("append_trusted_len_iter requires an upper bound");
        self.null_buffer_builder.append_n_non_nulls(len);
        // The caller guarantees the reported length is exact (see `# Safety`).
        self.values_builder.append_trusted_len_iter(iter);
    }

    /// Builds the [`PrimitiveArray`] and resets this builder so it can be reused.
    pub fn finish(&mut self) -> PrimitiveArray<T> {
        // Read the slot count before the values builder is finished.
        let slot_count = self.values_builder.len();
        let validity = self.null_buffer_builder.finish();
        let values = self.values_builder.finish();
        let data = ArrayData::builder(self.data_type.clone())
            .len(slot_count)
            .add_buffer(values)
            .nulls(validity);
        // SAFETY: the values and validity buffers are maintained exclusively by
        // this builder, whose append methods add the same number of slots to
        // both, and `data_type` is `T::DATA_TYPE` unless replaced through one
        // of the builder's `with_*` methods.
        let array_data = unsafe { data.build_unchecked() };
        PrimitiveArray::<T>::from(array_data)
    }

    /// Builds the [`PrimitiveArray`] from a copy of the current contents,
    /// leaving this builder untouched.
    pub fn finish_cloned(&self) -> PrimitiveArray<T> {
        let slot_count = self.values_builder.len();
        let validity = self.null_buffer_builder.finish_cloned();
        let values = Buffer::from_slice_ref(self.values_builder.as_slice());
        let data = ArrayData::builder(self.data_type.clone())
            .len(slot_count)
            .add_buffer(values)
            .nulls(validity);
        // SAFETY: same invariants as in `finish`; the copied buffers hold
        // exactly `slot_count` slots each.
        let array_data = unsafe { data.build_unchecked() };
        PrimitiveArray::<T>::from(array_data)
    }

    /// Returns the values appended so far as a slice.
    pub fn values_slice(&self) -> &[T::Native] {
        let values = &self.values_builder;
        values.as_slice()
    }

    /// Returns the values appended so far as a mutable slice.
    pub fn values_slice_mut(&mut self) -> &mut [T::Native] {
        let values = &mut self.values_builder;
        values.as_slice_mut()
    }

    /// Returns the current validity buffer as a byte slice, if one is available.
    pub fn validity_slice(&self) -> Option<&[u8]> {
        let validity = &self.null_buffer_builder;
        validity.as_slice()
    }

    /// Returns the current validity buffer as a mutable byte slice, if one is available.
    pub fn validity_slice_mut(&mut self) -> Option<&mut [u8]> {
        let validity = &mut self.null_buffer_builder;
        validity.as_slice_mut()
    }

    /// Returns the values and the validity buffer (if available) as mutable
    /// slices at the same time.
    pub fn slices_mut(&mut self) -> (&mut [T::Native], Option<&mut [u8]>) {
        // The two borrows target disjoint fields, so both can be live at once.
        let values = self.values_builder.as_slice_mut();
        let validity = self.null_buffer_builder.as_slice_mut();
        (values, validity)
    }
}
impl<P: DecimalType> PrimitiveBuilder<P> {
    /// Sets the decimal precision and scale of the array produced by this builder.
    ///
    /// # Errors
    ///
    /// Returns the error from `validate_decimal_precision_and_scale` when
    /// `precision` / `scale` are not valid for the decimal type `P`.
    pub fn with_precision_and_scale(self, precision: u8, scale: i8) -> Result<Self, ArrowError> {
        validate_decimal_precision_and_scale::<P>(precision, scale)?;
        // Only reached once validation succeeded.
        let data_type = P::TYPE_CONSTRUCTOR(precision, scale);
        Ok(Self { data_type, ..self })
    }
}
impl<P: ArrowTimestampType> PrimitiveBuilder<P> {
    /// Sets the timezone of the timestamp array produced by this builder.
    pub fn with_timezone(self, timezone: impl Into<Arc<str>>) -> Self {
        let tz: Arc<str> = timezone.into();
        self.with_timezone_opt(Some(tz))
    }

    /// Sets an optional timezone of the timestamp array produced by this builder.
    ///
    /// The resulting data type is `DataType::Timestamp` with the unit `P::UNIT`
    /// and the given timezone (or none).
    pub fn with_timezone_opt<S: Into<Arc<str>>>(self, timezone: Option<S>) -> Self {
        let tz: Option<Arc<str>> = timezone.map(Into::into);
        let data_type = DataType::Timestamp(P::UNIT, tz);
        Self { data_type, ..self }
    }
}
impl<P: ArrowPrimitiveType> Extend<Option<P::Native>> for PrimitiveBuilder<P> {
    /// Appends every item of `iter` in order: `Some(v)` as a value, `None` as a null.
    #[inline]
    fn extend<I: IntoIterator<Item = Option<P::Native>>>(&mut self, iter: I) {
        iter.into_iter().for_each(|item| self.append_option(item));
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_buffer::Buffer;
    use arrow_schema::TimeUnit;
    use crate::array::Array;
    use crate::array::BooleanArray;
    use crate::array::Date32Array;
    use crate::array::Int32Array;
    use crate::array::TimestampSecondArray;
    use crate::builder::Int32Builder;

    /// Asserts that `arr` has no offset, contains no nulls, and holds exactly
    /// the values in `expected`.
    fn assert_dense<T: ArrowPrimitiveType>(arr: &PrimitiveArray<T>, expected: &[T::Native]) {
        assert_eq!(expected.len(), arr.len());
        assert_eq!(0, arr.offset());
        assert_eq!(0, arr.null_count());
        for (idx, want) in expected.iter().enumerate() {
            assert!(!arr.is_null(idx));
            assert!(arr.is_valid(idx));
            assert_eq!(*want, arr.value(idx));
        }
    }

    /// Asserts that two `Int32Array`s agree on length, offset, null count,
    /// per-slot validity, and the value of every valid slot.
    fn assert_equivalent(expected: &Int32Array, actual: &Int32Array) {
        assert_eq!(expected.len(), actual.len());
        assert_eq!(expected.offset(), actual.offset());
        assert_eq!(expected.null_count(), actual.null_count());
        for idx in 0..expected.len() {
            assert_eq!(expected.is_null(idx), actual.is_null(idx));
            assert_eq!(expected.is_valid(idx), actual.is_valid(idx));
            if expected.is_valid(idx) {
                assert_eq!(expected.value(idx), actual.value(idx));
            }
        }
    }

    #[test]
    fn test_primitive_array_builder_i32() {
        let mut builder = Int32Array::builder(5);
        (0..5).for_each(|v| builder.append_value(v));
        assert_dense(&builder.finish(), &[0, 1, 2, 3, 4]);
    }

    #[test]
    fn test_primitive_array_builder_i32_append_iter() {
        let mut builder = Int32Array::builder(5);
        // SAFETY: a `Range` reports an exact upper bound in `size_hint`.
        unsafe { builder.append_trusted_len_iter(0..5) };
        assert_dense(&builder.finish(), &[0, 1, 2, 3, 4]);
    }

    #[test]
    fn test_primitive_array_builder_i32_append_nulls() {
        let mut builder = Int32Array::builder(5);
        builder.append_nulls(5);
        let arr = builder.finish();
        assert_eq!(5, arr.len());
        assert_eq!(0, arr.offset());
        assert_eq!(5, arr.null_count());
        for idx in 0..5 {
            assert!(arr.is_null(idx));
            assert!(!arr.is_valid(idx));
        }
    }

    #[test]
    fn test_primitive_array_builder_date32() {
        let mut builder = Date32Array::builder(5);
        (0..5).for_each(|v| builder.append_value(v));
        assert_dense(&builder.finish(), &[0, 1, 2, 3, 4]);
    }

    #[test]
    fn test_primitive_array_builder_timestamp_second() {
        let mut builder = TimestampSecondArray::builder(5);
        (0..5).for_each(|v| builder.append_value(v));
        assert_dense(&builder.finish(), &[0, 1, 2, 3, 4]);
    }

    #[test]
    fn test_primitive_array_builder_bool() {
        // Bits 3 and 6 set in the first byte (8 + 64 = 72), bit 9 in the second (2).
        let buf = Buffer::from([72_u8, 2_u8]);
        let is_set = |i: usize| i == 3 || i == 6 || i == 9;
        let mut builder = BooleanArray::builder(10);
        for i in 0..10 {
            builder.append_value(is_set(i));
        }
        let arr = builder.finish();
        assert_eq!(&buf, arr.values().inner());
        assert_eq!(10, arr.len());
        assert_eq!(0, arr.offset());
        assert_eq!(0, arr.null_count());
        for i in 0..10 {
            assert!(!arr.is_null(i));
            assert!(arr.is_valid(i));
            assert_eq!(is_set(i), arr.value(i), "failed at {i}")
        }
    }

    #[test]
    fn test_primitive_array_builder_append_option() {
        let arr1 = Int32Array::from(vec![Some(0), None, Some(2), None, Some(4)]);
        let mut builder = Int32Array::builder(5);
        for item in [Some(0), None, Some(2), None, Some(4)] {
            builder.append_option(item);
        }
        let arr2 = builder.finish();
        assert_equivalent(&arr1, &arr2);
    }

    #[test]
    fn test_primitive_array_builder_append_null() {
        let arr1 = Int32Array::from(vec![Some(0), Some(2), None, None, Some(4)]);
        let mut builder = Int32Array::builder(5);
        builder.append_value(0);
        builder.append_value(2);
        builder.append_null();
        builder.append_null();
        builder.append_value(4);
        let arr2 = builder.finish();
        assert_equivalent(&arr1, &arr2);
    }

    #[test]
    fn test_primitive_array_builder_append_slice() {
        let arr1 = Int32Array::from(vec![Some(0), Some(2), None, None, Some(4)]);
        let mut builder = Int32Array::builder(5);
        builder.append_slice(&[0, 2]);
        builder.append_null();
        builder.append_null();
        builder.append_value(4);
        let arr2 = builder.finish();
        assert_equivalent(&arr1, &arr2);
    }

    #[test]
    fn test_primitive_array_builder_finish() {
        let mut builder = Int32Builder::new();

        builder.append_slice(&[2, 4, 6, 8]);
        let first = builder.finish();
        assert_eq!(4, first.len());
        // `finish` leaves the builder empty and reusable.
        assert_eq!(0, builder.len());

        builder.append_slice(&[1, 3, 5, 7, 9]);
        let second = builder.finish();
        assert_eq!(5, second.len());
        assert_eq!(0, builder.len());
    }

    #[test]
    fn test_primitive_array_builder_finish_cloned() {
        let mut builder = Int32Builder::new();
        builder.append_value(23);
        builder.append_value(45);
        let snapshot = builder.finish_cloned();
        assert_eq!(snapshot, Int32Array::from(vec![23, 45]));

        // `finish_cloned` must not have reset the builder.
        builder.append_value(56);
        assert_eq!(builder.finish_cloned(), Int32Array::from(vec![23, 45, 56]));

        builder.append_slice(&[2, 4, 6, 8]);
        let first = builder.finish();
        assert_eq!(7, first.len());
        assert_eq!(first, Int32Array::from(vec![23, 45, 56, 2, 4, 6, 8]));
        assert_eq!(0, builder.len());

        builder.append_slice(&[1, 3, 5, 7, 9]);
        let second = builder.finish();
        assert_eq!(5, second.len());
        assert_eq!(0, builder.len());
    }

    #[test]
    fn test_primitive_array_builder_with_data_type() {
        let decimal_type = DataType::Decimal128(1, 2);
        let mut decimal_builder = Decimal128Builder::new().with_data_type(decimal_type);
        decimal_builder.append_value(1);
        let decimals = decimal_builder.finish();
        assert_eq!(decimals.precision(), 1);
        assert_eq!(decimals.scale(), 2);

        let data_type = DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into()));
        let mut ts_builder = TimestampNanosecondBuilder::new().with_data_type(data_type.clone());
        ts_builder.append_value(1);
        let timestamps = ts_builder.finish();
        assert_eq!(timestamps.data_type(), &data_type);
    }

    #[test]
    #[should_panic(expected = "incompatible data type for builder, expected Int32 got Int64")]
    fn test_invalid_with_data_type() {
        let _ = Int32Builder::new().with_data_type(DataType::Int64);
    }

    #[test]
    fn test_extend() {
        let mut builder = PrimitiveBuilder::<Int16Type>::new();
        let first_batch = [1, 2, 3, 5, 2, 4, 4];
        let second_batch = [2, 4, 6, 2];
        builder.extend(first_batch.into_iter().map(Some));
        builder.extend(second_batch.into_iter().map(Some));
        let array = builder.finish();
        assert_eq!(array.values(), &[1, 2, 3, 5, 2, 4, 4, 2, 4, 6, 2]);
    }
}