1use std::cmp::{self, Ordering};
21use std::task::{ready, Poll};
22use std::{any::Any, sync::Arc};
23
24use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
25use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
26use crate::{
27 DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
28 SendableRecordBatchStream,
29};
30
31use arrow::array::{
32 new_null_array, Array, ArrayRef, AsArray, FixedSizeListArray, Int64Array,
33 LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray,
34};
35use arrow::compute::kernels::length::length;
36use arrow::compute::kernels::zip::zip;
37use arrow::compute::{cast, is_not_null, kernels, sum};
38use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
39use arrow::record_batch::RecordBatch;
40use arrow_ord::cmp::lt;
41use datafusion_common::{
42 exec_datafusion_err, exec_err, internal_err, HashMap, HashSet, Result, UnnestOptions,
43};
44use datafusion_execution::TaskContext;
45use datafusion_physical_expr::EquivalenceProperties;
46
47use async_trait::async_trait;
48use futures::{Stream, StreamExt};
49use log::trace;
50
/// Unnest the given columns (either with type struct or list);
/// `execute` wraps the input stream in an [`UnnestStream`] that does the work.
#[derive(Debug, Clone)]
pub struct UnnestExec {
    /// Input execution plan providing the batches to unnest
    input: Arc<dyn ExecutionPlan>,
    /// The schema once the unnest is applied
    schema: SchemaRef,
    /// Indices (and unnest depths) of the list-typed columns in the input schema
    list_column_indices: Vec<ListUnnest>,
    /// Indices of the struct-typed columns in the input schema to flatten
    struct_column_indices: Vec<usize>,
    /// Options, e.g. whether rows with null/empty lists are preserved
    options: UnnestOptions,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties (equivalences, partitioning, boundedness)
    cache: PlanProperties,
}
74
75impl UnnestExec {
76 pub fn new(
78 input: Arc<dyn ExecutionPlan>,
79 list_column_indices: Vec<ListUnnest>,
80 struct_column_indices: Vec<usize>,
81 schema: SchemaRef,
82 options: UnnestOptions,
83 ) -> Self {
84 let cache = Self::compute_properties(&input, Arc::clone(&schema));
85
86 UnnestExec {
87 input,
88 schema,
89 list_column_indices,
90 struct_column_indices,
91 options,
92 metrics: Default::default(),
93 cache,
94 }
95 }
96
97 fn compute_properties(
99 input: &Arc<dyn ExecutionPlan>,
100 schema: SchemaRef,
101 ) -> PlanProperties {
102 PlanProperties::new(
103 EquivalenceProperties::new(schema),
104 input.output_partitioning().to_owned(),
105 input.pipeline_behavior(),
106 input.boundedness(),
107 )
108 }
109
110 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
112 &self.input
113 }
114
115 pub fn list_column_indices(&self) -> &[ListUnnest] {
117 &self.list_column_indices
118 }
119
120 pub fn struct_column_indices(&self) -> &[usize] {
122 &self.struct_column_indices
123 }
124
125 pub fn options(&self) -> &UnnestOptions {
126 &self.options
127 }
128}
129
130impl DisplayAs for UnnestExec {
131 fn fmt_as(
132 &self,
133 t: DisplayFormatType,
134 f: &mut std::fmt::Formatter,
135 ) -> std::fmt::Result {
136 match t {
137 DisplayFormatType::Default | DisplayFormatType::Verbose => {
138 write!(f, "UnnestExec")
139 }
140 }
141 }
142}
143
144impl ExecutionPlan for UnnestExec {
145 fn name(&self) -> &'static str {
146 "UnnestExec"
147 }
148
149 fn as_any(&self) -> &dyn Any {
150 self
151 }
152
153 fn properties(&self) -> &PlanProperties {
154 &self.cache
155 }
156
157 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
158 vec![&self.input]
159 }
160
161 fn with_new_children(
162 self: Arc<Self>,
163 children: Vec<Arc<dyn ExecutionPlan>>,
164 ) -> Result<Arc<dyn ExecutionPlan>> {
165 Ok(Arc::new(UnnestExec::new(
166 Arc::clone(&children[0]),
167 self.list_column_indices.clone(),
168 self.struct_column_indices.clone(),
169 Arc::clone(&self.schema),
170 self.options.clone(),
171 )))
172 }
173
174 fn required_input_distribution(&self) -> Vec<Distribution> {
175 vec![Distribution::UnspecifiedDistribution]
176 }
177
178 fn execute(
179 &self,
180 partition: usize,
181 context: Arc<TaskContext>,
182 ) -> Result<SendableRecordBatchStream> {
183 let input = self.input.execute(partition, context)?;
184 let metrics = UnnestMetrics::new(partition, &self.metrics);
185
186 Ok(Box::pin(UnnestStream {
187 input,
188 schema: Arc::clone(&self.schema),
189 list_type_columns: self.list_column_indices.clone(),
190 struct_column_indices: self.struct_column_indices.iter().copied().collect(),
191 options: self.options.clone(),
192 metrics,
193 }))
194 }
195
196 fn metrics(&self) -> Option<MetricsSet> {
197 Some(self.metrics.clone_inner())
198 }
199}
200
/// Metrics collected while unnesting one partition.
#[derive(Clone, Debug)]
struct UnnestMetrics {
    /// Total time spent transforming input batches into output batches
    elapsed_compute: metrics::Time,
    /// Number of batches consumed from the input stream
    input_batches: metrics::Count,
    /// Number of rows consumed from the input stream
    input_rows: metrics::Count,
    /// Number of batches produced
    output_batches: metrics::Count,
    /// Number of rows produced
    output_rows: metrics::Count,
}
214
215impl UnnestMetrics {
216 fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
217 let elapsed_compute = MetricBuilder::new(metrics).elapsed_compute(partition);
218
219 let input_batches =
220 MetricBuilder::new(metrics).counter("input_batches", partition);
221
222 let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
223
224 let output_batches =
225 MetricBuilder::new(metrics).counter("output_batches", partition);
226
227 let output_rows = MetricBuilder::new(metrics).output_rows(partition);
228
229 Self {
230 input_batches,
231 input_rows,
232 output_batches,
233 output_rows,
234 elapsed_compute,
235 }
236 }
237}
238
/// A stream that issues [`RecordBatch`]es with unnested columns.
struct UnnestStream {
    /// Input stream producing the batches to unnest
    input: SendableRecordBatchStream,
    /// Unnested (output) schema
    schema: Arc<Schema>,
    /// Which list-typed columns to unnest, and at what depth
    list_type_columns: Vec<ListUnnest>,
    /// Which struct-typed columns to flatten into their fields
    struct_column_indices: HashSet<usize>,
    /// Options, e.g. whether rows with null/empty lists are preserved
    options: UnnestOptions,
    /// Operator metrics
    metrics: UnnestMetrics,
}
255
impl RecordBatchStream for UnnestStream {
    /// The schema of the batches this stream emits (the unnested schema).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
261
// NOTE(review): `#[async_trait]` looks unnecessary here — this impl contains
// no `async fn` for the macro to rewrite. Confirm, then remove it together
// with the `use async_trait::async_trait;` import.
#[async_trait]
impl Stream for UnnestStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Delegate to an inherent method that takes plain `&mut self`.
        self.poll_next_impl(cx)
    }
}
273
impl UnnestStream {
    /// Separate implementation function that unpins the [`UnnestStream`] so
    /// that partial borrows work correctly.
    ///
    /// Polls the input stream, unnests each incoming batch via [`build_batch`],
    /// and skips (rather than emits) batches whose unnest result is empty.
    fn poll_next_impl(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    // Time only the transformation work, not the input poll.
                    let timer = self.metrics.elapsed_compute.timer();
                    self.metrics.input_batches.add(1);
                    self.metrics.input_rows.add(batch.num_rows());
                    let result = build_batch(
                        &batch,
                        &self.schema,
                        &self.list_type_columns,
                        &self.struct_column_indices,
                        &self.options,
                    )?;
                    timer.done();
                    // `None` means the unnest produced zero rows: poll the
                    // next input batch instead of emitting an empty batch.
                    let Some(result_batch) = result else {
                        continue;
                    };
                    self.metrics.output_batches.add(1);
                    self.metrics.output_rows.add(result_batch.num_rows());

                    // Empty batches must never be emitted; build_batch returns
                    // None for them, which was handled above.
                    debug_assert!(result_batch.num_rows() > 0);
                    Some(Ok(result_batch))
                }
                other => {
                    // End of stream (None) or error: log summary metrics.
                    trace!(
                        "Processed {} probe-side input batches containing {} rows and \
                        produced {} output batches containing {} rows in {}",
                        self.metrics.input_batches,
                        self.metrics.input_rows,
                        self.metrics.output_batches,
                        self.metrics.output_rows,
                        self.metrics.elapsed_compute,
                    );
                    other
                }
            });
        }
    }
}
322
323fn flatten_struct_cols(
334 input_batch: &[Arc<dyn Array>],
335 schema: &SchemaRef,
336 struct_column_indices: &HashSet<usize>,
337) -> Result<RecordBatch> {
338 let columns_expanded = input_batch
340 .iter()
341 .enumerate()
342 .map(|(idx, column_data)| match struct_column_indices.get(&idx) {
343 Some(_) => match column_data.data_type() {
344 DataType::Struct(_) => {
345 let struct_arr =
346 column_data.as_any().downcast_ref::<StructArray>().unwrap();
347 Ok(struct_arr.columns().to_vec())
348 }
349 data_type => internal_err!(
350 "expecting column {} from input plan to be a struct, got {:?}",
351 idx,
352 data_type
353 ),
354 },
355 None => Ok(vec![Arc::clone(column_data)]),
356 })
357 .collect::<Result<Vec<_>>>()?
358 .into_iter()
359 .flatten()
360 .collect();
361 Ok(RecordBatch::try_new(Arc::clone(schema), columns_expanded)?)
362}
363
/// One unnest target: which input column to unnest and at which list depth.
/// The same column may appear multiple times with different depths.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct ListUnnest {
    /// Index of the list column in the input schema
    pub index_in_input_schema: usize,
    /// Number of list-nesting levels to unwrap for this column
    pub depth: usize,
}
369
/// Perform one unnest pass at `level_to_unnest`.
///
/// Flattened arrays for each participating [`ListUnnest`] are written into
/// `temp_unnested_arrs`; the returned vector contains the batch's columns
/// repeated (or nulled out) so their row count matches the unnested length.
/// Returns `Ok(None)` when the total unnested length is 0, i.e. the output
/// batch would be empty.
fn list_unnest_at_level(
    batch: &[ArrayRef],
    list_type_unnests: &[ListUnnest],
    temp_unnested_arrs: &mut HashMap<ListUnnest, ArrayRef>,
    level_to_unnest: usize,
    options: &UnnestOptions,
) -> Result<Option<Vec<ArrayRef>>> {
    // Select arrays to unnest at this level: a column whose declared depth
    // equals the current level comes straight from the input batch; a column
    // already unnested at a deeper level comes from the temp results.
    let (arrs_to_unnest, list_unnest_specs): (Vec<Arc<dyn Array>>, Vec<_>) =
        list_type_unnests
            .iter()
            .filter_map(|unnesting| {
                if level_to_unnest == unnesting.depth {
                    return Some((
                        Arc::clone(&batch[unnesting.index_in_input_schema]),
                        *unnesting,
                    ));
                }
                if level_to_unnest < unnesting.depth {
                    return Some((
                        Arc::clone(temp_unnested_arrs.get(unnesting).unwrap()),
                        *unnesting,
                    ));
                }
                None
            })
            .unzip();

    // Per row, the longest list length across every array being unnested;
    // this determines how many output rows each input row expands to.
    let longest_length = find_longest_length(&arrs_to_unnest, options)?;
    let unnested_length = longest_length.as_primitive::<Int64Type>();
    let total_length = if unnested_length.is_empty() {
        0
    } else {
        sum(unnested_length).ok_or_else(|| {
            exec_datafusion_err!("Failed to calculate the total unnested length")
        })? as usize
    };
    if total_length == 0 {
        return Ok(None);
    }

    // Unnest all the list arrays selected for this level.
    let unnested_temp_arrays =
        unnest_list_arrays(arrs_to_unnest.as_ref(), unnested_length, total_length)?;

    // Indices used to repeat the non-unnested columns to the new row count.
    let take_indices = create_take_indices(unnested_length, total_length);
    unnested_temp_arrays
        .into_iter()
        .zip(list_unnest_specs.iter())
        .for_each(|(flatten_arr, unnesting)| {
            temp_unnested_arrs.insert(*unnesting, flatten_arr);
        });

    // Decide per input column whether its values are repeated (true) or
    // replaced by nulls (false): repeat if the column still needs unnesting
    // at a shallower level, or if it is not involved in unnesting at all.
    let repeat_mask: Vec<bool> = batch
        .iter()
        .enumerate()
        .map(|(i, _)| {
            let needed_in_future_levels = list_type_unnests.iter().any(|unnesting| {
                unnesting.index_in_input_schema == i && unnesting.depth < level_to_unnest
            });

            let is_involved_in_unnesting = list_type_unnests
                .iter()
                .any(|unnesting| unnesting.index_in_input_schema == i);

            needed_in_future_levels || !is_involved_in_unnesting
        })
        .collect();

    // Column count is untouched; values are repeated/nulled per the mask so
    // every column has `total_length` rows.
    let ret = repeat_arrs_from_indices(batch, &take_indices, &repeat_mask)?;

    Ok(Some(ret))
}
/// The flattened array produced for one original column at one unnest depth.
struct UnnestingResult {
    /// The unnested (flattened) array
    arr: ArrayRef,
    /// The depth this array was unnested at
    depth: usize,
}
495
/// Apply all configured unnests to `batch`: list columns are unnested level
/// by level (deepest first), then struct columns are flattened, producing a
/// batch with `schema`. Returns `Ok(None)` when the result would be empty.
fn build_batch(
    batch: &RecordBatch,
    schema: &SchemaRef,
    list_type_columns: &[ListUnnest],
    struct_column_indices: &HashSet<usize>,
    options: &UnnestOptions,
) -> Result<Option<RecordBatch>> {
    let transformed = match list_type_columns.len() {
        // No list unnesting requested: only struct flattening applies.
        0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
        _ => {
            // Holds the flattened array for each (column, depth) pair.
            let mut temp_unnested_result = HashMap::new();
            // The deepest requested depth sets the number of unnest passes.
            let max_recursion = list_type_columns
                .iter()
                .fold(0, |highest_depth, ListUnnest { depth, .. }| {
                    cmp::max(highest_depth, *depth)
                });

            // Columns repeated/unnested so far; refreshed each pass.
            let mut flatten_arrs = vec![];

            // Unnest from the deepest level down to level 1; the first pass
            // reads the input batch, later passes read the previous result.
            for depth in (1..=max_recursion).rev() {
                let input = match depth == max_recursion {
                    true => batch.columns(),
                    false => &flatten_arrs,
                };
                let Some(temp_result) = list_unnest_at_level(
                    input,
                    list_type_columns,
                    &mut temp_unnested_result,
                    depth,
                    options,
                )?
                else {
                    // Empty unnest result: the whole batch yields no rows.
                    return Ok(None);
                };
                flatten_arrs = temp_result;
            }
            // Group the per-depth unnested arrays by original column index.
            let unnested_array_map: HashMap<usize, Vec<UnnestingResult>> =
                temp_unnested_result.into_iter().fold(
                    HashMap::new(),
                    |mut acc,
                     (
                        ListUnnest {
                            index_in_input_schema,
                            depth,
                        },
                        flattened_array,
                    )| {
                        acc.entry(index_in_input_schema).or_default().push(
                            UnnestingResult {
                                arr: flattened_array,
                                depth,
                            },
                        );
                        acc
                    },
                );
            // Remember the order in which the caller listed the unnests, so
            // multi-depth outputs of one column keep that order.
            let output_order: HashMap<ListUnnest, usize> = list_type_columns
                .iter()
                .enumerate()
                .map(|(order, unnest_def)| (*unnest_def, order))
                .collect();

            let mut multi_unnested_per_original_index = unnested_array_map
                .into_iter()
                .map(
                    |(original_index, mut unnested_columns)| {
                        // Sort this column's per-depth results by the caller's
                        // declared order.
                        unnested_columns.sort_by(
                            |UnnestingResult { depth: depth1, .. },
                             UnnestingResult { depth: depth2, .. }|
                             -> Ordering {
                                output_order
                                    .get(&ListUnnest {
                                        depth: *depth1,
                                        index_in_input_schema: original_index,
                                    })
                                    .unwrap()
                                    .cmp(
                                        output_order
                                            .get(&ListUnnest {
                                                depth: *depth2,
                                                index_in_input_schema: original_index,
                                            })
                                            .unwrap(),
                                    )
                            },
                        );
                        (
                            original_index,
                            unnested_columns
                                .into_iter()
                                .map(|result| result.arr)
                                .collect::<Vec<_>>(),
                        )
                    },
                )
                .collect::<HashMap<_, _>>();

            // Splice the unnested arrays back into column position; columns
            // not being unnested pass through from the repeated arrays.
            let ret = flatten_arrs
                .into_iter()
                .enumerate()
                .flat_map(|(col_idx, arr)| {
                    match multi_unnested_per_original_index.remove(&col_idx) {
                        Some(unnested_arrays) => unnested_arrays,
                        None => vec![arr],
                    }
                })
                .collect::<Vec<_>>();

            flatten_struct_cols(&ret, schema, struct_column_indices)
        }
    }?;
    Ok(Some(transformed))
}
675
676fn find_longest_length(
699 list_arrays: &[ArrayRef],
700 options: &UnnestOptions,
701) -> Result<ArrayRef> {
702 let null_length = if options.preserve_nulls {
704 Scalar::new(Int64Array::from_value(1, 1))
705 } else {
706 Scalar::new(Int64Array::from_value(0, 1))
707 };
708 let list_lengths: Vec<ArrayRef> = list_arrays
709 .iter()
710 .map(|list_array| {
711 let mut length_array = length(list_array)?;
712 length_array = cast(&length_array, &DataType::Int64)?;
714 length_array =
715 zip(&is_not_null(&length_array)?, &length_array, &null_length)?;
716 Ok(length_array)
717 })
718 .collect::<Result<_>>()?;
719
720 let longest_length = list_lengths.iter().skip(1).try_fold(
721 Arc::clone(&list_lengths[0]),
722 |longest, current| {
723 let is_lt = lt(&longest, ¤t)?;
724 zip(&is_lt, ¤t, &longest)
725 },
726 )?;
727 Ok(longest_length)
728}
729
/// Common view over the supported list kinds (`ListArray`, `LargeListArray`,
/// `FixedSizeListArray`) so the unnest logic can be shared.
trait ListArrayType: Array {
    /// Returns a reference to the values of this list.
    fn values(&self) -> &ArrayRef;

    /// Returns the start and end offset of the values for the given row.
    fn value_offsets(&self, row: usize) -> (i64, i64);
}
738
739impl ListArrayType for ListArray {
740 fn values(&self) -> &ArrayRef {
741 self.values()
742 }
743
744 fn value_offsets(&self, row: usize) -> (i64, i64) {
745 let offsets = self.value_offsets();
746 (offsets[row].into(), offsets[row + 1].into())
747 }
748}
749
750impl ListArrayType for LargeListArray {
751 fn values(&self) -> &ArrayRef {
752 self.values()
753 }
754
755 fn value_offsets(&self, row: usize) -> (i64, i64) {
756 let offsets = self.value_offsets();
757 (offsets[row], offsets[row + 1])
758 }
759}
760
761impl ListArrayType for FixedSizeListArray {
762 fn values(&self) -> &ArrayRef {
763 self.values()
764 }
765
766 fn value_offsets(&self, row: usize) -> (i64, i64) {
767 let start = self.value_offset(row) as i64;
768 (start, start + self.value_length() as i64)
769 }
770}
771
772fn unnest_list_arrays(
774 list_arrays: &[ArrayRef],
775 length_array: &PrimitiveArray<Int64Type>,
776 capacity: usize,
777) -> Result<Vec<ArrayRef>> {
778 let typed_arrays = list_arrays
779 .iter()
780 .map(|list_array| match list_array.data_type() {
781 DataType::List(_) => Ok(list_array.as_list::<i32>() as &dyn ListArrayType),
782 DataType::LargeList(_) => {
783 Ok(list_array.as_list::<i64>() as &dyn ListArrayType)
784 }
785 DataType::FixedSizeList(_, _) => {
786 Ok(list_array.as_fixed_size_list() as &dyn ListArrayType)
787 }
788 other => exec_err!("Invalid unnest datatype {other }"),
789 })
790 .collect::<Result<Vec<_>>>()?;
791
792 typed_arrays
793 .iter()
794 .map(|list_array| unnest_list_array(*list_array, length_array, capacity))
795 .collect::<Result<_>>()
796}
797
798fn unnest_list_array(
820 list_array: &dyn ListArrayType,
821 length_array: &PrimitiveArray<Int64Type>,
822 capacity: usize,
823) -> Result<ArrayRef> {
824 let values = list_array.values();
825 let mut take_indices_builder = PrimitiveArray::<Int64Type>::builder(capacity);
826 for row in 0..list_array.len() {
827 let mut value_length = 0;
828 if !list_array.is_null(row) {
829 let (start, end) = list_array.value_offsets(row);
830 value_length = end - start;
831 for i in start..end {
832 take_indices_builder.append_value(i)
833 }
834 }
835 let target_length = length_array.value(row);
836 debug_assert!(
837 value_length <= target_length,
838 "value length is beyond the longest length"
839 );
840 for _ in value_length..target_length {
842 take_indices_builder.append_null();
843 }
844 }
845 Ok(kernels::take::take(
846 &values,
847 &take_indices_builder.finish(),
848 None,
849 )?)
850}
851
852fn create_take_indices(
868 length_array: &PrimitiveArray<Int64Type>,
869 capacity: usize,
870) -> PrimitiveArray<Int64Type> {
871 debug_assert!(
873 length_array.null_count() == 0,
874 "length array should not contain nulls"
875 );
876 let mut builder = PrimitiveArray::<Int64Type>::builder(capacity);
877 for (index, repeat) in length_array.iter().enumerate() {
878 let repeat = repeat.unwrap();
880 (0..repeat).for_each(|_| builder.append_value(index as i64));
881 }
882 builder.finish()
883}
884
885fn repeat_arrs_from_indices(
933 batch: &[ArrayRef],
934 indices: &PrimitiveArray<Int64Type>,
935 repeat_mask: &[bool],
936) -> Result<Vec<Arc<dyn Array>>> {
937 batch
938 .iter()
939 .zip(repeat_mask.iter())
940 .map(|(arr, &repeat)| {
941 if repeat {
942 Ok(kernels::take::take(arr, indices, None)?)
943 } else {
944 Ok(new_null_array(arr.data_type(), arr.len()))
945 }
946 })
947 .collect()
948}
949
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        GenericListArray, NullBufferBuilder, OffsetSizeTrait, StringArray,
    };
    use arrow::buffer::{NullBuffer, OffsetBuffer};
    use arrow::datatypes::{Field, Int32Type};
    use datafusion_common::assert_batches_eq;

    // Create a list array of 6 rows:
    //   [A, B, C], [], NULL, [D], NULL, [NULL, F]
    fn make_generic_array<OffsetSize>() -> GenericListArray<OffsetSize>
    where
        OffsetSize: OffsetSizeTrait,
    {
        let mut values = vec![];
        let mut offsets: Vec<OffsetSize> = vec![OffsetSize::zero()];
        let mut valid = NullBufferBuilder::new(6);

        // [A, B, C]
        values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // [] (empty, non-null row)
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // NULL row; "?" is a dummy value hidden behind the null slot
        values.push(Some("?"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [D]
        values.push(Some("D"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // NULL row (no underlying values)
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [NULL, F]
        values.extend_from_slice(&[None, Some("F")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        GenericListArray::<OffsetSize>::new(
            field,
            OffsetBuffer::new(offsets.into()),
            Arc::new(StringArray::from(values)),
            valid.finish(),
        )
    }

    // Create a fixed-size (2) list array of 6 rows:
    //   [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
    fn make_fixed_list() -> FixedSizeListArray {
        let values = Arc::new(StringArray::from_iter([
            Some("A"),
            Some("B"),
            None,
            None,
            Some("C"),
            Some("D"),
            None,
            None,
            None,
            Some("F"),
            None,
            None,
        ]));
        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        let valid = NullBuffer::from(vec![true, false, true, false, true, true]);
        FixedSizeListArray::new(field, 2, values, Some(valid))
    }

    // Unnest `list_array` with the given per-row target lengths and compare
    // the flattened string values against `expected`.
    fn verify_unnest_list_array(
        list_array: &dyn ListArrayType,
        lengths: Vec<i64>,
        expected: Vec<Option<&str>>,
    ) -> Result<()> {
        let length_array = Int64Array::from(lengths);
        let unnested_array = unnest_list_array(list_array, &length_array, 3 * 6)?;
        let strs = unnested_array.as_string::<i32>().iter().collect::<Vec<_>>();
        assert_eq!(strs, expected);
        Ok(())
    }

    #[test]
    fn test_build_batch_list_arr_recursive() -> Result<()> {
        // col1: a list-of-list column unnested at depths 1 and 2;
        // col2: a plain list column unnested at depth 1.
        let list_arr1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3)]),
            None,
            Some(vec![Some(4), Some(5)]),
            Some(vec![Some(7), Some(8), Some(9), Some(10)]),
            None,
            Some(vec![Some(11), Some(12), Some(13)]),
        ]);

        let list_arr1_ref = Arc::new(list_arr1) as ArrayRef;
        let offsets = OffsetBuffer::from_lengths([3, 3, 0]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_non_null();
        nulls.append_non_null();
        nulls.append_null();
        let col1_field = Field::new_list_field(
            DataType::List(Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            ))),
            true,
        );
        let col1 = ListArray::new(
            Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            )),
            offsets,
            list_arr1_ref,
            nulls.finish(),
        );

        let list_arr2 = StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
            Some("d"),
            Some("e"),
        ]);

        let offsets = OffsetBuffer::from_lengths([2, 2, 1]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_n_non_nulls(3);
        let col2_field = Field::new(
            "col2",
            DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
            true,
        );
        let col2 = GenericListArray::<i32>::new(
            Arc::new(Field::new_list_field(DataType::Utf8, true)),
            OffsetBuffer::new(offsets.into()),
            Arc::new(list_arr2),
            nulls.finish(),
        );
        let schema = Arc::new(Schema::new(vec![col1_field, col2_field]));
        // Expected output schema after unnesting col1 twice and col2 once.
        let out_schema = Arc::new(Schema::new(vec![
            Field::new(
                "col1_unnest_placeholder_depth_1",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new("col1_unnest_placeholder_depth_2", DataType::Int32, true),
            Field::new("col2_unnest_placeholder_depth_1", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(col1) as ArrayRef, Arc::new(col2) as ArrayRef],
        )
        .unwrap();
        let list_type_columns = vec![
            ListUnnest {
                index_in_input_schema: 0,
                depth: 1,
            },
            ListUnnest {
                index_in_input_schema: 0,
                depth: 2,
            },
            ListUnnest {
                index_in_input_schema: 1,
                depth: 1,
            },
        ];
        let ret = build_batch(
            &batch,
            &out_schema,
            list_type_columns.as_ref(),
            &HashSet::default(),
            &UnnestOptions {
                preserve_nulls: true,
                recursions: vec![],
            },
        )?
        .unwrap();

        let expected = &[
"+---------------------------------+---------------------------------+---------------------------------+",
"| col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 |",
"+---------------------------------+---------------------------------+---------------------------------+",
"| [1, 2, 3]                       | 1                               | a                               |",
"|                                 | 2                               | b                               |",
"| [4, 5]                          | 3                               |                                 |",
"| [1, 2, 3]                       |                                 | a                               |",
"|                                 |                                 | b                               |",
"| [4, 5]                          |                                 |                                 |",
"| [1, 2, 3]                       | 4                               | a                               |",
"|                                 | 5                               | b                               |",
"| [4, 5]                          |                                 |                                 |",
"| [7, 8, 9, 10]                   | 7                               | c                               |",
"|                                 | 8                               | d                               |",
"| [11, 12, 13]                    | 9                               |                                 |",
"|                                 | 10                              |                                 |",
"| [7, 8, 9, 10]                   |                                 | c                               |",
"|                                 |                                 | d                               |",
"| [11, 12, 13]                    |                                 |                                 |",
"| [7, 8, 9, 10]                   | 11                              | c                               |",
"|                                 | 12                              | d                               |",
"| [11, 12, 13]                    | 13                              |                                 |",
"|                                 |                                 | e                               |",
"+---------------------------------+---------------------------------+---------------------------------+",
        ];
        assert_batches_eq!(expected, &[ret]);
        Ok(())
    }

    #[test]
    fn test_unnest_list_array() -> Result<()> {
        // Variable-length list (i32 offsets).
        let list_array = make_generic_array::<i32>();
        verify_unnest_list_array(
            &list_array,
            vec![3, 2, 1, 2, 0, 3],
            vec![
                Some("A"),
                Some("B"),
                Some("C"),
                None,
                None,
                None,
                Some("D"),
                None,
                None,
                Some("F"),
                None,
            ],
        )?;

        // Fixed-size list.
        let list_array = make_fixed_list();
        verify_unnest_list_array(
            &list_array,
            vec![3, 1, 2, 0, 2, 3],
            vec![
                Some("A"),
                Some("B"),
                None,
                None,
                Some("C"),
                Some("D"),
                None,
                Some("F"),
                None,
                None,
                None,
            ],
        )?;

        Ok(())
    }

    // Compute the per-row longest length across `list_arrays` and compare it
    // against `expected`.
    fn verify_longest_length(
        list_arrays: &[ArrayRef],
        preserve_nulls: bool,
        expected: Vec<i64>,
    ) -> Result<()> {
        let options = UnnestOptions {
            preserve_nulls,
            recursions: vec![],
        };
        let longest_length = find_longest_length(list_arrays, &options)?;
        let expected_array = Int64Array::from(expected);
        assert_eq!(
            longest_length
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap(),
            &expected_array
        );
        Ok(())
    }

    #[test]
    fn test_longest_list_length() -> Result<()> {
        // Single list with i32 offsets; with and without null preservation.
        let list_array = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Same values with i64 (large list) offsets.
        let list_array = Arc::new(make_generic_array::<i64>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Fixed-size list.
        let list_array = Arc::new(make_fixed_list()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![2, 0, 2, 0, 2, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![2, 1, 2, 1, 2, 2])?;

        // Two arrays combined: the result is the row-wise maximum.
        let list1 = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        let list2 = Arc::new(make_fixed_list()) as ArrayRef;
        let list_arrays = vec![Arc::clone(&list1), Arc::clone(&list2)];
        verify_longest_length(&list_arrays, false, vec![3, 0, 2, 1, 2, 2])?;
        verify_longest_length(&list_arrays, true, vec![3, 1, 2, 1, 2, 2])?;

        Ok(())
    }

    #[test]
    fn test_create_take_indices() -> Result<()> {
        let length_array = Int64Array::from(vec![2, 3, 1]);
        let take_indices = create_take_indices(&length_array, 6);
        let expected = Int64Array::from(vec![0, 0, 1, 1, 1, 2]);
        assert_eq!(take_indices, expected);
        Ok(())
    }
}
1281}