1use crate::builder::{ArrayBuilder, GenericByteBuilder, PrimitiveBuilder};
19use crate::types::{ArrowDictionaryKeyType, ByteArrayType, GenericBinaryType, GenericStringType};
20use crate::{Array, ArrayRef, DictionaryArray, GenericByteArray, TypedDictionaryArray};
21use arrow_buffer::ArrowNativeType;
22use arrow_schema::{ArrowError, DataType};
23use hashbrown::HashTable;
24use std::any::Any;
25use std::sync::Arc;
26
27#[derive(Debug)]
33pub struct GenericByteDictionaryBuilder<K, T>
34where
35 K: ArrowDictionaryKeyType,
36 T: ByteArrayType,
37{
38 state: ahash::RandomState,
39 dedup: HashTable<usize>,
40
41 keys_builder: PrimitiveBuilder<K>,
42 values_builder: GenericByteBuilder<T>,
43}
44
45impl<K, T> Default for GenericByteDictionaryBuilder<K, T>
46where
47 K: ArrowDictionaryKeyType,
48 T: ByteArrayType,
49{
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55impl<K, T> GenericByteDictionaryBuilder<K, T>
56where
57 K: ArrowDictionaryKeyType,
58 T: ByteArrayType,
59{
60 pub fn new() -> Self {
62 let keys_builder = PrimitiveBuilder::new();
63 let values_builder = GenericByteBuilder::<T>::new();
64 Self {
65 state: Default::default(),
66 dedup: HashTable::with_capacity(keys_builder.capacity()),
67 keys_builder,
68 values_builder,
69 }
70 }
71
72 pub fn with_capacity(
78 keys_capacity: usize,
79 value_capacity: usize,
80 data_capacity: usize,
81 ) -> Self {
82 Self {
83 state: Default::default(),
84 dedup: Default::default(),
85 keys_builder: PrimitiveBuilder::with_capacity(keys_capacity),
86 values_builder: GenericByteBuilder::<T>::with_capacity(value_capacity, data_capacity),
87 }
88 }
89
90 pub fn new_with_dictionary(
114 keys_capacity: usize,
115 dictionary_values: &GenericByteArray<T>,
116 ) -> Result<Self, ArrowError> {
117 let state = ahash::RandomState::default();
118 let dict_len = dictionary_values.len();
119
120 let mut dedup = HashTable::with_capacity(dict_len);
121
122 let values_len = dictionary_values.value_data().len();
123 let mut values_builder = GenericByteBuilder::<T>::with_capacity(dict_len, values_len);
124
125 K::Native::from_usize(dictionary_values.len())
126 .ok_or(ArrowError::DictionaryKeyOverflowError)?;
127
128 for (idx, maybe_value) in dictionary_values.iter().enumerate() {
129 match maybe_value {
130 Some(value) => {
131 let value_bytes: &[u8] = value.as_ref();
132 let hash = state.hash_one(value_bytes);
133
134 dedup
135 .entry(
136 hash,
137 |idx: &usize| value_bytes == get_bytes(&values_builder, *idx),
138 |idx: &usize| state.hash_one(get_bytes(&values_builder, *idx)),
139 )
140 .or_insert(idx);
141
142 values_builder.append_value(value);
143 }
144 None => values_builder.append_null(),
145 }
146 }
147
148 Ok(Self {
149 state,
150 dedup,
151 keys_builder: PrimitiveBuilder::with_capacity(keys_capacity),
152 values_builder,
153 })
154 }
155}
156
157impl<K, T> ArrayBuilder for GenericByteDictionaryBuilder<K, T>
158where
159 K: ArrowDictionaryKeyType,
160 T: ByteArrayType,
161{
162 fn as_any(&self) -> &dyn Any {
164 self
165 }
166
167 fn as_any_mut(&mut self) -> &mut dyn Any {
169 self
170 }
171
172 fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
174 self
175 }
176
177 fn len(&self) -> usize {
179 self.keys_builder.len()
180 }
181
182 fn finish(&mut self) -> ArrayRef {
184 Arc::new(self.finish())
185 }
186
187 fn finish_cloned(&self) -> ArrayRef {
189 Arc::new(self.finish_cloned())
190 }
191}
192
193impl<K, T> GenericByteDictionaryBuilder<K, T>
194where
195 K: ArrowDictionaryKeyType,
196 T: ByteArrayType,
197{
198 fn get_or_insert_key(&mut self, value: impl AsRef<T::Native>) -> Result<K::Native, ArrowError> {
199 let value_native: &T::Native = value.as_ref();
200 let value_bytes: &[u8] = value_native.as_ref();
201
202 let state = &self.state;
203 let storage = &mut self.values_builder;
204 let hash = state.hash_one(value_bytes);
205
206 let idx = *self
207 .dedup
208 .entry(
209 hash,
210 |idx| value_bytes == get_bytes(storage, *idx),
211 |idx| state.hash_one(get_bytes(storage, *idx)),
212 )
213 .or_insert_with(|| {
214 let idx = storage.len();
215 storage.append_value(value);
216 idx
217 })
218 .get();
219
220 let key = K::Native::from_usize(idx).ok_or(ArrowError::DictionaryKeyOverflowError)?;
221
222 Ok(key)
223 }
224
225 pub fn append(&mut self, value: impl AsRef<T::Native>) -> Result<K::Native, ArrowError> {
231 let key = self.get_or_insert_key(value)?;
232 self.keys_builder.append_value(key);
233 Ok(key)
234 }
235
236 pub fn append_n(
241 &mut self,
242 value: impl AsRef<T::Native>,
243 count: usize,
244 ) -> Result<K::Native, ArrowError> {
245 let key = self.get_or_insert_key(value)?;
246 self.keys_builder.append_value_n(key, count);
247 Ok(key)
248 }
249
250 pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
256 self.append(value).expect("dictionary key overflow");
257 }
258
259 pub fn append_values(&mut self, value: impl AsRef<T::Native>, count: usize) {
266 self.append_n(value, count)
267 .expect("dictionary key overflow");
268 }
269
270 #[inline]
272 pub fn append_null(&mut self) {
273 self.keys_builder.append_null()
274 }
275
276 #[inline]
278 pub fn append_nulls(&mut self, n: usize) {
279 self.keys_builder.append_nulls(n)
280 }
281
282 #[inline]
288 pub fn append_option(&mut self, value: Option<impl AsRef<T::Native>>) {
289 match value {
290 None => self.append_null(),
291 Some(v) => self.append_value(v),
292 };
293 }
294
295 pub fn append_options(&mut self, value: Option<impl AsRef<T::Native>>, count: usize) {
302 match value {
303 None => self.keys_builder.append_nulls(count),
304 Some(v) => self.append_values(v, count),
305 };
306 }
307
308 pub fn extend_dictionary(
316 &mut self,
317 dictionary: &TypedDictionaryArray<K, GenericByteArray<T>>,
318 ) -> Result<(), ArrowError> {
319 let values = dictionary.values();
320
321 let v_len = values.len();
322 let k_len = dictionary.keys().len();
323 if v_len == 0 && k_len == 0 {
324 return Ok(());
325 }
326
327 if v_len == 0 {
329 self.append_nulls(k_len);
330 return Ok(());
331 }
332
333 if k_len == 0 {
334 return Err(ArrowError::InvalidArgumentError(
335 "Dictionary keys should not be empty when values are not empty".to_string(),
336 ));
337 }
338
339 let mapped_values = values
341 .iter()
342 .map(|dict_value| {
344 dict_value
345 .map(|dict_value| self.get_or_insert_key(dict_value))
346 .transpose()
347 })
348 .collect::<Result<Vec<_>, _>>()?;
349
350 dictionary.keys().iter().for_each(|key| match key {
352 None => self.append_null(),
353 Some(original_dict_index) => {
354 let index = original_dict_index.as_usize().min(v_len - 1);
355 match mapped_values[index] {
356 None => self.append_null(),
357 Some(mapped_value) => self.keys_builder.append_value(mapped_value),
358 }
359 }
360 });
361
362 Ok(())
363 }
364
365 pub fn finish(&mut self) -> DictionaryArray<K> {
367 self.dedup.clear();
368 let values = self.values_builder.finish();
369 let keys = self.keys_builder.finish();
370
371 let data_type = DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(T::DATA_TYPE));
372
373 let builder = keys
374 .into_data()
375 .into_builder()
376 .data_type(data_type)
377 .child_data(vec![values.into_data()]);
378
379 DictionaryArray::from(unsafe { builder.build_unchecked() })
380 }
381
382 pub fn finish_cloned(&self) -> DictionaryArray<K> {
384 let values = self.values_builder.finish_cloned();
385 let keys = self.keys_builder.finish_cloned();
386
387 let data_type = DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(T::DATA_TYPE));
388
389 let builder = keys
390 .into_data()
391 .into_builder()
392 .data_type(data_type)
393 .child_data(vec![values.into_data()]);
394
395 DictionaryArray::from(unsafe { builder.build_unchecked() })
396 }
397
398 pub fn validity_slice(&self) -> Option<&[u8]> {
400 self.keys_builder.validity_slice()
401 }
402}
403
404impl<K: ArrowDictionaryKeyType, T: ByteArrayType, V: AsRef<T::Native>> Extend<Option<V>>
405 for GenericByteDictionaryBuilder<K, T>
406{
407 #[inline]
408 fn extend<I: IntoIterator<Item = Option<V>>>(&mut self, iter: I) {
409 for v in iter {
410 self.append_option(v)
411 }
412 }
413}
414
415fn get_bytes<T: ByteArrayType>(values: &GenericByteBuilder<T>, idx: usize) -> &[u8] {
416 let offsets = values.offsets_slice();
417 let values = values.values_slice();
418
419 let end_offset = offsets[idx + 1].as_usize();
420 let start_offset = offsets[idx].as_usize();
421
422 &values[start_offset..end_offset]
423}
424
425pub type StringDictionaryBuilder<K> = GenericByteDictionaryBuilder<K, GenericStringType<i32>>;
458
459pub type LargeStringDictionaryBuilder<K> = GenericByteDictionaryBuilder<K, GenericStringType<i64>>;
461
462pub type BinaryDictionaryBuilder<K> = GenericByteDictionaryBuilder<K, GenericBinaryType<i32>>;
496
497pub type LargeBinaryDictionaryBuilder<K> = GenericByteDictionaryBuilder<K, GenericBinaryType<i64>>;
499
#[cfg(test)]
mod tests {
    use super::*;

    use crate::array::Int8Array;
    use crate::cast::AsArray;
    use crate::types::{Int16Type, Int32Type, Int8Type, Utf8Type};
    use crate::{ArrowPrimitiveType, BinaryArray, StringArray};

    /// Downcasts type-erased dictionary values to the concrete byte array type.
    fn typed_values<T: ByteArrayType>(values: &ArrayRef) -> &GenericByteArray<T> {
        values
            .as_any()
            .downcast_ref::<GenericByteArray<T>>()
            .unwrap()
    }

    /// Repeated values must share a key and appear once in the dictionary.
    fn test_bytes_dictionary_builder<T>(values: Vec<&T::Native>)
    where
        T: ByteArrayType,
        T::Native: PartialEq + AsRef<T::Native>,
    {
        let mut builder = GenericByteDictionaryBuilder::<Int8Type, T>::new();
        builder.append(values[0]).unwrap();
        builder.append_null();
        builder.append(values[1]).unwrap();
        builder.append(values[1]).unwrap();
        builder.append(values[0]).unwrap();
        let array = builder.finish();

        let expected_keys = Int8Array::from(vec![Some(0), None, Some(1), Some(1), Some(0)]);
        assert_eq!(array.keys(), &expected_keys);

        let dict_values = typed_values::<T>(array.values());

        assert_eq!(*dict_values.value(0), *values[0]);
        assert_eq!(*dict_values.value(1), *values[1]);
    }

    #[test]
    fn test_string_dictionary_builder() {
        test_bytes_dictionary_builder::<GenericStringType<i32>>(vec!["abc", "def"]);
    }

    #[test]
    fn test_binary_dictionary_builder() {
        test_bytes_dictionary_builder::<GenericBinaryType<i32>>(vec![b"abc", b"def"]);
    }

    /// `finish_cloned` must snapshot the builder without resetting it, so that
    /// later appends continue with the same dictionary.
    fn test_bytes_dictionary_builder_finish_cloned<T>(values: Vec<&T::Native>)
    where
        T: ByteArrayType,
        T::Native: PartialEq + AsRef<T::Native>,
    {
        let mut builder = GenericByteDictionaryBuilder::<Int8Type, T>::new();

        builder.append(values[0]).unwrap();
        builder.append_null();
        builder.append(values[1]).unwrap();
        builder.append(values[1]).unwrap();
        builder.append(values[0]).unwrap();
        let snapshot = builder.finish_cloned();

        let snapshot_keys = Int8Array::from(vec![Some(0), None, Some(1), Some(1), Some(0)]);
        assert_eq!(snapshot.keys(), &snapshot_keys);

        let snapshot_values = typed_values::<T>(snapshot.values());

        assert_eq!(snapshot_values.value(0), values[0]);
        assert_eq!(snapshot_values.value(1), values[1]);

        // The builder still holds its state, so these extend the same array.
        builder.append(values[0]).unwrap();
        builder.append(values[2]).unwrap();
        builder.append(values[1]).unwrap();

        let finished = builder.finish();

        let finished_keys = Int8Array::from(vec![
            Some(0),
            None,
            Some(1),
            Some(1),
            Some(0),
            Some(0),
            Some(2),
            Some(1),
        ]);
        assert_eq!(finished.keys(), &finished_keys);

        let finished_values = typed_values::<T>(finished.values());

        assert_eq!(finished_values.value(0), values[0]);
        assert_eq!(finished_values.value(1), values[1]);
        assert_eq!(finished_values.value(2), values[2]);
    }

    #[test]
    fn test_string_dictionary_builder_finish_cloned() {
        test_bytes_dictionary_builder_finish_cloned::<GenericStringType<i32>>(vec![
            "abc", "def", "ghi",
        ]);
    }

    #[test]
    fn test_binary_dictionary_builder_finish_cloned() {
        test_bytes_dictionary_builder_finish_cloned::<GenericBinaryType<i32>>(vec![
            b"abc", b"def", b"ghi",
        ]);
    }

    /// Values already present in the seed dictionary must reuse its indices, and
    /// unseen values must be added after the seeded ones.
    fn test_bytes_dictionary_builder_with_existing_dictionary<T>(
        dictionary: GenericByteArray<T>,
        values: Vec<&T::Native>,
    ) where
        T: ByteArrayType,
        T::Native: PartialEq + AsRef<T::Native>,
    {
        let mut builder =
            GenericByteDictionaryBuilder::<Int8Type, T>::new_with_dictionary(6, &dictionary)
                .unwrap();
        builder.append(values[0]).unwrap();
        builder.append_null();
        builder.append(values[1]).unwrap();
        builder.append(values[1]).unwrap();
        builder.append(values[0]).unwrap();
        builder.append(values[2]).unwrap();
        let array = builder.finish();

        let expected_keys =
            Int8Array::from(vec![Some(2), None, Some(1), Some(1), Some(2), Some(3)]);
        assert_eq!(array.keys(), &expected_keys);

        let dict_values = typed_values::<T>(array.values());

        assert!(!dict_values.is_valid(0));
        assert_eq!(dict_values.value(1), values[1]);
        assert_eq!(dict_values.value(2), values[0]);
        assert_eq!(dict_values.value(3), values[2]);
    }

    #[test]
    fn test_string_dictionary_builder_with_existing_dictionary() {
        test_bytes_dictionary_builder_with_existing_dictionary::<GenericStringType<i32>>(
            StringArray::from(vec![None, Some("def"), Some("abc")]),
            vec!["abc", "def", "ghi"],
        );
    }

    #[test]
    fn test_binary_dictionary_builder_with_existing_dictionary() {
        let seed: Vec<Option<&[u8]>> = vec![None, Some(b"def"), Some(b"abc")];
        test_bytes_dictionary_builder_with_existing_dictionary::<GenericBinaryType<i32>>(
            BinaryArray::from(seed),
            vec![b"abc", b"def", b"ghi"],
        );
    }

    /// A seed dictionary holding only a null reserves index 0, so real values
    /// start at key 1.
    fn test_bytes_dictionary_builder_with_reserved_null_value<T>(
        dictionary: GenericByteArray<T>,
        values: Vec<&T::Native>,
    ) where
        T: ByteArrayType,
        T::Native: PartialEq + AsRef<T::Native>,
    {
        let mut builder =
            GenericByteDictionaryBuilder::<Int16Type, T>::new_with_dictionary(4, &dictionary)
                .unwrap();
        builder.append(values[0]).unwrap();
        builder.append_null();
        builder.append(values[1]).unwrap();
        builder.append(values[0]).unwrap();
        let array = builder.finish();

        assert!(array.is_null(1));
        assert!(!array.is_valid(1));

        let keys = array.keys();

        assert_eq!(keys.value(0), 1);
        assert!(keys.is_null(1));
        // The raw slot behind the null key is expected to read as zero.
        assert_eq!(keys.value(1), 0);
        assert_eq!(keys.value(2), 2);
        assert_eq!(keys.value(3), 1);
    }

    #[test]
    fn test_string_dictionary_builder_with_reserved_null_value() {
        let seed: Vec<Option<&str>> = vec![None];
        test_bytes_dictionary_builder_with_reserved_null_value::<GenericStringType<i32>>(
            StringArray::from(seed),
            vec!["abc", "def"],
        );
    }

    #[test]
    fn test_binary_dictionary_builder_with_reserved_null_value() {
        let seed: Vec<Option<&[u8]>> = vec![None];
        test_bytes_dictionary_builder_with_reserved_null_value::<GenericBinaryType<i32>>(
            BinaryArray::from(seed),
            vec![b"abc", b"def"],
        );
    }

    /// `Extend` must deduplicate across separate `extend` calls.
    #[test]
    fn test_extend() {
        let mut builder = GenericByteDictionaryBuilder::<Int32Type, Utf8Type>::new();
        builder.extend(["a", "b", "c", "a", "b", "c"].into_iter().map(Some));
        builder.extend(["c", "d", "a"].into_iter().map(Some));
        let dict = builder.finish();
        assert_eq!(dict.keys().values(), &[0, 1, 2, 0, 1, 2, 2, 3, 0]);
        assert_eq!(dict.values().len(), 4);
    }

    /// Extending with another dictionary must merge its values into the existing
    /// dictionary and preserve element order and nulls.
    #[test]
    fn test_extend_dictionary() {
        let some_dict = {
            let mut source = GenericByteDictionaryBuilder::<Int32Type, Utf8Type>::new();
            source.extend(["a", "b", "c", "a", "b", "c"].into_iter().map(Some));
            source.extend([None::<&str>]);
            source.extend(["c", "d", "a"].into_iter().map(Some));
            source.append_null();
            source.finish()
        };

        let mut builder = GenericByteDictionaryBuilder::<Int32Type, Utf8Type>::new();
        builder.extend(["e", "e", "f", "e", "d"].into_iter().map(Some));
        builder
            .extend_dictionary(&some_dict.downcast_dict().unwrap())
            .unwrap();
        let dict = builder.finish();

        assert_eq!(dict.values().len(), 6);

        let logical_values = dict
            .downcast_dict::<GenericByteArray<Utf8Type>>()
            .unwrap()
            .into_iter()
            .collect::<Vec<_>>();

        assert_eq!(
            logical_values,
            [
                Some("e"),
                Some("e"),
                Some("f"),
                Some("e"),
                Some("d"),
                Some("a"),
                Some("b"),
                Some("c"),
                Some("a"),
                Some("b"),
                Some("c"),
                None,
                Some("c"),
                Some("d"),
                Some("a"),
                None
            ]
        );
    }

    /// A key pointing at a null dictionary value must come out as a null element
    /// rather than carrying the null value over.
    #[test]
    fn test_extend_dictionary_with_null_in_mapped_value() {
        let some_dict = {
            let mut values_builder = GenericByteBuilder::<Utf8Type>::new();
            let mut keys_builder = PrimitiveBuilder::<Int32Type>::new();

            // Slot 0 of the dictionary is a null value that key 0 points at.
            values_builder.append_null();
            keys_builder.append_value(0);
            values_builder.append_value("I like worm hugs");
            keys_builder.append_value(1);

            let values = values_builder.finish();
            let keys = keys_builder.finish();

            let data_type = DataType::Dictionary(
                Box::new(Int32Type::DATA_TYPE),
                Box::new(Utf8Type::DATA_TYPE),
            );

            let data_builder = keys
                .into_data()
                .into_builder()
                .data_type(data_type)
                .child_data(vec![values.into_data()]);

            // SAFETY: keys 0 and 1 are both valid indices into the two-element values array.
            DictionaryArray::from(unsafe { data_builder.build_unchecked() })
        };

        let some_dict_values = some_dict.values().as_string::<i32>();
        assert_eq!(
            some_dict_values.into_iter().collect::<Vec<_>>(),
            &[None, Some("I like worm hugs")]
        );

        let mut builder = GenericByteDictionaryBuilder::<Int32Type, Utf8Type>::new();
        builder
            .extend_dictionary(&some_dict.downcast_dict().unwrap())
            .unwrap();
        let dict = builder.finish();

        assert_eq!(dict.values().len(), 1);

        let logical_values = dict
            .downcast_dict::<GenericByteArray<Utf8Type>>()
            .unwrap()
            .into_iter()
            .collect::<Vec<_>>();

        assert_eq!(logical_values, [None, Some("I like worm hugs")]);
    }

    /// A dictionary with keys but no values can only contribute nulls.
    #[test]
    fn test_extend_all_null_dictionary() {
        let some_dict = {
            let mut source = GenericByteDictionaryBuilder::<Int32Type, Utf8Type>::new();
            source.append_nulls(2);
            source.finish()
        };

        let mut builder = GenericByteDictionaryBuilder::<Int32Type, Utf8Type>::new();
        builder
            .extend_dictionary(&some_dict.downcast_dict().unwrap())
            .unwrap();
        let dict = builder.finish();

        assert_eq!(dict.values().len(), 0);

        let logical_values = dict
            .downcast_dict::<GenericByteArray<Utf8Type>>()
            .unwrap()
            .into_iter()
            .collect::<Vec<_>>();

        assert_eq!(logical_values, [None, None]);
    }
}