1use super::{data::new_buffers, ArrayData, ArrayDataBuilder, ByteView};
24use crate::bit_mask::set_bits;
25use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
26use arrow_buffer::{bit_util, i256, ArrowNativeType, Buffer, MutableBuffer};
27use arrow_schema::{ArrowError, DataType, IntervalUnit, UnionMode};
28use half::f16;
29use num::Integer;
30use std::mem;
32mod boolean;
33mod fixed_binary;
34mod fixed_size_list;
35mod list;
36mod null;
37mod primitive;
38mod structure;
39mod union;
40mod utils;
41mod variable_size;
43type ExtendNullBits<'a> = Box<dyn Fn(&mut _MutableArrayData, usize, usize) + 'a>;
44type Extend<'a> = Box<dyn Fn(&mut _MutableArrayData, usize, usize, usize) + 'a>;
48type ExtendNulls = Box<dyn Fn(&mut _MutableArrayData, usize)>;
53struct _MutableArrayData<'a> {
54 pub data_type: DataType,
55 pub null_count: usize,
57 pub len: usize,
58 pub null_buffer: Option<MutableBuffer>,
60 pub buffer1: MutableBuffer,
63 pub buffer2: MutableBuffer,
64 pub child_data: Vec<MutableArrayData<'a>>,
67impl _MutableArrayData<'_> {
68 fn null_buffer(&mut self) -> &mut MutableBuffer {
69 self.null_buffer
70 .as_mut()
71 .expect("MutableArrayData not nullable")
72 }
75fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> ExtendNullBits {
76 if let Some(nulls) = array.nulls() {
77 let bytes = nulls.validity();
78 Box::new(move |mutable, start, len| {
79 let mutable_len = mutable.len;
80 let out = mutable.null_buffer();
81 utils::resize_for_bits(out, mutable_len + len);
82 mutable.null_count += set_bits(
83 out.as_slice_mut(),
84 bytes,
85 mutable_len,
86 nulls.offset() + start,
87 len,
88 );
89 })
90 } else if use_nulls {
91 Box::new(|mutable, _, len| {
92 let mutable_len = mutable.len;
93 let out = mutable.null_buffer();
94 utils::resize_for_bits(out, mutable_len + len);
95 let write_data = out.as_slice_mut();
96 (0..len).for_each(|i| {
97 bit_util::set_bit(write_data, mutable_len + i);
98 });
99 })
100 } else {
101 Box::new(|_, _, _| {})
102 }
105pub struct MutableArrayData<'a> {
134 #[allow(dead_code)]
139 arrays: Vec<&'a ArrayData>,
141 data: _MutableArrayData<'a>,
149 dictionary: Option<ArrayData>,
155 variadic_data_buffers: Vec<Buffer>,
162 extend_values: Vec<Extend<'a>>,
168 extend_null_bits: Vec<ExtendNullBits<'a>>,
174 extend_nulls: ExtendNulls,
180impl std::fmt::Debug for MutableArrayData<'_> {
181 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
182 f.debug_struct("MutableArrayData")
184 .field("data", &self.data)
185 .finish()
186 }
189fn build_extend_dictionary(array: &ArrayData, offset: usize, max: usize) -> Option<Extend> {
193 macro_rules! validate_and_build {
194 ($dt: ty) => {{
195 let _: $dt = max.try_into().ok()?;
196 let offset: $dt = offset.try_into().ok()?;
197 Some(primitive::build_extend_with_offset(array, offset))
198 }};
199 }
200 match array.data_type() {
201 DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() {
202 DataType::UInt8 => validate_and_build!(u8),
203 DataType::UInt16 => validate_and_build!(u16),
204 DataType::UInt32 => validate_and_build!(u32),
205 DataType::UInt64 => validate_and_build!(u64),
206 DataType::Int8 => validate_and_build!(i8),
207 DataType::Int16 => validate_and_build!(i16),
208 DataType::Int32 => validate_and_build!(i32),
209 DataType::Int64 => validate_and_build!(i64),
210 _ => unreachable!(),
211 },
212 _ => None,
213 }
216fn build_extend_view(array: &ArrayData, buffer_offset: u32) -> Extend {
218 let views = array.buffer::<u128>(0);
219 Box::new(
220 move |mutable: &mut _MutableArrayData, _, start: usize, len: usize| {
221 mutable
222 .buffer1
223 .extend(views[start..start + len].iter().map(|v| {
224 let len = *v as u32;
225 if len <= 12 {
226 return *v; }
228 let mut view = ByteView::from(*v);
229 view.buffer_index += buffer_offset;
230 view.into()
231 }))
232 },
233 )
236fn build_extend(array: &ArrayData) -> Extend {
237 match array.data_type() {
238 DataType::Null => null::build_extend(array),
239 DataType::Boolean => boolean::build_extend(array),
240 DataType::UInt8 => primitive::build_extend::<u8>(array),
241 DataType::UInt16 => primitive::build_extend::<u16>(array),
242 DataType::UInt32 => primitive::build_extend::<u32>(array),
243 DataType::UInt64 => primitive::build_extend::<u64>(array),
244 DataType::Int8 => primitive::build_extend::<i8>(array),
245 DataType::Int16 => primitive::build_extend::<i16>(array),
246 DataType::Int32 => primitive::build_extend::<i32>(array),
247 DataType::Int64 => primitive::build_extend::<i64>(array),
248 DataType::Float32 => primitive::build_extend::<f32>(array),
249 DataType::Float64 => primitive::build_extend::<f64>(array),
250 DataType::Date32 | DataType::Time32(_) | DataType::Interval(IntervalUnit::YearMonth) => {
251 primitive::build_extend::<i32>(array)
252 }
253 DataType::Date64
254 | DataType::Time64(_)
255 | DataType::Timestamp(_, _)
256 | DataType::Duration(_)
257 | DataType::Interval(IntervalUnit::DayTime) => primitive::build_extend::<i64>(array),
258 DataType::Interval(IntervalUnit::MonthDayNano) => primitive::build_extend::<i128>(array),
259 DataType::Decimal128(_, _) => primitive::build_extend::<i128>(array),
260 DataType::Decimal256(_, _) => primitive::build_extend::<i256>(array),
261 DataType::Utf8 | DataType::Binary => variable_size::build_extend::<i32>(array),
262 DataType::LargeUtf8 | DataType::LargeBinary => variable_size::build_extend::<i64>(array),
263 DataType::BinaryView | DataType::Utf8View => unreachable!("should use build_extend_view"),
264 DataType::Map(_, _) | DataType::List(_) => list::build_extend::<i32>(array),
265 DataType::ListView(_) | DataType::LargeListView(_) => {
266 unimplemented!("ListView/LargeListView not implemented")
267 }
268 DataType::LargeList(_) => list::build_extend::<i64>(array),
269 DataType::Dictionary(_, _) => unreachable!("should use build_extend_dictionary"),
270 DataType::Struct(_) => structure::build_extend(array),
271 DataType::FixedSizeBinary(_) => fixed_binary::build_extend(array),
272 DataType::Float16 => primitive::build_extend::<f16>(array),
273 DataType::FixedSizeList(_, _) => fixed_size_list::build_extend(array),
274 DataType::Union(_, mode) => match mode {
275 UnionMode::Sparse => union::build_extend_sparse(array),
276 UnionMode::Dense => union::build_extend_dense(array),
277 },
278 DataType::RunEndEncoded(_, _) => todo!(),
279 }
282fn build_extend_nulls(data_type: &DataType) -> ExtendNulls {
283 Box::new(match data_type {
284 DataType::Null => null::extend_nulls,
285 DataType::Boolean => boolean::extend_nulls,
286 DataType::UInt8 => primitive::extend_nulls::<u8>,
287 DataType::UInt16 => primitive::extend_nulls::<u16>,
288 DataType::UInt32 => primitive::extend_nulls::<u32>,
289 DataType::UInt64 => primitive::extend_nulls::<u64>,
290 DataType::Int8 => primitive::extend_nulls::<i8>,
291 DataType::Int16 => primitive::extend_nulls::<i16>,
292 DataType::Int32 => primitive::extend_nulls::<i32>,
293 DataType::Int64 => primitive::extend_nulls::<i64>,
294 DataType::Float32 => primitive::extend_nulls::<f32>,
295 DataType::Float64 => primitive::extend_nulls::<f64>,
296 DataType::Date32 | DataType::Time32(_) | DataType::Interval(IntervalUnit::YearMonth) => {
297 primitive::extend_nulls::<i32>
298 }
299 DataType::Date64
300 | DataType::Time64(_)
301 | DataType::Timestamp(_, _)
302 | DataType::Duration(_)
303 | DataType::Interval(IntervalUnit::DayTime) => primitive::extend_nulls::<i64>,
304 DataType::Interval(IntervalUnit::MonthDayNano) => primitive::extend_nulls::<i128>,
305 DataType::Decimal128(_, _) => primitive::extend_nulls::<i128>,
306 DataType::Decimal256(_, _) => primitive::extend_nulls::<i256>,
307 DataType::Utf8 | DataType::Binary => variable_size::extend_nulls::<i32>,
308 DataType::LargeUtf8 | DataType::LargeBinary => variable_size::extend_nulls::<i64>,
309 DataType::BinaryView | DataType::Utf8View => primitive::extend_nulls::<u128>,
310 DataType::Map(_, _) | DataType::List(_) => list::extend_nulls::<i32>,
311 DataType::ListView(_) | DataType::LargeListView(_) => {
312 unimplemented!("ListView/LargeListView not implemented")
313 }
314 DataType::LargeList(_) => list::extend_nulls::<i64>,
315 DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() {
316 DataType::UInt8 => primitive::extend_nulls::<u8>,
317 DataType::UInt16 => primitive::extend_nulls::<u16>,
318 DataType::UInt32 => primitive::extend_nulls::<u32>,
319 DataType::UInt64 => primitive::extend_nulls::<u64>,
320 DataType::Int8 => primitive::extend_nulls::<i8>,
321 DataType::Int16 => primitive::extend_nulls::<i16>,
322 DataType::Int32 => primitive::extend_nulls::<i32>,
323 DataType::Int64 => primitive::extend_nulls::<i64>,
324 _ => unreachable!(),
325 },
326 DataType::Struct(_) => structure::extend_nulls,
327 DataType::FixedSizeBinary(_) => fixed_binary::extend_nulls,
328 DataType::Float16 => primitive::extend_nulls::<f16>,
329 DataType::FixedSizeList(_, _) => fixed_size_list::extend_nulls,
330 DataType::Union(_, mode) => match mode {
331 UnionMode::Sparse => union::extend_nulls_sparse,
332 UnionMode::Dense => union::extend_nulls_dense,
333 },
334 DataType::RunEndEncoded(_, _) => todo!(),
335 })
338fn preallocate_offset_and_binary_buffer<Offset: ArrowNativeType + Integer>(
339 capacity: usize,
340 binary_size: usize,
341) -> [MutableBuffer; 2] {
342 let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::<Offset>());
344 buffer.push(Offset::zero());
347 [
348 buffer,
349 MutableBuffer::new(binary_size * mem::size_of::<u8>()),
350 ]
/// Capacities to pre-allocate when constructing a `MutableArrayData` via
/// `MutableArrayData::with_capacities`.
///
/// The first field of every variant is the number of output slots.
#[derive(Debug, Clone)]
pub enum Capacities {
    /// For `Utf8`, `Binary`, `LargeUtf8` and `LargeBinary`: the number of slots
    /// (which sizes the offsets buffer) and the number of value bytes.
    ///
    /// The value-byte capacity must be `Some`; `None` is not yet supported and
    /// makes `with_capacities` panic.
    Binary(usize, Option<usize>),
    /// For list types: the number of slots, and optionally the capacities of the
    /// child (values) array. When the child capacity is `None`, the child gets an
    /// `Array` capacity derived from the slot count.
    List(usize, Option<Box<Capacities>>),
    /// For `Struct`: the number of slots, and optionally one capacity per field.
    /// When `None`, each field gets an `Array` capacity of the same slot count.
    Struct(usize, Option<Vec<Capacities>>),
    /// For dictionaries: the number of keys and, optionally, the capacities of
    /// the values. Not yet supported: `with_capacities` panics when given it.
    Dictionary(usize, Option<Box<Capacities>>),
    /// Only the number of slots; buffers are sized from it and nested children
    /// receive a derived `Array` capacity.
    Array(usize),
}
384impl<'a> MutableArrayData<'a> {
385 pub fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self {
398 Self::with_capacities(arrays, use_nulls, Capacities::Array(capacity))
399 }
401 pub fn with_capacities(
411 arrays: Vec<&'a ArrayData>,
412 use_nulls: bool,
413 capacities: Capacities,
414 ) -> Self {
415 let data_type = arrays[0].data_type();
417 for a in arrays.iter().skip(1) {
418 assert_eq!(
419 data_type,
420 a.data_type(),
421 "Arrays with inconsistent types passed to MutableArrayData"
422 )
423 }
425 let use_nulls = use_nulls | arrays.iter().any(|array| array.null_count() > 0);
429 let mut array_capacity;
431 let [buffer1, buffer2] = match (data_type, &capacities) {
432 (
433 DataType::LargeUtf8 | DataType::LargeBinary,
434 Capacities::Binary(capacity, Some(value_cap)),
435 ) => {
436 array_capacity = *capacity;
437 preallocate_offset_and_binary_buffer::<i64>(*capacity, *value_cap)
438 }
439 (DataType::Utf8 | DataType::Binary, Capacities::Binary(capacity, Some(value_cap))) => {
440 array_capacity = *capacity;
441 preallocate_offset_and_binary_buffer::<i32>(*capacity, *value_cap)
442 }
443 (_, Capacities::Array(capacity)) => {
444 array_capacity = *capacity;
445 new_buffers(data_type, *capacity)
446 }
447 (
448 DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _),
449 Capacities::List(capacity, _),
450 ) => {
451 array_capacity = *capacity;
452 new_buffers(data_type, *capacity)
453 }
454 _ => panic!("Capacities: {capacities:?} not yet supported"),
455 };
457 let child_data = match &data_type {
458 DataType::Decimal128(_, _)
459 | DataType::Decimal256(_, _)
460 | DataType::Null
461 | DataType::Boolean
462 | DataType::UInt8
463 | DataType::UInt16
464 | DataType::UInt32
465 | DataType::UInt64
466 | DataType::Int8
467 | DataType::Int16
468 | DataType::Int32
469 | DataType::Int64
470 | DataType::Float16
471 | DataType::Float32
472 | DataType::Float64
473 | DataType::Date32
474 | DataType::Date64
475 | DataType::Time32(_)
476 | DataType::Time64(_)
477 | DataType::Duration(_)
478 | DataType::Timestamp(_, _)
479 | DataType::Utf8
480 | DataType::Binary
481 | DataType::LargeUtf8
482 | DataType::LargeBinary
483 | DataType::BinaryView
484 | DataType::Utf8View
485 | DataType::Interval(_)
486 | DataType::FixedSizeBinary(_) => vec![],
487 DataType::ListView(_) | DataType::LargeListView(_) => {
488 unimplemented!("ListView/LargeListView not implemented")
489 }
490 DataType::Map(_, _) | DataType::List(_) | DataType::LargeList(_) => {
491 let children = arrays
492 .iter()
493 .map(|array| &array.child_data()[0])
494 .collect::<Vec<_>>();
496 let capacities =
497 if let Capacities::List(capacity, ref child_capacities) = capacities {
498 child_capacities
499 .clone()
500 .map(|c| *c)
501 .unwrap_or(Capacities::Array(capacity))
502 } else {
503 Capacities::Array(array_capacity)
504 };
506 vec![MutableArrayData::with_capacities(
507 children, use_nulls, capacities,
508 )]
509 }
510 DataType::Dictionary(_, _) => vec![],
512 DataType::Struct(fields) => match capacities {
513 Capacities::Struct(capacity, Some(ref child_capacities)) => {
514 array_capacity = capacity;
515 (0..fields.len())
516 .zip(child_capacities)
517 .map(|(i, child_cap)| {
518 let child_arrays = arrays
519 .iter()
520 .map(|array| &array.child_data()[i])
521 .collect::<Vec<_>>();
522 MutableArrayData::with_capacities(
523 child_arrays,
524 use_nulls,
525 child_cap.clone(),
526 )
527 })
528 .collect::<Vec<_>>()
529 }
530 Capacities::Struct(capacity, None) => {
531 array_capacity = capacity;
532 (0..fields.len())
533 .map(|i| {
534 let child_arrays = arrays
535 .iter()
536 .map(|array| &array.child_data()[i])
537 .collect::<Vec<_>>();
538 MutableArrayData::new(child_arrays, use_nulls, capacity)
539 })
540 .collect::<Vec<_>>()
541 }
542 _ => (0..fields.len())
543 .map(|i| {
544 let child_arrays = arrays
545 .iter()
546 .map(|array| &array.child_data()[i])
547 .collect::<Vec<_>>();
548 MutableArrayData::new(child_arrays, use_nulls, array_capacity)
549 })
550 .collect::<Vec<_>>(),
551 },
552 DataType::RunEndEncoded(_, _) => {
553 let run_ends_child = arrays
554 .iter()
555 .map(|array| &array.child_data()[0])
556 .collect::<Vec<_>>();
557 let value_child = arrays
558 .iter()
559 .map(|array| &array.child_data()[1])
560 .collect::<Vec<_>>();
561 vec![
562 MutableArrayData::new(run_ends_child, false, array_capacity),
563 MutableArrayData::new(value_child, use_nulls, array_capacity),
564 ]
565 }
566 DataType::FixedSizeList(_, size) => {
567 let children = arrays
568 .iter()
569 .map(|array| &array.child_data()[0])
570 .collect::<Vec<_>>();
571 let capacities =
572 if let Capacities::List(capacity, ref child_capacities) = capacities {
573 child_capacities
574 .clone()
575 .map(|c| *c)
576 .unwrap_or(Capacities::Array(capacity * *size as usize))
577 } else {
578 Capacities::Array(array_capacity * *size as usize)
579 };
580 vec![MutableArrayData::with_capacities(
581 children, use_nulls, capacities,
582 )]
583 }
584 DataType::Union(fields, _) => (0..fields.len())
585 .map(|i| {
586 let child_arrays = arrays
587 .iter()
588 .map(|array| &array.child_data()[i])
589 .collect::<Vec<_>>();
590 MutableArrayData::new(child_arrays, use_nulls, array_capacity)
591 })
592 .collect::<Vec<_>>(),
593 };
595 let (dictionary, dict_concat) = match &data_type {
597 DataType::Dictionary(_, _) => {
598 let dict_concat = !arrays
600 .windows(2)
601 .all(|a| a[0].child_data()[0].ptr_eq(&a[1].child_data()[0]));
603 match dict_concat {
604 false => (Some(arrays[0].child_data()[0].clone()), false),
605 true => {
606 if let Capacities::Dictionary(_, _) = capacities {
607 panic!("dictionary capacity not yet supported")
608 }
609 let dictionaries: Vec<_> =
610 arrays.iter().map(|array| &array.child_data()[0]).collect();
611 let lengths: Vec<_> = dictionaries
612 .iter()
613 .map(|dictionary| dictionary.len())
614 .collect();
615 let capacity = lengths.iter().sum();
617 let mut mutable = MutableArrayData::new(dictionaries, false, capacity);
619 for (i, len) in lengths.iter().enumerate() {
620 mutable.extend(i, 0, *len)
621 }
623 (Some(mutable.freeze()), true)
624 }
625 }
626 }
627 _ => (None, false),
628 };
630 let variadic_data_buffers = match &data_type {
631 DataType::BinaryView | DataType::Utf8View => arrays
632 .iter()
633 .flat_map(|x| x.buffers().iter().skip(1))
634 .map(Buffer::clone)
635 .collect(),
636 _ => vec![],
637 };
639 let extend_nulls = build_extend_nulls(data_type);
641 let extend_null_bits = arrays
642 .iter()
643 .map(|array| build_extend_null_bits(array, use_nulls))
644 .collect();
646 let null_buffer = use_nulls.then(|| {
647 let null_bytes = bit_util::ceil(array_capacity, 8);
648 MutableBuffer::from_len_zeroed(null_bytes)
649 });
651 let extend_values = match &data_type {
652 DataType::Dictionary(_, _) => {
653 let mut next_offset = 0;
654 let extend_values: Result<Vec<_>, _> = arrays
655 .iter()
656 .map(|array| {
657 let offset = next_offset;
658 let dict_len = array.child_data()[0].len();
660 if dict_concat {
661 next_offset += dict_len;
662 }
664 build_extend_dictionary(array, offset, offset + dict_len)
665 .ok_or(ArrowError::DictionaryKeyOverflowError)
666 })
667 .collect();
669 extend_values.expect("MutableArrayData::new is infallible")
670 }
671 DataType::BinaryView | DataType::Utf8View => {
672 let mut next_offset = 0u32;
673 arrays
674 .iter()
675 .map(|arr| {
676 let num_data_buffers = (arr.buffers().len() - 1) as u32;
677 let offset = next_offset;
678 next_offset = next_offset
679 .checked_add(num_data_buffers)
680 .expect("view buffer index overflow");
681 build_extend_view(arr, offset)
682 })
683 .collect()
684 }
685 _ => arrays.iter().map(|array| build_extend(array)).collect(),
686 };
688 let data = _MutableArrayData {
689 data_type: data_type.clone(),
690 len: 0,
691 null_count: 0,
692 null_buffer,
693 buffer1,
694 buffer2,
695 child_data,
696 };
697 Self {
698 arrays,
699 data,
700 dictionary,
701 variadic_data_buffers,
702 extend_values,
703 extend_null_bits,
704 extend_nulls,
705 }
706 }
708 pub fn extend(&mut self, index: usize, start: usize, end: usize) {
720 let len = end - start;
721 (self.extend_null_bits[index])(&mut self.data, start, len);
722 (self.extend_values[index])(&mut self.data, index, start, len);
723 self.data.len += len;
724 }
726 pub fn extend_nulls(&mut self, len: usize) {
732 self.data.len += len;
733 let bit_len = bit_util::ceil(self.data.len, 8);
734 let nulls = self.data.null_buffer();
735 nulls.resize(bit_len, 0);
736 self.data.null_count += len;
737 (self.extend_nulls)(&mut self.data, len);
738 }
740 #[inline]
742 pub fn len(&self) -> usize {
743 self.data.len
744 }
746 #[inline]
748 pub fn is_empty(&self) -> bool {
749 self.data.len == 0
750 }
752 #[inline]
754 pub fn null_count(&self) -> usize {
755 self.data.null_count
756 }
758 pub fn freeze(self) -> ArrayData {
760 unsafe { self.into_builder().build_unchecked() }
761 }
763 pub fn into_builder(self) -> ArrayDataBuilder {
767 let data = self.data;
769 let buffers = match data.data_type {
770 DataType::Null | DataType::Struct(_) | DataType::FixedSizeList(_, _) => {
771 vec![]
772 }
773 DataType::BinaryView | DataType::Utf8View => {
774 let mut b = self.variadic_data_buffers;
775 b.insert(0, data.buffer1.into());
776 b
777 }
778 DataType::Utf8 | DataType::Binary | DataType::LargeUtf8 | DataType::LargeBinary => {
779 vec![data.buffer1.into(), data.buffer2.into()]
780 }
781 DataType::Union(_, mode) => {
782 match mode {
783 UnionMode::Sparse => vec![data.buffer1.into()],
785 UnionMode::Dense => vec![data.buffer1.into(), data.buffer2.into()],
786 }
787 }
788 _ => vec![data.buffer1.into()],
789 };
791 let child_data = match data.data_type {
792 DataType::Dictionary(_, _) => vec![self.dictionary.unwrap()],
793 _ => data.child_data.into_iter().map(|x| x.freeze()).collect(),
794 };
796 let nulls = data
797 .null_buffer
798 .map(|nulls| {
799 let bools = BooleanBuffer::new(nulls.into(), 0, data.len);
800 unsafe { NullBuffer::new_unchecked(bools, data.null_count) }
801 })
802 .filter(|n| n.null_count() > 0);
804 ArrayDataBuilder::new(data.data_type)
805 .offset(0)
806 .len(data.len)
807 .nulls(nulls)
808 .buffers(buffers)
809 .child_data(child_data)
810 }
// Compiled only for tests, so the test-only imports below do not leak into
// (or cause unused-import warnings in) normal builds.
#[cfg(test)]
mod test {
    use super::*;
    use arrow_schema::Field;
    use std::sync::Arc;

    #[test]
    fn test_list_append_with_capacities() {
        let array = ArrayData::new_empty(&DataType::List(Arc::new(Field::new(
            "element",
            DataType::Int64,
            false,
        ))));

        let mutable = MutableArrayData::with_capacities(
            vec![&array],
            false,
            Capacities::List(6, Some(Box::new(Capacities::Array(17)))),
        );

        // (6 + 1) i32 offsets = 28 bytes; the expected 64 is consistent with the
        // allocation being rounded up to a multiple of 64 bytes.
        assert_eq!(mutable.data.buffer1.capacity(), 64);
        // 17 i64 child values = 136 bytes, likewise rounded up to 192.
        assert_eq!(mutable.data.child_data[0].data.buffer1.capacity(), 192);
    }
}