1use std::any::Any;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer};
23use arrow_data::ByteView;
24use arrow_schema::ArrowError;
25use hashbrown::hash_table::Entry;
26use hashbrown::HashTable;
27
28use crate::builder::ArrayBuilder;
29use crate::types::bytes::ByteArrayNativeType;
30use crate::types::{BinaryViewType, ByteViewType, StringViewType};
31use crate::{ArrayRef, GenericByteViewArray};
32
/// Initial `current_size` of the exponential block-size strategy (8 KiB).
const STARTING_BLOCK_SIZE: u32 = 8 * 1024;

/// Cap on the block size produced by the exponential strategy (2 MiB).
const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024;

/// Policy for choosing the capacity of each newly allocated data block.
enum BlockSizeGrowthStrategy {
    /// Every new block requests the same `size` bytes.
    Fixed { size: u32 },
    /// Block sizes start from `current_size` and double toward `MAX_BLOCK_SIZE`.
    Exponential { current_size: u32 },
}
40
41impl BlockSizeGrowthStrategy {
42 fn next_size(&mut self) -> u32 {
43 match self {
44 Self::Fixed { size } => *size,
45 Self::Exponential { current_size } => {
46 if *current_size < MAX_BLOCK_SIZE {
47 *current_size = current_size.saturating_mul(2);
49 *current_size
50 } else {
51 MAX_BLOCK_SIZE
52 }
53 }
54 }
55 }
56}
57
58pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
82 views_builder: BufferBuilder<u128>,
83 null_buffer_builder: NullBufferBuilder,
84 completed: Vec<Buffer>,
85 in_progress: Vec<u8>,
86 block_size: BlockSizeGrowthStrategy,
87 string_tracker: Option<(HashTable<usize>, ahash::RandomState)>,
90 phantom: PhantomData<T>,
91}
92
93impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
94 pub fn new() -> Self {
96 Self::with_capacity(1024)
97 }
98
99 pub fn with_capacity(capacity: usize) -> Self {
101 Self {
102 views_builder: BufferBuilder::new(capacity),
103 null_buffer_builder: NullBufferBuilder::new(capacity),
104 completed: vec![],
105 in_progress: vec![],
106 block_size: BlockSizeGrowthStrategy::Exponential {
107 current_size: STARTING_BLOCK_SIZE,
108 },
109 string_tracker: None,
110 phantom: Default::default(),
111 }
112 }
113
114 pub fn with_fixed_block_size(self, block_size: u32) -> Self {
130 debug_assert!(block_size > 0, "Block size must be greater than 0");
131 Self {
132 block_size: BlockSizeGrowthStrategy::Fixed { size: block_size },
133 ..self
134 }
135 }
136
137 #[deprecated(since = "53.0.0", note = "Use `with_fixed_block_size` instead")]
140 pub fn with_block_size(self, block_size: u32) -> Self {
141 self.with_fixed_block_size(block_size)
142 }
143
144 pub fn with_deduplicate_strings(self) -> Self {
149 Self {
150 string_tracker: Some((
151 HashTable::with_capacity(self.views_builder.capacity()),
152 Default::default(),
153 )),
154 ..self
155 }
156 }
157
158 pub fn append_block(&mut self, buffer: Buffer) -> u32 {
183 assert!(buffer.len() < u32::MAX as usize);
184
185 self.flush_in_progress();
186 let offset = self.completed.len();
187 self.push_completed(buffer);
188 offset as u32
189 }
190
191 pub unsafe fn append_view_unchecked(&mut self, block: u32, offset: u32, len: u32) {
198 let b = self.completed.get_unchecked(block as usize);
199 let start = offset as usize;
200 let end = start.saturating_add(len as usize);
201 let b = b.get_unchecked(start..end);
202
203 let view = make_view(b, block, offset);
204 self.views_builder.append(view);
205 self.null_buffer_builder.append_non_null();
206 }
207
208 pub fn try_append_view(&mut self, block: u32, offset: u32, len: u32) -> Result<(), ArrowError> {
212 let b = self.completed.get(block as usize).ok_or_else(|| {
213 ArrowError::InvalidArgumentError(format!("No block found with index {block}"))
214 })?;
215 let start = offset as usize;
216 let end = start.saturating_add(len as usize);
217
218 let b = b.get(start..end).ok_or_else(|| {
219 ArrowError::InvalidArgumentError(format!(
220 "Range {start}..{end} out of bounds for block of length {}",
221 b.len()
222 ))
223 })?;
224
225 if T::Native::from_bytes_checked(b).is_none() {
226 return Err(ArrowError::InvalidArgumentError(
227 "Invalid view data".to_string(),
228 ));
229 }
230
231 unsafe {
232 self.append_view_unchecked(block, offset, len);
233 }
234 Ok(())
235 }
236
237 #[inline]
239 fn flush_in_progress(&mut self) {
240 if !self.in_progress.is_empty() {
241 let f = Buffer::from_vec(std::mem::take(&mut self.in_progress));
242 self.push_completed(f)
243 }
244 }
245
246 #[inline]
248 fn push_completed(&mut self, block: Buffer) {
249 assert!(block.len() < u32::MAX as usize, "Block too large");
250 assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
251 self.completed.push(block);
252 }
253
254 pub fn get_value(&self, index: usize) -> &[u8] {
258 let view = self.views_builder.as_slice().get(index).unwrap();
259 let len = *view as u32;
260 if len <= 12 {
261 unsafe { GenericByteViewArray::<T>::inline_value(view, len as usize) }
264 } else {
265 let view = ByteView::from(*view);
266 if view.buffer_index < self.completed.len() as u32 {
267 let block = &self.completed[view.buffer_index as usize];
268 &block[view.offset as usize..view.offset as usize + view.length as usize]
269 } else {
270 &self.in_progress[view.offset as usize..view.offset as usize + view.length as usize]
271 }
272 }
273 }
274
275 #[inline]
283 pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
284 let v: &[u8] = value.as_ref().as_ref();
285 let length: u32 = v.len().try_into().unwrap();
286 if length <= 12 {
287 let mut view_buffer = [0; 16];
288 view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
289 view_buffer[4..4 + v.len()].copy_from_slice(v);
290 self.views_builder.append(u128::from_le_bytes(view_buffer));
291 self.null_buffer_builder.append_non_null();
292 return;
293 }
294
295 if let Some((mut ht, hasher)) = self.string_tracker.take() {
299 let hash_val = hasher.hash_one(v);
300 let hasher_fn = |v: &_| hasher.hash_one(v);
301
302 let entry = ht.entry(
303 hash_val,
304 |idx| {
305 let stored_value = self.get_value(*idx);
306 v == stored_value
307 },
308 hasher_fn,
309 );
310 match entry {
311 Entry::Occupied(occupied) => {
312 let idx = occupied.get();
314 self.views_builder
315 .append(self.views_builder.as_slice()[*idx]);
316 self.null_buffer_builder.append_non_null();
317 self.string_tracker = Some((ht, hasher));
318 return;
319 }
320 Entry::Vacant(vacant) => {
321 vacant.insert(self.views_builder.len());
324 }
325 }
326 self.string_tracker = Some((ht, hasher));
327 }
328
329 let required_cap = self.in_progress.len() + v.len();
330 if self.in_progress.capacity() < required_cap {
331 self.flush_in_progress();
332 let to_reserve = v.len().max(self.block_size.next_size() as usize);
333 self.in_progress.reserve(to_reserve);
334 };
335 let offset = self.in_progress.len() as u32;
336 self.in_progress.extend_from_slice(v);
337
338 let view = ByteView {
339 length,
340 prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()),
341 buffer_index: self.completed.len() as u32,
342 offset,
343 };
344 self.views_builder.append(view.into());
345 self.null_buffer_builder.append_non_null();
346 }
347
348 #[inline]
350 pub fn append_option(&mut self, value: Option<impl AsRef<T::Native>>) {
351 match value {
352 None => self.append_null(),
353 Some(v) => self.append_value(v),
354 };
355 }
356
357 #[inline]
359 pub fn append_null(&mut self) {
360 self.null_buffer_builder.append_null();
361 self.views_builder.append(0);
362 }
363
364 pub fn finish(&mut self) -> GenericByteViewArray<T> {
366 self.flush_in_progress();
367 let completed = std::mem::take(&mut self.completed);
368 let len = self.views_builder.len();
369 let views = ScalarBuffer::new(self.views_builder.finish(), 0, len);
370 let nulls = self.null_buffer_builder.finish();
371 if let Some((ref mut ht, _)) = self.string_tracker.as_mut() {
372 ht.clear();
373 }
374 unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
376 }
377
378 pub fn finish_cloned(&self) -> GenericByteViewArray<T> {
380 let mut completed = self.completed.clone();
381 if !self.in_progress.is_empty() {
382 completed.push(Buffer::from_slice_ref(&self.in_progress));
383 }
384 let len = self.views_builder.len();
385 let views = Buffer::from_slice_ref(self.views_builder.as_slice());
386 let views = ScalarBuffer::new(views, 0, len);
387 let nulls = self.null_buffer_builder.finish_cloned();
388 unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
390 }
391
392 pub fn validity_slice(&self) -> Option<&[u8]> {
394 self.null_buffer_builder.as_slice()
395 }
396
397 pub fn allocated_size(&self) -> usize {
399 let views = self.views_builder.capacity() * std::mem::size_of::<u128>();
400 let null = self.null_buffer_builder.allocated_size();
401 let buffer_size = self.completed.iter().map(|b| b.capacity()).sum::<usize>();
402 let in_progress = self.in_progress.capacity();
403 let tracker = match &self.string_tracker {
404 Some((ht, _)) => ht.capacity() * std::mem::size_of::<usize>(),
405 None => 0,
406 };
407 buffer_size + in_progress + tracker + views + null
408 }
409}
410
411impl<T: ByteViewType + ?Sized> Default for GenericByteViewBuilder<T> {
412 fn default() -> Self {
413 Self::new()
414 }
415}
416
417impl<T: ByteViewType + ?Sized> std::fmt::Debug for GenericByteViewBuilder<T> {
418 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
419 write!(f, "{}ViewBuilder", T::PREFIX)?;
420 f.debug_struct("")
421 .field("views_builder", &self.views_builder)
422 .field("in_progress", &self.in_progress)
423 .field("completed", &self.completed)
424 .field("null_buffer_builder", &self.null_buffer_builder)
425 .finish()
426 }
427}
428
429impl<T: ByteViewType + ?Sized> ArrayBuilder for GenericByteViewBuilder<T> {
430 fn len(&self) -> usize {
431 self.null_buffer_builder.len()
432 }
433
434 fn finish(&mut self) -> ArrayRef {
435 Arc::new(self.finish())
436 }
437
438 fn finish_cloned(&self) -> ArrayRef {
439 Arc::new(self.finish_cloned())
440 }
441
442 fn as_any(&self) -> &dyn Any {
443 self
444 }
445
446 fn as_any_mut(&mut self) -> &mut dyn Any {
447 self
448 }
449
450 fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
451 self
452 }
453}
454
455impl<T: ByteViewType + ?Sized, V: AsRef<T::Native>> Extend<Option<V>>
456 for GenericByteViewBuilder<T>
457{
458 #[inline]
459 fn extend<I: IntoIterator<Item = Option<V>>>(&mut self, iter: I) {
460 for v in iter {
461 self.append_option(v)
462 }
463 }
464}
465
/// A [`GenericByteViewBuilder`] for string values ([`StringViewType`]).
pub type StringViewBuilder = GenericByteViewBuilder<StringViewType>;

/// A [`GenericByteViewBuilder`] for binary values ([`BinaryViewType`]).
pub type BinaryViewBuilder = GenericByteViewBuilder<BinaryViewType>;
508
/// Packs the first `LEN` bytes of `data` into an inline view.
///
/// Layout, little-endian: bytes 0..4 hold `LEN` as a `u32`, bytes 4..4 + `LEN`
/// hold the data, and the rest is zero.
///
/// Panics if `LEN` exceeds 12 or `data` is shorter than `LEN`.
fn make_inlined_view<const LEN: usize>(data: &[u8]) -> u128 {
    let mut bytes = [0u8; 16];
    let (length_field, payload) = bytes.split_at_mut(4);
    length_field.copy_from_slice(&(LEN as u32).to_le_bytes());
    payload[..LEN].copy_from_slice(&data[..LEN]);
    u128::from_le_bytes(bytes)
}
517
518#[inline(never)]
524pub fn make_view(data: &[u8], block_id: u32, offset: u32) -> u128 {
525 let len = data.len();
526
527 match len {
530 0 => make_inlined_view::<0>(data),
531 1 => make_inlined_view::<1>(data),
532 2 => make_inlined_view::<2>(data),
533 3 => make_inlined_view::<3>(data),
534 4 => make_inlined_view::<4>(data),
535 5 => make_inlined_view::<5>(data),
536 6 => make_inlined_view::<6>(data),
537 7 => make_inlined_view::<7>(data),
538 8 => make_inlined_view::<8>(data),
539 9 => make_inlined_view::<9>(data),
540 10 => make_inlined_view::<10>(data),
541 11 => make_inlined_view::<11>(data),
542 12 => make_inlined_view::<12>(data),
543 _ => {
545 let view = ByteView {
546 length: len as u32,
547 prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()),
548 buffer_index: block_id,
549 offset,
550 };
551 view.as_u128()
552 }
553 }
554}
555
#[cfg(test)]
mod tests {
    use core::str;

    use super::*;
    use crate::Array;

    /// Repeated long values must reuse the view of their first occurrence.
    #[test]
    fn test_string_view_deduplicate() {
        let first = "long string to test string view";
        let second = "not so similar string but long";

        let mut builder = StringViewBuilder::new()
            .with_deduplicate_strings()
            .with_fixed_block_size(first.len() as u32 * 2);

        let input = vec![
            Some(first),
            Some(second),
            Some("short"),
            Some(first),
            None,
            Some(second),
            Some(first),
        ];
        builder.extend(input.clone());

        let array = builder.finish_cloned();
        array.to_data().validate_full().unwrap();
        // Only one copy of each long value is stored, so a single block suffices.
        assert_eq!(array.data_buffers().len(), 1);

        let round_tripped: Vec<_> = array.iter().collect();
        assert_eq!(round_tripped, input);

        let views = array.views();
        let first_view = views.first().unwrap();
        assert_eq!(first_view, views.get(3).unwrap());
        assert_eq!(first_view, views.get(6).unwrap());
        assert_eq!(views.get(1), views.get(5));
    }

    /// `finish` must reset the dedup table so stale view indexes are never consulted.
    #[test]
    fn test_string_view_deduplicate_after_finish() {
        let mut builder = StringViewBuilder::new().with_deduplicate_strings();

        let value_1 = "long string to test string view";
        let value_2 = "not so similar string but long";
        for value in [value_1, value_2, value_1] {
            builder.append_value(value);
            let _array = builder.finish();
        }
    }

    #[test]
    fn test_string_view() {
        let b1 = Buffer::from(b"world\xFFbananas\xF0\x9F\x98\x81");
        let b2 = Buffer::from(b"cupcakes");
        let b3 = Buffer::from(b"Many strings are here contained of great length and verbosity");

        let mut v = StringViewBuilder::new();
        assert_eq!(v.append_block(b1), 0);

        v.append_value("This is a very long string that exceeds the inline length");
        v.append_value("This is another very long string that exceeds the inline length");

        // The two long values sit in the in-progress block, which becomes block 1
        // when the next explicit block is appended.
        assert_eq!(v.append_block(b2), 2);
        assert_eq!(v.append_block(b3), 3);

        // (block, offset, len) triples and the value each one selects.
        let explicit_views: [(u32, u32, u32); 10] = [
            (0, 0, 5),   // "world"
            (0, 6, 7),   // "bananas"
            (2, 3, 5),   // "cakes"
            (2, 0, 3),   // "cup"
            (2, 0, 8),   // "cupcakes"
            (0, 13, 4),  // "😁"
            (0, 13, 0),  // ""
            (3, 0, 16),  // "Many strings are"
            (1, 0, 19),  // "This is a very long"
            (3, 13, 27), // "are here contained of great"
        ];
        for &(block, offset, len) in explicit_views.iter() {
            v.try_append_view(block, offset, len).unwrap();
        }
        v.append_value("I do so like long strings");

        let array = v.finish_cloned();
        array.to_data().validate_full().unwrap();
        assert_eq!(array.data_buffers().len(), 5);
        let actual: Vec<_> = array.iter().flatten().collect();
        assert_eq!(
            actual,
            &[
                "This is a very long string that exceeds the inline length",
                "This is another very long string that exceeds the inline length",
                "world",
                "bananas",
                "cakes",
                "cup",
                "cupcakes",
                "😁",
                "",
                "Many strings are",
                "This is a very long",
                "are here contained of great",
                "I do so like long strings"
            ]
        );

        // Offset so large that `offset + len` overflows u32.
        let err = v.try_append_view(0, u32::MAX, 1).unwrap_err();
        assert_eq!(err.to_string(), "Invalid argument error: Range 4294967295..4294967296 out of bounds for block of length 17");

        // Length so large that `offset + len` overflows u32.
        let err = v.try_append_view(0, 1, u32::MAX).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Range 1..4294967296 out of bounds for block of length 17"
        );

        // Half of a multi-byte UTF-8 sequence.
        let err = v.try_append_view(0, 13, 2).unwrap_err();
        assert_eq!(err.to_string(), "Invalid argument error: Invalid view data");

        // Empty range that starts past the end of the block.
        let err = v.try_append_view(0, 40, 0).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Range 40..40 out of bounds for block of length 17"
        );

        // Block index that does not exist.
        let err = v.try_append_view(5, 0, 0).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: No block found with index 5"
        );
    }

    #[test]
    fn test_string_view_with_block_size_growth() {
        let mut exp_builder = StringViewBuilder::new();
        let mut fixed_builder = StringViewBuilder::new().with_fixed_block_size(STARTING_BLOCK_SIZE);

        // Every appended value is exactly STARTING_BLOCK_SIZE bytes long.
        let long_string = str::from_utf8(&[b'a'; STARTING_BLOCK_SIZE as usize]).unwrap();

        for round in 0..9u32 {
            let values_this_round = 2_u32.pow(round);
            for _ in 0..values_this_round {
                exp_builder.append_value(long_string);
                fixed_builder.append_value(long_string);
            }
            exp_builder.flush_in_progress();
            fixed_builder.flush_in_progress();

            // Exponential: each round adds exactly one, ever larger, block.
            assert_eq!(exp_builder.completed.len(), round as usize + 1);
            assert_eq!(
                exp_builder.completed[round as usize].len(),
                STARTING_BLOCK_SIZE as usize * 2_usize.pow(round)
            );

            // Fixed: one block per value, i.e. 1 + 2 + ... + 2^round blocks in total.
            assert_eq!(fixed_builder.completed.len(), 2_usize.pow(round + 1) - 1);
            assert!(fixed_builder
                .completed
                .iter()
                .all(|block| block.len() == STARTING_BLOCK_SIZE as usize));
        }

        // Growth has reached the cap: the next block is reserved at MAX_BLOCK_SIZE.
        exp_builder.append_value(long_string);
        exp_builder.flush_in_progress();
        assert_eq!(
            exp_builder.completed.last().unwrap().capacity(),
            MAX_BLOCK_SIZE as usize
        );
    }
}