1use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
21use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder, PrimitiveBuilder};
22use arrow_array::cast::AsArray;
23use arrow_array::types::*;
24use arrow_array::*;
25use arrow_buffer::{ArrowNativeType, MutableBuffer, NullBuffer, NullBufferBuilder, OffsetBuffer};
26use arrow_data::transform::MutableArrayData;
27use arrow_data::ByteView;
28use arrow_schema::{ArrowError, DataType};
29use std::collections::HashMap;
30use std::sync::Arc;
31
// Adapter handed to `downcast_primitive!` by `interleave`: given the concrete
// primitive type `$t` selected by the downcast, forwards the call to
// `interleave_primitive::<$t>` with the caller's `values`, `indices` and
// `data_type` bindings.
macro_rules! primitive_helper {
    ($t:ty, $values:ident, $indices:ident, $data_type:ident) => {
        interleave_primitive::<$t>($values, $indices, $data_type)
    };
}
37
// Adapter handed to `downcast_integer!` by `interleave`: given the concrete
// dictionary key type `$t`, forwards the call to `interleave_dictionaries::<$t>`.
//
// `interleave_dictionaries` already yields `Result<ArrayRef, ArrowError>`, so the
// result is returned as-is rather than being unwrapped and wrapped in a second
// `Arc` (which would cost an extra allocation and pointer hop per call).
macro_rules! dict_helper {
    ($t:ty, $values:expr, $indices:expr) => {
        interleave_dictionaries::<$t>($values, $indices)
    };
}
43
44pub fn interleave(
72 values: &[&dyn Array],
73 indices: &[(usize, usize)],
74) -> Result<ArrayRef, ArrowError> {
75 if values.is_empty() {
76 return Err(ArrowError::InvalidArgumentError(
77 "interleave requires input of at least one array".to_string(),
78 ));
79 }
80 let data_type = values[0].data_type();
81
82 for array in values.iter().skip(1) {
83 if array.data_type() != data_type {
84 return Err(ArrowError::InvalidArgumentError(format!(
85 "It is not possible to interleave arrays of different data types ({} and {})",
86 data_type,
87 array.data_type()
88 )));
89 }
90 }
91
92 if indices.is_empty() {
93 return Ok(new_empty_array(data_type));
94 }
95
96 downcast_primitive! {
97 data_type => (primitive_helper, values, indices, data_type),
98 DataType::Utf8 => interleave_bytes::<Utf8Type>(values, indices),
99 DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
100 DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
101 DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
102 DataType::BinaryView => interleave_views::<BinaryViewType>(values, indices),
103 DataType::Utf8View => interleave_views::<StringViewType>(values, indices),
104 DataType::Dictionary(k, _) => downcast_integer! {
105 k.as_ref() => (dict_helper, values, indices),
106 _ => unreachable!("illegal dictionary key type {k}")
107 },
108 _ => interleave_fallback(values, indices)
109 }
110}
111
/// Shared state for the typed interleave implementations: the inputs downcast
/// to their concrete array type, plus the validity of the output.
struct Interleave<'a, T> {
    /// The input arrays, downcast to `T`, in the same order as the caller's `values`.
    arrays: Vec<&'a T>,
    /// Validity of the interleaved output, one bit per entry of `indices`;
    /// `None` when none of the inputs reports any nulls.
    nulls: Option<NullBuffer>,
}
121
122impl<'a, T: Array + 'static> Interleave<'a, T> {
123 fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
124 let mut has_nulls = false;
125 let arrays: Vec<&T> = values
126 .iter()
127 .map(|x| {
128 has_nulls = has_nulls || x.null_count() != 0;
129 x.as_any().downcast_ref().unwrap()
130 })
131 .collect();
132
133 let nulls = match has_nulls {
134 true => {
135 let mut builder = NullBufferBuilder::new(indices.len());
136 for (a, b) in indices {
137 let v = arrays[*a].is_valid(*b);
138 builder.append(v)
139 }
140 builder.finish()
141 }
142 false => None,
143 };
144
145 Self { arrays, nulls }
146 }
147}
148
149fn interleave_primitive<T: ArrowPrimitiveType>(
150 values: &[&dyn Array],
151 indices: &[(usize, usize)],
152 data_type: &DataType,
153) -> Result<ArrayRef, ArrowError> {
154 let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);
155
156 let mut values = Vec::with_capacity(indices.len());
157 for (a, b) in indices {
158 let v = interleaved.arrays[*a].value(*b);
159 values.push(v)
160 }
161
162 let array = PrimitiveArray::<T>::new(values.into(), interleaved.nulls);
163 Ok(Arc::new(array.with_data_type(data_type.clone())))
164}
165
166fn interleave_bytes<T: ByteArrayType>(
167 values: &[&dyn Array],
168 indices: &[(usize, usize)],
169) -> Result<ArrayRef, ArrowError> {
170 let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);
171
172 let mut capacity = 0;
173 let mut offsets = BufferBuilder::<T::Offset>::new(indices.len() + 1);
174 offsets.append(T::Offset::from_usize(0).unwrap());
175 for (a, b) in indices {
176 let o = interleaved.arrays[*a].value_offsets();
177 let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
178 capacity += element_len;
179 offsets.append(T::Offset::from_usize(capacity).expect("overflow"));
180 }
181
182 let mut values = MutableBuffer::new(capacity);
183 for (a, b) in indices {
184 values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
185 }
186
187 let array = unsafe {
189 let offsets = OffsetBuffer::new_unchecked(offsets.finish().into());
190 GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
191 };
192 Ok(Arc::new(array))
193}
194
195fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
196 arrays: &[&dyn Array],
197 indices: &[(usize, usize)],
198) -> Result<ArrayRef, ArrowError> {
199 let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
200 if !should_merge_dictionary_values::<K>(&dictionaries, indices.len()) {
201 return interleave_fallback(arrays, indices);
202 }
203
204 let masks: Vec<_> = dictionaries
205 .iter()
206 .enumerate()
207 .map(|(a_idx, dictionary)| {
208 let mut key_mask = BooleanBufferBuilder::new_from_buffer(
209 MutableBuffer::new_null(dictionary.len()),
210 dictionary.len(),
211 );
212
213 for (_, key_idx) in indices.iter().filter(|(a, _)| *a == a_idx) {
214 key_mask.set_bit(*key_idx, true);
215 }
216 key_mask.finish()
217 })
218 .collect();
219
220 let merged = merge_dictionary_values(&dictionaries, Some(&masks))?;
221
222 let mut keys = PrimitiveBuilder::<K>::with_capacity(indices.len());
224 for (a, b) in indices {
225 let old_keys: &PrimitiveArray<K> = dictionaries[*a].keys();
226 match old_keys.is_valid(*b) {
227 true => {
228 let old_key = old_keys.values()[*b];
229 keys.append_value(merged.key_mappings[*a][old_key.as_usize()])
230 }
231 false => keys.append_null(),
232 }
233 }
234 let array = unsafe { DictionaryArray::new_unchecked(keys.finish(), merged.values) };
235 Ok(Arc::new(array))
236}
237
238fn interleave_views<T: ByteViewType>(
239 values: &[&dyn Array],
240 indices: &[(usize, usize)],
241) -> Result<ArrayRef, ArrowError> {
242 let interleaved = Interleave::<'_, GenericByteViewArray<T>>::new(values, indices);
243 let mut views_builder = BufferBuilder::new(indices.len());
244 let mut buffers = Vec::new();
245
246 let mut buffer_lookup: HashMap<(usize, u32), u32> = HashMap::new();
248 for (array_idx, value_idx) in indices {
249 let array = interleaved.arrays[*array_idx];
250 let raw_view = array.views().get(*value_idx).unwrap();
251 let view_len = *raw_view as u32;
252 if view_len <= 12 {
253 views_builder.append(*raw_view);
254 continue;
255 }
256 let view = ByteView::from(*raw_view);
258 let new_buffer_idx: &mut u32 = buffer_lookup
259 .entry((*array_idx, view.buffer_index))
260 .or_insert_with(|| {
261 buffers.push(array.data_buffers()[view.buffer_index as usize].clone());
262 (buffers.len() - 1) as u32
263 });
264 views_builder.append(view.with_buffer_index(*new_buffer_idx).into());
265 }
266
267 let array = unsafe {
268 GenericByteViewArray::<T>::new_unchecked(views_builder.into(), buffers, interleaved.nulls)
269 };
270 Ok(Arc::new(array))
271}
272
273fn interleave_fallback(
275 values: &[&dyn Array],
276 indices: &[(usize, usize)],
277) -> Result<ArrayRef, ArrowError> {
278 let arrays: Vec<_> = values.iter().map(|x| x.to_data()).collect();
279 let arrays: Vec<_> = arrays.iter().collect();
280 let mut array_data = MutableArrayData::new(arrays, false, indices.len());
281
282 let mut cur_array = indices[0].0;
283 let mut start_row_idx = indices[0].1;
284 let mut end_row_idx = start_row_idx + 1;
285
286 for (array, row) in indices.iter().skip(1).copied() {
287 if array == cur_array && row == end_row_idx {
288 end_row_idx += 1;
290 continue;
291 }
292
293 array_data.extend(cur_array, start_row_idx, end_row_idx);
295
296 cur_array = array;
298 start_row_idx = row;
299 end_row_idx = start_row_idx + 1;
300 }
301
302 array_data.extend(cur_array, start_row_idx, end_row_idx);
304 Ok(make_array(array_data.freeze()))
305}
306
307pub fn interleave_record_batch(
352 record_batches: &[&RecordBatch],
353 indices: &[(usize, usize)],
354) -> Result<RecordBatch, ArrowError> {
355 let schema = record_batches[0].schema();
356 let columns = (0..schema.fields().len())
357 .map(|i| {
358 let column_values: Vec<&dyn Array> = record_batches
359 .iter()
360 .map(|batch| batch.column(i).as_ref())
361 .collect();
362 interleave(&column_values, indices)
363 })
364 .collect::<Result<Vec<_>, _>>()?;
365 RecordBatch::try_new(schema, columns)
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::{Int32Builder, ListBuilder};

    /// Primitive arrays without nulls: values are gathered in index order and
    /// the same row may be selected more than once.
    #[test]
    fn test_primitive() {
        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
        let b = Int32Array::from_iter_values([5, 6, 7]);
        let c = Int32Array::from_iter_values([8, 9, 10]);
        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
        let v = values.as_primitive::<Int32Type>();
        assert_eq!(v.values(), &[4, 4, 10, 8, 6]);
    }

    /// Nulls in one input are carried over to the matching output positions.
    #[test]
    fn test_primitive_nulls() {
        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
        let b = Int32Array::from_iter([Some(1), Some(4), None]);
        let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (0, 2)]).unwrap();
        let v: Vec<_> = values.as_primitive::<Int32Type>().into_iter().collect();
        assert_eq!(&v, &[Some(2), None, None, Some(4), Some(3)])
    }

    /// Empty `indices` produce an empty array that keeps the input data type.
    #[test]
    fn test_primitive_empty() {
        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
        let v = interleave(&[&a], &[]).unwrap();
        assert!(v.is_empty());
        assert_eq!(v.data_type(), &DataType::Int32);
    }

    /// `Utf8` arrays go through the byte-array implementation.
    #[test]
    fn test_strings() {
        let a = StringArray::from_iter_values(["a", "b", "c"]);
        let b = StringArray::from_iter_values(["hello", "world", "foo"]);
        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
        let v = values.as_string::<i32>();
        let values: Vec<_> = v.into_iter().collect();
        assert_eq!(
            &values,
            &[
                Some("c"),
                Some("c"),
                Some("hello"),
                Some("world"),
                Some("b")
            ]
        )
    }

    /// Dictionary arrays: the two calls below end up with differently sized
    /// value dictionaries for the same inputs.
    #[test]
    fn test_interleave_dictionary() {
        let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "c", "a", "b"]);
        let b = DictionaryArray::<Int32Type>::from_iter(["a", "c", "a", "c", "a"]);

        // Five output values matches the two input dictionaries side by side
        // (3 distinct strings in `a` + 2 in `b`) — presumably the path where the
        // dictionaries are not merged.
        let values =
            interleave(&[&a, &b], &[(0, 2), (0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
        let v = values.as_dictionary::<Int32Type>();
        assert_eq!(v.values().len(), 5);

        let vc = v.downcast_dict::<StringArray>().unwrap();
        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
        assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);

        // Every selected row is "c", and the output dictionary holds exactly one
        // value — presumably the path where the dictionaries are merged.
        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 1)]).unwrap();
        let v = values.as_dictionary::<Int32Type>();
        assert_eq!(v.values().len(), 1);

        let vc = v.downcast_dict::<StringArray>().unwrap();
        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
        assert_eq!(&collected, &["c", "c", "c"]);
    }

    /// List arrays have no dedicated arm in `interleave`, so this exercises the
    /// catch-all fallback, including null lists and null child values.
    #[test]
    fn test_lists() {
        // [[1, 2], null, [3]]
        let mut a = ListBuilder::new(Int32Builder::new());
        a.values().append_value(1);
        a.values().append_value(2);
        a.append(true);
        a.append(false);
        a.values().append_value(3);
        a.append(true);
        let a = a.finish();

        // [[4], null, [5, 6, null]]
        let mut b = ListBuilder::new(Int32Builder::new());
        b.values().append_value(4);
        b.append(true);
        b.append(false);
        b.values().append_value(5);
        b.values().append_value(6);
        b.values().append_null();
        b.append(true);
        let b = b.finish();

        let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
        let v = values.as_any().downcast_ref::<ListArray>().unwrap();

        // [[3], null, [4], [5, 6, null], null]
        let mut expected = ListBuilder::new(Int32Builder::new());
        expected.values().append_value(3);
        expected.append(true);
        expected.append(false);
        expected.values().append_value(4);
        expected.append(true);
        expected.values().append_value(5);
        expected.values().append_value(6);
        expected.values().append_null();
        expected.append(true);
        expected.append(false);
        let expected = expected.finish();

        assert_eq!(v, &expected);
    }

    /// A dictionary with 100 values of which only a few are referenced, combined
    /// with a dictionary whose keys are all null and whose values are empty.
    #[test]
    fn interleave_sparse_nulls() {
        let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
        let keys = Int32Array::from_iter_values(0..10);
        let dict_a = DictionaryArray::new(keys, Arc::new(values));
        let values = StringArray::new_null(0);
        let keys = Int32Array::new_null(10);
        let dict_b = DictionaryArray::new(keys, Arc::new(values));

        let indices = &[(0, 0), (0, 1), (0, 2), (1, 0)];
        let array = interleave(&[&dict_a, &dict_b], indices).unwrap();

        let expected =
            DictionaryArray::<Int32Type>::from_iter(vec![Some("0"), Some("1"), Some("2"), None]);
        assert_eq!(array.as_ref(), &expected)
    }

    /// String views: the specialised implementation must agree with the fallback
    /// on the values while keeping only the data buffers that are needed.
    #[test]
    fn test_interleave_views() {
        let values = StringArray::from_iter_values([
            "hello",
            "world_long_string_not_inlined",
            "foo",
            "bar",
            "baz",
        ]);
        let view_a = StringViewArray::from(&values);

        let values = StringArray::from_iter_values([
            "test",
            "data",
            "more_long_string_not_inlined",
            "views",
            "here",
        ]);
        let view_b = StringViewArray::from(&values);

        let indices = &[
            (0, 2), // "foo"
            (1, 0), // "test"
            (0, 4), // "baz"
            (1, 3), // "views"
            (0, 1), // "world_long_string_not_inlined"
        ];

        // Specialised implementation: only one selected value is longer than 12
        // bytes, so only one data buffer is expected in the output.
        let values = interleave(&[&view_a, &view_b], indices).unwrap();
        let result = values.as_string_view();
        assert_eq!(result.data_buffers().len(), 1);

        // Fallback implementation: ends up with two data buffers for the same
        // selection.
        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
        let fallback_result = fallback.as_string_view();
        assert_eq!(fallback_result.data_buffers().len(), 2);

        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();

        let fallback_collected: Vec<_> = fallback_result
            .iter()
            .map(|x| x.map(|s| s.to_string()))
            .collect();

        // Both implementations must yield the same logical values.
        assert_eq!(&collected, &fallback_collected);

        assert_eq!(
            &collected,
            &[
                Some("foo".to_string()),
                Some("test".to_string()),
                Some("baz".to_string()),
                Some("views".to_string()),
                Some("world_long_string_not_inlined".to_string()),
            ]
        );
    }

    /// String views with nulls: validity is preserved and matches the fallback.
    #[test]
    fn test_interleave_views_with_nulls() {
        let values = StringArray::from_iter([
            Some("hello"),
            None,
            Some("foo_long_string_not_inlined"),
            Some("bar"),
            None,
        ]);
        let view_a = StringViewArray::from(&values);

        let values = StringArray::from_iter([
            Some("test"),
            Some("data_long_string_not_inlined"),
            None,
            None,
            Some("here"),
        ]);
        let view_b = StringViewArray::from(&values);

        let indices = &[
            (0, 1), // null
            (1, 2), // null
            (0, 2), // "foo_long_string_not_inlined"
            (1, 3), // null
            (0, 4), // null
        ];

        // Specialised implementation: a single long value is selected, so a
        // single data buffer is expected.
        let values = interleave(&[&view_a, &view_b], indices).unwrap();
        let result = values.as_string_view();
        assert_eq!(result.data_buffers().len(), 1);

        // Fallback implementation, used as the reference for the values.
        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
        let fallback_result = fallback.as_string_view();

        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();

        let fallback_collected: Vec<_> = fallback_result
            .iter()
            .map(|x| x.map(|s| s.to_string()))
            .collect();

        assert_eq!(&collected, &fallback_collected);

        assert_eq!(
            &collected,
            &[
                None,
                None,
                Some("foo_long_string_not_inlined".to_string()),
                None,
                None,
            ]
        );
    }

    /// View arrays that each own two data buffers: buffers are added to the
    /// output in order of first use and reused on later references.
    #[test]
    fn test_interleave_views_multiple_buffers() {
        // Array A: two long strings, each stored in its own data buffer.
        let str1 = "very_long_string_from_first_buffer".as_bytes();
        let str2 = "very_long_string_from_second_buffer".as_bytes();
        let buffer1 = str1.to_vec().into();
        let buffer2 = str2.to_vec().into();

        let view1 = ByteView::new(str1.len() as u32, &str1[..4])
            .with_buffer_index(0)
            .with_offset(0)
            .as_u128();
        let view2 = ByteView::new(str2.len() as u32, &str2[..4])
            .with_buffer_index(1)
            .with_offset(0)
            .as_u128();
        let view_a =
            StringViewArray::try_new(vec![view1, view2].into(), vec![buffer1, buffer2], None)
                .unwrap();

        // Array B: same layout with two different long strings.
        let str3 = "another_very_long_string_buffer_three".as_bytes();
        let str4 = "different_long_string_in_buffer_four".as_bytes();
        let buffer3 = str3.to_vec().into();
        let buffer4 = str4.to_vec().into();

        let view3 = ByteView::new(str3.len() as u32, &str3[..4])
            .with_buffer_index(0)
            .with_offset(0)
            .as_u128();
        let view4 = ByteView::new(str4.len() as u32, &str4[..4])
            .with_buffer_index(1)
            .with_offset(0)
            .as_u128();
        let view_b =
            StringViewArray::try_new(vec![view3, view4].into(), vec![buffer3, buffer4], None)
                .unwrap();

        let indices = &[
            (0, 0), // string in the first buffer of array A
            (1, 0), // string in the first buffer of array B
            (0, 1), // string in the second buffer of array A
            (1, 1), // string in the second buffer of array B
            (0, 0), // first buffer of array A again
            (1, 1), // second buffer of array B again
        ];

        let values = interleave(&[&view_a, &view_b], indices).unwrap();
        let result = values.as_string_view();

        assert_eq!(
            result.data_buffers().len(),
            4,
            "Expected four buffers (two from each input array)"
        );

        let result_strings: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
        assert_eq!(
            result_strings,
            vec![
                Some("very_long_string_from_first_buffer".to_string()),
                Some("another_very_long_string_buffer_three".to_string()),
                Some("very_long_string_from_second_buffer".to_string()),
                Some("different_long_string_in_buffer_four".to_string()),
                Some("very_long_string_from_first_buffer".to_string()),
                Some("different_long_string_in_buffer_four".to_string()),
            ]
        );

        // Output buffer indices follow order of first use; repeated selections
        // point at the buffer that was already added.
        let views = result.views();
        let buffer_indices: Vec<_> = views
            .iter()
            .map(|raw_view| ByteView::from(*raw_view).buffer_index)
            .collect();

        assert_eq!(
            buffer_indices,
            vec![
                0, // first buffer of array A
                1, // first buffer of array B
                2, // second buffer of array A
                3, // second buffer of array B
                0, // first buffer of array A (reused)
                3, // second buffer of array B (reused)
            ]
        );
    }
}