1use crate::types::bytes::ByteArrayNativeType;
19use std::{any::Any, sync::Arc};
20
21use crate::{
22 types::{BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, RunEndIndexType, Utf8Type},
23 ArrayRef, ArrowPrimitiveType, RunArray,
24};
25
26use super::{ArrayBuilder, GenericByteBuilder, PrimitiveBuilder};
27
28use arrow_buffer::ArrowNativeType;
29
30#[derive(Debug)]
65pub struct GenericByteRunBuilder<R, V>
66where
67 R: ArrowPrimitiveType,
68 V: ByteArrayType,
69{
70 run_ends_builder: PrimitiveBuilder<R>,
71 values_builder: GenericByteBuilder<V>,
72 current_value: Vec<u8>,
73 has_current_value: bool,
74 current_run_end_index: usize,
75 prev_run_end_index: usize,
76}
77
78impl<R, V> Default for GenericByteRunBuilder<R, V>
79where
80 R: ArrowPrimitiveType,
81 V: ByteArrayType,
82{
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
88impl<R, V> GenericByteRunBuilder<R, V>
89where
90 R: ArrowPrimitiveType,
91 V: ByteArrayType,
92{
93 pub fn new() -> Self {
95 Self {
96 run_ends_builder: PrimitiveBuilder::new(),
97 values_builder: GenericByteBuilder::<V>::new(),
98 current_value: Vec::new(),
99 has_current_value: false,
100 current_run_end_index: 0,
101 prev_run_end_index: 0,
102 }
103 }
104
105 pub fn with_capacity(capacity: usize, data_capacity: usize) -> Self {
110 Self {
111 run_ends_builder: PrimitiveBuilder::with_capacity(capacity),
112 values_builder: GenericByteBuilder::<V>::with_capacity(capacity, data_capacity),
113 current_value: Vec::new(),
114 has_current_value: false,
115 current_run_end_index: 0,
116 prev_run_end_index: 0,
117 }
118 }
119}
120
121impl<R, V> ArrayBuilder for GenericByteRunBuilder<R, V>
122where
123 R: RunEndIndexType,
124 V: ByteArrayType,
125{
126 fn as_any(&self) -> &dyn Any {
128 self
129 }
130
131 fn as_any_mut(&mut self) -> &mut dyn Any {
133 self
134 }
135
136 fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
138 self
139 }
140
141 fn len(&self) -> usize {
144 self.current_run_end_index
145 }
146
147 fn finish(&mut self) -> ArrayRef {
149 Arc::new(self.finish())
150 }
151
152 fn finish_cloned(&self) -> ArrayRef {
154 Arc::new(self.finish_cloned())
155 }
156}
157
158impl<R, V> GenericByteRunBuilder<R, V>
159where
160 R: RunEndIndexType,
161 V: ByteArrayType,
162{
163 pub fn append_option(&mut self, input_value: Option<impl AsRef<V::Native>>) {
165 match input_value {
166 Some(value) => self.append_value(value),
167 None => self.append_null(),
168 }
169 }
170
171 pub fn append_value(&mut self, input_value: impl AsRef<V::Native>) {
173 let value: &[u8] = input_value.as_ref().as_ref();
174 if !self.has_current_value {
175 self.append_run_end();
176 self.current_value.extend_from_slice(value);
177 self.has_current_value = true;
178 } else if self.current_value.as_slice() != value {
179 self.append_run_end();
180 self.current_value.clear();
181 self.current_value.extend_from_slice(value);
182 }
183 self.current_run_end_index += 1;
184 }
185
186 pub fn append_null(&mut self) {
188 if self.has_current_value {
189 self.append_run_end();
190 self.current_value.clear();
191 self.has_current_value = false;
192 }
193 self.current_run_end_index += 1;
194 }
195
196 pub fn finish(&mut self) -> RunArray<R> {
199 self.append_run_end();
201
202 self.current_value.clear();
204 self.has_current_value = false;
205 self.current_run_end_index = 0;
206 self.prev_run_end_index = 0;
207
208 let run_ends_array = self.run_ends_builder.finish();
210 let values_array = self.values_builder.finish();
211 RunArray::<R>::try_new(&run_ends_array, &values_array).unwrap()
212 }
213
214 pub fn finish_cloned(&self) -> RunArray<R> {
217 let mut run_ends_array = self.run_ends_builder.finish_cloned();
218 let mut values_array = self.values_builder.finish_cloned();
219
220 if self.prev_run_end_index != self.current_run_end_index {
222 let mut run_end_builder = run_ends_array.into_builder().unwrap();
223 let mut values_builder = values_array.into_builder().unwrap();
224 self.append_run_end_with_builders(&mut run_end_builder, &mut values_builder);
225 run_ends_array = run_end_builder.finish();
226 values_array = values_builder.finish();
227 }
228
229 RunArray::<R>::try_new(&run_ends_array, &values_array).unwrap()
230 }
231
232 fn append_run_end(&mut self) {
234 if self.prev_run_end_index == self.current_run_end_index {
236 return;
237 }
238 let run_end_index = self.run_end_index_as_native();
239 self.run_ends_builder.append_value(run_end_index);
240 if self.has_current_value {
241 let slice = self.current_value.as_slice();
242 let native = unsafe {
243 V::Native::from_bytes_unchecked(slice)
247 };
248 self.values_builder.append_value(native);
249 } else {
250 self.values_builder.append_null();
251 }
252 self.prev_run_end_index = self.current_run_end_index;
253 }
254
255 fn append_run_end_with_builders(
258 &self,
259 run_ends_builder: &mut PrimitiveBuilder<R>,
260 values_builder: &mut GenericByteBuilder<V>,
261 ) {
262 let run_end_index = self.run_end_index_as_native();
263 run_ends_builder.append_value(run_end_index);
264 if self.has_current_value {
265 let slice = self.current_value.as_slice();
266 let native = unsafe {
267 V::Native::from_bytes_unchecked(slice)
271 };
272 values_builder.append_value(native);
273 } else {
274 values_builder.append_null();
275 }
276 }
277
278 fn run_end_index_as_native(&self) -> R::Native {
279 R::Native::from_usize(self.current_run_end_index).unwrap_or_else(|| {
280 panic!(
281 "Cannot convert the value {} from `usize` to native form of arrow datatype {}",
282 self.current_run_end_index,
283 R::DATA_TYPE
284 )
285 })
286 }
287}
288
289impl<R, V, S> Extend<Option<S>> for GenericByteRunBuilder<R, V>
290where
291 R: RunEndIndexType,
292 V: ByteArrayType,
293 S: AsRef<V::Native>,
294{
295 fn extend<T: IntoIterator<Item = Option<S>>>(&mut self, iter: T) {
296 for elem in iter {
297 self.append_option(elem);
298 }
299 }
300}
301
/// A [`GenericByteRunBuilder`] whose values child uses [`Utf8Type`]; `K` is
/// passed through as the run-end type parameter.
pub type StringRunBuilder<K> = GenericByteRunBuilder<K, Utf8Type>;
334
/// A [`GenericByteRunBuilder`] whose values child uses [`LargeUtf8Type`]; `K`
/// is passed through as the run-end type parameter.
pub type LargeStringRunBuilder<K> = GenericByteRunBuilder<K, LargeUtf8Type>;
337
/// A [`GenericByteRunBuilder`] whose values child uses [`BinaryType`]; `K` is
/// passed through as the run-end type parameter.
pub type BinaryRunBuilder<K> = GenericByteRunBuilder<K, BinaryType>;
370
/// A [`GenericByteRunBuilder`] whose values child uses [`LargeBinaryType`];
/// `K` is passed through as the run-end type parameter.
pub type LargeBinaryRunBuilder<K> = GenericByteRunBuilder<K, LargeBinaryType>;
373
#[cfg(test)]
mod tests {
    use super::*;

    use crate::array::Array;
    use crate::cast::AsArray;
    use crate::types::{Int16Type, Int32Type};
    use crate::GenericByteArray;
    use crate::Int16RunArray;

    /// Builds `3 x values[0]`, `2 x null`, `2 x values[1]`, `4 x values[2]`
    /// and checks the resulting run ends and run values.
    fn test_bytes_run_builder<T>(values: Vec<&T::Native>)
    where
        T: ByteArrayType,
        <T as ByteArrayType>::Native: PartialEq,
        <T as ByteArrayType>::Native: AsRef<<T as ByteArrayType>::Native>,
    {
        let mut builder = GenericByteRunBuilder::<Int16Type, T>::new();
        for _ in 0..3 {
            builder.append_value(values[0]);
        }
        for _ in 0..2 {
            builder.append_null();
        }
        for _ in 0..2 {
            builder.append_value(values[1]);
        }
        for _ in 0..4 {
            builder.append_value(values[2]);
        }
        let run_array = builder.finish();

        // Eleven logical elements; the two nulls are only visible through the
        // logical null count.
        assert_eq!(run_array.len(), 11);
        assert_eq!(run_array.null_count(), 0);
        assert_eq!(run_array.logical_null_count(), 2);

        assert_eq!(run_array.run_ends().values(), &[3, 5, 7, 11]);

        // One entry per run in the values child.
        let values_child = run_array.values();
        let typed_values: &GenericByteArray<T> = values_child
            .as_any()
            .downcast_ref::<GenericByteArray<T>>()
            .unwrap();

        assert_eq!(*typed_values.value(0), *values[0]);
        assert!(typed_values.is_null(1));
        assert_eq!(*typed_values.value(2), *values[1]);
        assert_eq!(*typed_values.value(3), *values[2]);
    }

    #[test]
    fn test_string_run_builder() {
        test_bytes_run_builder::<Utf8Type>(vec!["abc", "def", "ghi"]);
    }

    #[test]
    fn test_string_run_builder_with_empty_strings() {
        test_bytes_run_builder::<Utf8Type>(vec!["abc", "", "ghi"]);
    }

    #[test]
    fn test_binary_run_builder() {
        test_bytes_run_builder::<BinaryType>(vec![b"abc", b"def", b"ghi"]);
    }

    /// Takes a snapshot with `finish_cloned`, keeps appending, then checks
    /// that `finish` sees both the earlier and the later elements.
    fn test_bytes_run_builder_finish_cloned<T>(values: Vec<&T::Native>)
    where
        T: ByteArrayType,
        <T as ByteArrayType>::Native: PartialEq,
        <T as ByteArrayType>::Native: AsRef<<T as ByteArrayType>::Native>,
    {
        let mut builder = GenericByteRunBuilder::<Int16Type, T>::new();

        builder.append_value(values[0]);
        builder.append_null();
        for _ in 0..2 {
            builder.append_value(values[1]);
        }
        builder.append_value(values[0]);
        let snapshot: Int16RunArray = builder.finish_cloned();

        assert_eq!(snapshot.len(), 5);
        assert_eq!(snapshot.null_count(), 0);
        assert_eq!(snapshot.logical_null_count(), 1);

        // The last run is still open in the builder but must be in the snapshot.
        assert_eq!(snapshot.run_ends().values(), &[1, 2, 4, 5]);

        let snapshot_child = snapshot.values();
        let snapshot_values: &GenericByteArray<T> = snapshot_child
            .as_any()
            .downcast_ref::<GenericByteArray<T>>()
            .unwrap();

        assert_eq!(snapshot_values.value(0), values[0]);
        assert!(snapshot_values.is_null(1));
        assert_eq!(snapshot_values.value(2), values[1]);
        assert_eq!(snapshot_values.value(3), values[0]);

        // Continue on the same builder: the first two appends extend the run
        // that was open when the snapshot was taken.
        for _ in 0..2 {
            builder.append_value(values[0]);
        }
        builder.append_value(values[1]);
        let finished: Int16RunArray = builder.finish();

        assert_eq!(finished.len(), 8);
        assert_eq!(finished.null_count(), 0);
        assert_eq!(finished.logical_null_count(), 1);

        assert_eq!(finished.run_ends().values(), &[1, 2, 4, 7, 8]);

        let finished_child = finished.values();
        let finished_values: &GenericByteArray<T> = finished_child
            .as_any()
            .downcast_ref::<GenericByteArray<T>>()
            .unwrap();

        assert_eq!(finished_values.value(0), values[0]);
        assert!(finished_values.is_null(1));
        assert_eq!(finished_values.value(2), values[1]);
        assert_eq!(finished_values.value(3), values[0]);
        assert_eq!(finished_values.value(4), values[1]);
    }

    #[test]
    fn test_string_run_builder_finish_cloned() {
        test_bytes_run_builder_finish_cloned::<Utf8Type>(vec!["abc", "def", "ghi"]);
    }

    #[test]
    fn test_binary_run_builder_finish_cloned() {
        test_bytes_run_builder_finish_cloned::<BinaryType>(vec![b"abc", b"def", b"ghi"]);
    }

    #[test]
    fn test_extend() {
        let mut builder = StringRunBuilder::<Int32Type>::new();
        builder.extend(["a", "a", "a", "", "", "b", "b"].into_iter().map(Some));
        // The leading "b" of the second batch continues the run left open by
        // the first batch.
        builder.extend(["b", "cupcakes", "cupcakes"].into_iter().map(Some));
        let run_array = builder.finish();

        assert_eq!(run_array.len(), 10);
        assert_eq!(run_array.run_ends().values(), &[3, 5, 8, 10]);

        let str_array = run_array.values().as_string::<i32>();
        let expected_runs = ["a", "", "b", "cupcakes"];
        for (run, expected) in expected_runs.iter().enumerate() {
            assert_eq!(str_array.value(run), *expected);
        }
    }
}