datafusion_physical_plan/
unnest.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Define a plan for unnesting values in columns that contain a list type.
19
20use std::cmp::{self, Ordering};
21use std::task::{ready, Poll};
22use std::{any::Any, sync::Arc};
23
24use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
25use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
26use crate::{
27    DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
28    SendableRecordBatchStream,
29};
30
31use arrow::array::{
32    new_null_array, Array, ArrayRef, AsArray, FixedSizeListArray, Int64Array,
33    LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray,
34};
35use arrow::compute::kernels::length::length;
36use arrow::compute::kernels::zip::zip;
37use arrow::compute::{cast, is_not_null, kernels, sum};
38use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
39use arrow::record_batch::RecordBatch;
40use arrow_ord::cmp::lt;
41use datafusion_common::{
42    exec_datafusion_err, exec_err, internal_err, HashMap, HashSet, Result, UnnestOptions,
43};
44use datafusion_execution::TaskContext;
45use datafusion_physical_expr::EquivalenceProperties;
46
47use async_trait::async_trait;
48use futures::{Stream, StreamExt};
49use log::trace;
50
/// Unnest the given columns (either with type struct or list).
///
/// - For list unnesting, each row is vertically transformed into multiple rows.
/// - For struct unnesting, each column is horizontally transformed into multiple columns.
///
/// Thus the original RecordBatch with dimension (n x m) may have a new dimension (n' x m').
///
/// See [`UnnestOptions`] for more details and an example.
#[derive(Debug, Clone)]
pub struct UnnestExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// The schema once the unnest is applied
    schema: SchemaRef,
    /// List-typed columns to unnest; each [`ListUnnest`] pairs a column index
    /// in the input schema with the depth to unnest it to
    list_column_indices: Vec<ListUnnest>,
    /// Indices of the struct-typed columns in the input schema
    struct_column_indices: Vec<usize>,
    /// Options
    options: UnnestOptions,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: PlanProperties,
}
74
75impl UnnestExec {
76    /// Create a new [UnnestExec].
77    pub fn new(
78        input: Arc<dyn ExecutionPlan>,
79        list_column_indices: Vec<ListUnnest>,
80        struct_column_indices: Vec<usize>,
81        schema: SchemaRef,
82        options: UnnestOptions,
83    ) -> Self {
84        let cache = Self::compute_properties(&input, Arc::clone(&schema));
85
86        UnnestExec {
87            input,
88            schema,
89            list_column_indices,
90            struct_column_indices,
91            options,
92            metrics: Default::default(),
93            cache,
94        }
95    }
96
97    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
98    fn compute_properties(
99        input: &Arc<dyn ExecutionPlan>,
100        schema: SchemaRef,
101    ) -> PlanProperties {
102        PlanProperties::new(
103            EquivalenceProperties::new(schema),
104            input.output_partitioning().to_owned(),
105            input.pipeline_behavior(),
106            input.boundedness(),
107        )
108    }
109
110    /// Input execution plan
111    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
112        &self.input
113    }
114
115    /// Indices of the list-typed columns in the input schema
116    pub fn list_column_indices(&self) -> &[ListUnnest] {
117        &self.list_column_indices
118    }
119
120    /// Indices of the struct-typed columns in the input schema
121    pub fn struct_column_indices(&self) -> &[usize] {
122        &self.struct_column_indices
123    }
124
125    pub fn options(&self) -> &UnnestOptions {
126        &self.options
127    }
128}
129
130impl DisplayAs for UnnestExec {
131    fn fmt_as(
132        &self,
133        t: DisplayFormatType,
134        f: &mut std::fmt::Formatter,
135    ) -> std::fmt::Result {
136        match t {
137            DisplayFormatType::Default | DisplayFormatType::Verbose => {
138                write!(f, "UnnestExec")
139            }
140        }
141    }
142}
143
144impl ExecutionPlan for UnnestExec {
145    fn name(&self) -> &'static str {
146        "UnnestExec"
147    }
148
149    fn as_any(&self) -> &dyn Any {
150        self
151    }
152
153    fn properties(&self) -> &PlanProperties {
154        &self.cache
155    }
156
157    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
158        vec![&self.input]
159    }
160
161    fn with_new_children(
162        self: Arc<Self>,
163        children: Vec<Arc<dyn ExecutionPlan>>,
164    ) -> Result<Arc<dyn ExecutionPlan>> {
165        Ok(Arc::new(UnnestExec::new(
166            Arc::clone(&children[0]),
167            self.list_column_indices.clone(),
168            self.struct_column_indices.clone(),
169            Arc::clone(&self.schema),
170            self.options.clone(),
171        )))
172    }
173
174    fn required_input_distribution(&self) -> Vec<Distribution> {
175        vec![Distribution::UnspecifiedDistribution]
176    }
177
178    fn execute(
179        &self,
180        partition: usize,
181        context: Arc<TaskContext>,
182    ) -> Result<SendableRecordBatchStream> {
183        let input = self.input.execute(partition, context)?;
184        let metrics = UnnestMetrics::new(partition, &self.metrics);
185
186        Ok(Box::pin(UnnestStream {
187            input,
188            schema: Arc::clone(&self.schema),
189            list_type_columns: self.list_column_indices.clone(),
190            struct_column_indices: self.struct_column_indices.iter().copied().collect(),
191            options: self.options.clone(),
192            metrics,
193        }))
194    }
195
196    fn metrics(&self) -> Option<MetricsSet> {
197        Some(self.metrics.clone_inner())
198    }
199}
200
/// Metrics recorded by an [`UnnestStream`] for a single partition.
#[derive(Clone, Debug)]
struct UnnestMetrics {
    /// Total time for column unnesting
    elapsed_compute: metrics::Time,
    /// Number of batches consumed
    input_batches: metrics::Count,
    /// Number of rows consumed
    input_rows: metrics::Count,
    /// Number of batches produced
    output_batches: metrics::Count,
    /// Number of rows produced by this operator
    output_rows: metrics::Count,
}
214
215impl UnnestMetrics {
216    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
217        let elapsed_compute = MetricBuilder::new(metrics).elapsed_compute(partition);
218
219        let input_batches =
220            MetricBuilder::new(metrics).counter("input_batches", partition);
221
222        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
223
224        let output_batches =
225            MetricBuilder::new(metrics).counter("output_batches", partition);
226
227        let output_rows = MetricBuilder::new(metrics).output_rows(partition);
228
229        Self {
230            input_batches,
231            input_rows,
232            output_batches,
233            output_rows,
234            elapsed_compute,
235        }
236    }
237}
238
/// A stream that issues [RecordBatch]es with unnested column data.
struct UnnestStream {
    /// Input stream
    input: SendableRecordBatchStream,
    /// Unnested schema
    schema: Arc<Schema>,
    /// Represents all unnest operations to be applied to the input (input index, depth),
    /// e.g. for unnest(col1), unnest(unnest(col1)) where col1 has index 1 in the original
    /// input schema, list_type_columns = [ListUnnest{1,1}, ListUnnest{1,2}]
    list_type_columns: Vec<ListUnnest>,
    /// Indices of the struct-typed columns to flatten into their subfields
    struct_column_indices: HashSet<usize>,
    /// Options
    options: UnnestOptions,
    /// Metrics
    metrics: UnnestMetrics,
}
255
256impl RecordBatchStream for UnnestStream {
257    fn schema(&self) -> SchemaRef {
258        Arc::clone(&self.schema)
259    }
260}
261
262#[async_trait]
263impl Stream for UnnestStream {
264    type Item = Result<RecordBatch>;
265
266    fn poll_next(
267        mut self: std::pin::Pin<&mut Self>,
268        cx: &mut std::task::Context<'_>,
269    ) -> Poll<Option<Self::Item>> {
270        self.poll_next_impl(cx)
271    }
272}
273
274impl UnnestStream {
275    /// Separate implementation function that unpins the [`UnnestStream`] so
276    /// that partial borrows work correctly
277    fn poll_next_impl(
278        &mut self,
279        cx: &mut std::task::Context<'_>,
280    ) -> Poll<Option<Result<RecordBatch>>> {
281        loop {
282            return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
283                Some(Ok(batch)) => {
284                    let timer = self.metrics.elapsed_compute.timer();
285                    self.metrics.input_batches.add(1);
286                    self.metrics.input_rows.add(batch.num_rows());
287                    let result = build_batch(
288                        &batch,
289                        &self.schema,
290                        &self.list_type_columns,
291                        &self.struct_column_indices,
292                        &self.options,
293                    )?;
294                    timer.done();
295                    let Some(result_batch) = result else {
296                        continue;
297                    };
298                    self.metrics.output_batches.add(1);
299                    self.metrics.output_rows.add(result_batch.num_rows());
300
301                    // Empty record batches should not be emitted.
302                    // They need to be treated as  [`Option<RecordBatch>`]es and handled separately
303                    debug_assert!(result_batch.num_rows() > 0);
304                    Some(Ok(result_batch))
305                }
306                other => {
307                    trace!(
308                        "Processed {} probe-side input batches containing {} rows and \
309                        produced {} output batches containing {} rows in {}",
310                        self.metrics.input_batches,
311                        self.metrics.input_rows,
312                        self.metrics.output_batches,
313                        self.metrics.output_rows,
314                        self.metrics.elapsed_compute,
315                    );
316                    other
317                }
318            });
319        }
320    }
321}
322
323/// Given a set of struct column indices to flatten
324/// try converting the column in input into multiple subfield columns
325/// For example
326/// struct_col: [a: struct(item: int, name: string), b: int]
327/// with a batch
328/// {a: {item: 1, name: "a"}, b: 2},
329/// {a: {item: 3, name: "b"}, b: 4]
330/// will be converted into
331/// {a.item: 1, a.name: "a", b: 2},
332/// {a.item: 3, a.name: "b", b: 4}
333fn flatten_struct_cols(
334    input_batch: &[Arc<dyn Array>],
335    schema: &SchemaRef,
336    struct_column_indices: &HashSet<usize>,
337) -> Result<RecordBatch> {
338    // horizontal expansion because of struct unnest
339    let columns_expanded = input_batch
340        .iter()
341        .enumerate()
342        .map(|(idx, column_data)| match struct_column_indices.get(&idx) {
343            Some(_) => match column_data.data_type() {
344                DataType::Struct(_) => {
345                    let struct_arr =
346                        column_data.as_any().downcast_ref::<StructArray>().unwrap();
347                    Ok(struct_arr.columns().to_vec())
348                }
349                data_type => internal_err!(
350                    "expecting column {} from input plan to be a struct, got {:?}",
351                    idx,
352                    data_type
353                ),
354            },
355            None => Ok(vec![Arc::clone(column_data)]),
356        })
357        .collect::<Result<Vec<_>>>()?
358        .into_iter()
359        .flatten()
360        .collect();
361    Ok(RecordBatch::try_new(Arc::clone(schema), columns_expanded)?)
362}
363
/// A single list-unnest operation: which input column to unnest and to what depth.
///
/// For example, `unnest(col1)` and `unnest(unnest(col1))`, where `col1` has index 1
/// in the input schema, are represented as `ListUnnest { 1, 1 }` and
/// `ListUnnest { 1, 2 }` respectively.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct ListUnnest {
    /// Index of the list column in the input schema
    pub index_in_input_schema: usize,
    /// Number of list levels to unnest
    pub depth: usize,
}
369
370/// This function is used to execute the unnesting on multiple columns all at once, but
371/// one level at a time, and is called n times, where n is the highest recursion level among
372/// the unnest exprs in the query.
373///
374/// For example giving the following query:
375/// ```sql
376/// select unnest(colA, max_depth:=3) as P1, unnest(colA,max_depth:=2) as P2, unnest(colB, max_depth:=1) as P3 from temp;
377/// ```
378/// Then the total times this function being called is 3
379///
380/// It needs to be aware of which level the current unnesting is, because if there exists
381/// multiple unnesting on the same column, but with different recursion levels, say
382/// **unnest(colA, max_depth:=3)** and **unnest(colA, max_depth:=2)**, then the unnesting
383/// of expr **unnest(colA, max_depth:=3)** will start at level 3, while unnesting for expr
384/// **unnest(colA, max_depth:=2)** has to start at level 2
385///
/// Let *colA* be a 3-dimensional column and *colB* be an array (1-dimensional). As stated,
/// this function is called in descending order of recursion depth
388///
389/// Depth = 3
390/// - colA(3-dimension) unnest into temp column temp_P1(2_dimension) (unnesting of P1 starts
391///   from this level)
392/// - colA(3-dimension) having indices repeated by the unnesting operation above
393/// - colB(1-dimension) having indices repeated by the unnesting operation above
394///
395/// Depth = 2
396/// - temp_P1(2-dimension) unnest into temp column temp_P1(1-dimension)
397/// - colA(3-dimension) unnest into temp column temp_P2(2-dimension) (unnesting of P2 starts
398///   from this level)
399/// - colB(1-dimension) having indices repeated by the unnesting operation above
400///
401/// Depth = 1
402/// - temp_P1(1-dimension) unnest into P1
403/// - temp_P2(2-dimension) unnest into P2
404/// - colB(1-dimension) unnest into P3 (unnesting of P3 starts from this level)
405///
/// The returned arrays have the same column count as the input batch
/// and only contain the original columns that are not being unnested.
408fn list_unnest_at_level(
409    batch: &[ArrayRef],
410    list_type_unnests: &[ListUnnest],
411    temp_unnested_arrs: &mut HashMap<ListUnnest, ArrayRef>,
412    level_to_unnest: usize,
413    options: &UnnestOptions,
414) -> Result<Option<Vec<ArrayRef>>> {
415    // Extract unnestable columns at this level
416    let (arrs_to_unnest, list_unnest_specs): (Vec<Arc<dyn Array>>, Vec<_>) =
417        list_type_unnests
418            .iter()
419            .filter_map(|unnesting| {
420                if level_to_unnest == unnesting.depth {
421                    return Some((
422                        Arc::clone(&batch[unnesting.index_in_input_schema]),
423                        *unnesting,
424                    ));
425                }
426                // This means the unnesting on this item has started at higher level
427                // and need to continue until depth reaches 1
428                if level_to_unnest < unnesting.depth {
429                    return Some((
430                        Arc::clone(temp_unnested_arrs.get(unnesting).unwrap()),
431                        *unnesting,
432                    ));
433                }
434                None
435            })
436            .unzip();
437
438    // Filter out so that list_arrays only contain column with the highest depth
439    // at the same time, during iteration remove this depth so next time we don't have to unnest them again
440    let longest_length = find_longest_length(&arrs_to_unnest, options)?;
441    let unnested_length = longest_length.as_primitive::<Int64Type>();
442    let total_length = if unnested_length.is_empty() {
443        0
444    } else {
445        sum(unnested_length).ok_or_else(|| {
446            exec_datafusion_err!("Failed to calculate the total unnested length")
447        })? as usize
448    };
449    if total_length == 0 {
450        return Ok(None);
451    }
452
453    // Unnest all the list arrays
454    let unnested_temp_arrays =
455        unnest_list_arrays(arrs_to_unnest.as_ref(), unnested_length, total_length)?;
456
457    // Create the take indices array for other columns
458    let take_indices = create_take_indices(unnested_length, total_length);
459    unnested_temp_arrays
460        .into_iter()
461        .zip(list_unnest_specs.iter())
462        .for_each(|(flatten_arr, unnesting)| {
463            temp_unnested_arrs.insert(*unnesting, flatten_arr);
464        });
465
466    let repeat_mask: Vec<bool> = batch
467        .iter()
468        .enumerate()
469        .map(|(i, _)| {
470            // Check if the column is needed in future levels (levels below the current one)
471            let needed_in_future_levels = list_type_unnests.iter().any(|unnesting| {
472                unnesting.index_in_input_schema == i && unnesting.depth < level_to_unnest
473            });
474
475            // Check if the column is involved in unnesting at any level
476            let is_involved_in_unnesting = list_type_unnests
477                .iter()
478                .any(|unnesting| unnesting.index_in_input_schema == i);
479
480            // Repeat columns needed in future levels or not unnested.
481            needed_in_future_levels || !is_involved_in_unnesting
482        })
483        .collect();
484
485    // Dimension of arrays in batch is untouched, but the values are repeated
486    // as the side effect of unnesting
487    let ret = repeat_arrs_from_indices(batch, &take_indices, &repeat_mask)?;
488
489    Ok(Some(ret))
490}
/// The outcome of one list-unnest operation, used by `build_batch` to group
/// results per input column.
struct UnnestingResult {
    /// The unnested array
    arr: ArrayRef,
    /// Depth of the [`ListUnnest`] that produced `arr`
    depth: usize,
}
495
496/// For each row in a `RecordBatch`, some list/struct columns need to be unnested.
497/// - For list columns: We will expand the values in each list into multiple rows,
498///   taking the longest length among these lists, and shorter lists are padded with NULLs.
499/// - For struct columns: We will expand the struct columns into multiple subfield columns.
500///
501/// For columns that don't need to be unnested, repeat their values until reaching the longest length.
502///
503/// Note: unnest has a big difference in behavior between Postgres and DuckDB
504///
505/// Take this example
506///
507/// 1. Postgres
508/// ```ignored
509/// create table temp (
510///     i integer[][][], j integer[]
511/// )
512/// insert into temp values ('{{{1,2},{3,4}},{{5,6},{7,8}}}', '{1,2}');
513/// select unnest(i), unnest(j) from temp;
514/// ```
515///
516/// Result
517/// ```text
518///     1   1
519///     2   2
520///     3
521///     4
522///     5
523///     6
524///     7
525///     8
526/// ```
527/// 2. DuckDB
528/// ```ignore
529///     create table temp (i integer[][][], j integer[]);
530///     insert into temp values ([[[1,2],[3,4]],[[5,6],[7,8]]], [1,2]);
531///     select unnest(i,recursive:=true), unnest(j,recursive:=true) from temp;
532/// ```
533/// Result:
534/// ```text
535///
536///     ┌────────────────────────────────────────────────┬────────────────────────────────────────────────┐
537///     │ unnest(i, "recursive" := CAST('t' AS BOOLEAN)) │ unnest(j, "recursive" := CAST('t' AS BOOLEAN)) │
538///     │                     int32                      │                     int32                      │
539///     ├────────────────────────────────────────────────┼────────────────────────────────────────────────┤
540///     │                                              1 │                                              1 │
541///     │                                              2 │                                              2 │
542///     │                                              3 │                                              1 │
543///     │                                              4 │                                              2 │
544///     │                                              5 │                                              1 │
545///     │                                              6 │                                              2 │
546///     │                                              7 │                                              1 │
547///     │                                              8 │                                              2 │
548///     └────────────────────────────────────────────────┴────────────────────────────────────────────────┘
549/// ```
550///
/// The following implementation follows DuckDB's behavior.
552fn build_batch(
553    batch: &RecordBatch,
554    schema: &SchemaRef,
555    list_type_columns: &[ListUnnest],
556    struct_column_indices: &HashSet<usize>,
557    options: &UnnestOptions,
558) -> Result<Option<RecordBatch>> {
559    let transformed = match list_type_columns.len() {
560        0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
561        _ => {
562            let mut temp_unnested_result = HashMap::new();
563            let max_recursion = list_type_columns
564                .iter()
565                .fold(0, |highest_depth, ListUnnest { depth, .. }| {
566                    cmp::max(highest_depth, *depth)
567                });
568
569            // This arr always has the same column count with the input batch
570            let mut flatten_arrs = vec![];
571
572            // Original batch has the same columns
573            // All unnesting results are written to temp_batch
574            for depth in (1..=max_recursion).rev() {
575                let input = match depth == max_recursion {
576                    true => batch.columns(),
577                    false => &flatten_arrs,
578                };
579                let Some(temp_result) = list_unnest_at_level(
580                    input,
581                    list_type_columns,
582                    &mut temp_unnested_result,
583                    depth,
584                    options,
585                )?
586                else {
587                    return Ok(None);
588                };
589                flatten_arrs = temp_result;
590            }
591            let unnested_array_map: HashMap<usize, Vec<UnnestingResult>> =
592                temp_unnested_result.into_iter().fold(
593                    HashMap::new(),
594                    |mut acc,
595                     (
596                        ListUnnest {
597                            index_in_input_schema,
598                            depth,
599                        },
600                        flattened_array,
601                    )| {
602                        acc.entry(index_in_input_schema).or_default().push(
603                            UnnestingResult {
604                                arr: flattened_array,
605                                depth,
606                            },
607                        );
608                        acc
609                    },
610                );
611            let output_order: HashMap<ListUnnest, usize> = list_type_columns
612                .iter()
613                .enumerate()
614                .map(|(order, unnest_def)| (*unnest_def, order))
615                .collect();
616
617            // One original column may be unnested multiple times into separate columns
618            let mut multi_unnested_per_original_index = unnested_array_map
619                .into_iter()
620                .map(
621                    // Each item in unnested_columns is the result of unnesting the same input column
622                    // we need to sort them to conform with the original expression order
623                    // e.g unnest(unnest(col)) must goes before unnest(col)
624                    |(original_index, mut unnested_columns)| {
625                        unnested_columns.sort_by(
626                            |UnnestingResult { depth: depth1, .. },
627                             UnnestingResult { depth: depth2, .. }|
628                             -> Ordering {
629                                output_order
630                                    .get(&ListUnnest {
631                                        depth: *depth1,
632                                        index_in_input_schema: original_index,
633                                    })
634                                    .unwrap()
635                                    .cmp(
636                                        output_order
637                                            .get(&ListUnnest {
638                                                depth: *depth2,
639                                                index_in_input_schema: original_index,
640                                            })
641                                            .unwrap(),
642                                    )
643                            },
644                        );
645                        (
646                            original_index,
647                            unnested_columns
648                                .into_iter()
649                                .map(|result| result.arr)
650                                .collect::<Vec<_>>(),
651                        )
652                    },
653                )
654                .collect::<HashMap<_, _>>();
655
656            let ret = flatten_arrs
657                .into_iter()
658                .enumerate()
659                .flat_map(|(col_idx, arr)| {
660                    // Convert original column into its unnested version(s)
661                    // Plural because one column can be unnested with different recursion level
662                    // and into separate output columns
663                    match multi_unnested_per_original_index.remove(&col_idx) {
664                        Some(unnested_arrays) => unnested_arrays,
665                        None => vec![arr],
666                    }
667                })
668                .collect::<Vec<_>>();
669
670            flatten_struct_cols(&ret, schema, struct_column_indices)
671        }
672    }?;
673    Ok(Some(transformed))
674}
675
676/// Find the longest list length among the given list arrays for each row.
677///
678/// For example if we have the following two list arrays:
679///
680/// ```ignore
681/// l1: [1, 2, 3], null, [], [3]
682/// l2: [4,5], [], null, [6, 7]
683/// ```
684///
685/// If `preserve_nulls` is false, the longest length array will be:
686///
687/// ```ignore
688/// longest_length: [3, 0, 0, 2]
689/// ```
690///
691/// whereas if `preserve_nulls` is true, the longest length array will be:
692///
693///
694/// ```ignore
695/// longest_length: [3, 1, 1, 2]
696/// ```
697///
698fn find_longest_length(
699    list_arrays: &[ArrayRef],
700    options: &UnnestOptions,
701) -> Result<ArrayRef> {
702    // The length of a NULL list
703    let null_length = if options.preserve_nulls {
704        Scalar::new(Int64Array::from_value(1, 1))
705    } else {
706        Scalar::new(Int64Array::from_value(0, 1))
707    };
708    let list_lengths: Vec<ArrayRef> = list_arrays
709        .iter()
710        .map(|list_array| {
711            let mut length_array = length(list_array)?;
712            // Make sure length arrays have the same type. Int64 is the most general one.
713            length_array = cast(&length_array, &DataType::Int64)?;
714            length_array =
715                zip(&is_not_null(&length_array)?, &length_array, &null_length)?;
716            Ok(length_array)
717        })
718        .collect::<Result<_>>()?;
719
720    let longest_length = list_lengths.iter().skip(1).try_fold(
721        Arc::clone(&list_lengths[0]),
722        |longest, current| {
723            let is_lt = lt(&longest, &current)?;
724            zip(&is_lt, &current, &longest)
725        },
726    )?;
727    Ok(longest_length)
728}
729
/// Trait defining common methods used for unnesting, implemented by list array types.
trait ListArrayType: Array {
    /// Returns a reference to the values of this list.
    fn values(&self) -> &ArrayRef;

    /// Returns the start and end offset of the values for the given row.
    ///
    /// `unnest_list_array` iterates the pair as the half-open range `start..end`
    /// of indices into [`Self::values`].
    fn value_offsets(&self, row: usize) -> (i64, i64);
}
738
739impl ListArrayType for ListArray {
740    fn values(&self) -> &ArrayRef {
741        self.values()
742    }
743
744    fn value_offsets(&self, row: usize) -> (i64, i64) {
745        let offsets = self.value_offsets();
746        (offsets[row].into(), offsets[row + 1].into())
747    }
748}
749
750impl ListArrayType for LargeListArray {
751    fn values(&self) -> &ArrayRef {
752        self.values()
753    }
754
755    fn value_offsets(&self, row: usize) -> (i64, i64) {
756        let offsets = self.value_offsets();
757        (offsets[row], offsets[row + 1])
758    }
759}
760
761impl ListArrayType for FixedSizeListArray {
762    fn values(&self) -> &ArrayRef {
763        self.values()
764    }
765
766    fn value_offsets(&self, row: usize) -> (i64, i64) {
767        let start = self.value_offset(row) as i64;
768        (start, start + self.value_length() as i64)
769    }
770}
771
772/// Unnest multiple list arrays according to the length array.
773fn unnest_list_arrays(
774    list_arrays: &[ArrayRef],
775    length_array: &PrimitiveArray<Int64Type>,
776    capacity: usize,
777) -> Result<Vec<ArrayRef>> {
778    let typed_arrays = list_arrays
779        .iter()
780        .map(|list_array| match list_array.data_type() {
781            DataType::List(_) => Ok(list_array.as_list::<i32>() as &dyn ListArrayType),
782            DataType::LargeList(_) => {
783                Ok(list_array.as_list::<i64>() as &dyn ListArrayType)
784            }
785            DataType::FixedSizeList(_, _) => {
786                Ok(list_array.as_fixed_size_list() as &dyn ListArrayType)
787            }
788            other => exec_err!("Invalid unnest datatype {other }"),
789        })
790        .collect::<Result<Vec<_>>>()?;
791
792    typed_arrays
793        .iter()
794        .map(|list_array| unnest_list_array(*list_array, length_array, capacity))
795        .collect::<Result<_>>()
796}
797
798/// Unnest a list array according the target length array.
799///
800/// Consider a list array like this:
801///
802/// ```ignore
803/// [1], [2, 3, 4], null, [5], [],
804/// ```
805///
806/// and the length array is:
807///
808/// ```ignore
809/// [2, 3, 2, 1, 2]
810/// ```
811///
812/// If the length of a certain list is less than the target length, pad with NULLs.
813/// So the unnested array will look like this:
814///
815/// ```ignore
816/// [1, null, 2, 3, 4, null, null, 5, null, null]
817/// ```
818///
819fn unnest_list_array(
820    list_array: &dyn ListArrayType,
821    length_array: &PrimitiveArray<Int64Type>,
822    capacity: usize,
823) -> Result<ArrayRef> {
824    let values = list_array.values();
825    let mut take_indices_builder = PrimitiveArray::<Int64Type>::builder(capacity);
826    for row in 0..list_array.len() {
827        let mut value_length = 0;
828        if !list_array.is_null(row) {
829            let (start, end) = list_array.value_offsets(row);
830            value_length = end - start;
831            for i in start..end {
832                take_indices_builder.append_value(i)
833            }
834        }
835        let target_length = length_array.value(row);
836        debug_assert!(
837            value_length <= target_length,
838            "value length is beyond the longest length"
839        );
840        // Pad with NULL values
841        for _ in value_length..target_length {
842            take_indices_builder.append_null();
843        }
844    }
845    Ok(kernels::take::take(
846        &values,
847        &take_indices_builder.finish(),
848        None,
849    )?)
850}
851
852/// Creates take indices that will be used to expand all columns except for the list type
853/// [`columns`](UnnestExec::list_column_indices) that is being unnested.
854/// Every column value needs to be repeated multiple times according to the length array.
855///
856/// If the length array looks like this:
857///
858/// ```ignore
859/// [2, 3, 1]
860/// ```
861/// Then [`create_take_indices`] will return an array like this
862///
863/// ```ignore
864/// [0, 0, 1, 1, 1, 2]
865/// ```
866///
867fn create_take_indices(
868    length_array: &PrimitiveArray<Int64Type>,
869    capacity: usize,
870) -> PrimitiveArray<Int64Type> {
871    // `find_longest_length()` guarantees this.
872    debug_assert!(
873        length_array.null_count() == 0,
874        "length array should not contain nulls"
875    );
876    let mut builder = PrimitiveArray::<Int64Type>::builder(capacity);
877    for (index, repeat) in length_array.iter().enumerate() {
878        // The length array should not contain nulls, so unwrap is safe
879        let repeat = repeat.unwrap();
880        (0..repeat).for_each(|_| builder.append_value(index as i64));
881    }
882    builder.finish()
883}
884
885/// Create a batch of arrays based on an input `batch` and a `indices` array.
886/// The `indices` array is used by the take kernel to repeat values in the arrays
887/// that are marked with `true` in the `repeat_mask`. Arrays marked with `false`
888/// in the `repeat_mask` will be replaced with arrays filled with nulls of the
889/// appropriate length.
890///
891/// For example if we have the following batch:
892///
893/// ```ignore
894/// c1: [1], null, [2, 3, 4], null, [5, 6]
895/// c2: 'a', 'b',  'c', null, 'd'
896/// ```
897///
898/// then the `unnested_list_arrays` contains the unnest column that will replace `c1` in
899/// the final batch if `preserve_nulls` is true:
900///
901/// ```ignore
902/// c1: 1, null, 2, 3, 4, null, 5, 6
903/// ```
904///
905/// And the `indices` array contains the indices that are used by `take` kernel to
906/// repeat the values in `c2`:
907///
908/// ```ignore
909/// 0, 1, 2, 2, 2, 3, 4, 4
910/// ```
911///
912/// so that the final batch will look like:
913///
914/// ```ignore
915/// c1: 1, null, 2, 3, 4, null, 5, 6
916/// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
917/// ```
918///
919/// The `repeat_mask` determines whether an array's values are repeated or replaced with nulls.
920/// For example, if the `repeat_mask` is:
921///
922/// ```ignore
923/// [true, false]
924/// ```
925///
926/// The final batch will look like:
927///
928/// ```ignore
929/// c1: 1, null, 2, 3, 4, null, 5, 6  // Repeated using `indices`
930/// c2: null, null, null, null, null, null, null, null  // Replaced with nulls
931///
932fn repeat_arrs_from_indices(
933    batch: &[ArrayRef],
934    indices: &PrimitiveArray<Int64Type>,
935    repeat_mask: &[bool],
936) -> Result<Vec<Arc<dyn Array>>> {
937    batch
938        .iter()
939        .zip(repeat_mask.iter())
940        .map(|(arr, &repeat)| {
941            if repeat {
942                Ok(kernels::take::take(arr, indices, None)?)
943            } else {
944                Ok(new_null_array(arr.data_type(), arr.len()))
945            }
946        })
947        .collect()
948}
949
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        GenericListArray, NullBufferBuilder, OffsetSizeTrait, StringArray,
    };
    use arrow::buffer::{NullBuffer, OffsetBuffer};
    use arrow::datatypes::{Field, Int32Type};
    use datafusion_common::assert_batches_eq;

    /// Builds a `GenericListArray` of strings with these six entries:
    ///
    /// ```ignore
    /// [A, B, C], [], NULL, [D], NULL, [NULL, F]
    /// ```
    ///
    /// The first NULL entry deliberately spans one slot (`"?"`) of the values
    /// buffer, i.e. it is a NULL with a non-zero value length
    /// (issue https://github.com/apache/datafusion/issues/9932); the second
    /// NULL entry has a zero value length.
    fn make_generic_array<OffsetSize>() -> GenericListArray<OffsetSize>
    where
        OffsetSize: OffsetSizeTrait,
    {
        // Flattened elements of all entries, including the hidden "?" slot.
        let flat_values = StringArray::from(vec![
            Some("A"),
            Some("B"),
            Some("C"),
            Some("?"),
            Some("D"),
            None,
            Some("F"),
        ]);
        // Number of value slots covered by each entry.
        let offsets = OffsetBuffer::<OffsetSize>::from_lengths([3, 0, 1, 1, 0, 2]);

        let mut validity = NullBufferBuilder::new(6);
        for is_valid in [true, true, false, true, false, true] {
            validity.append(is_valid);
        }

        GenericListArray::<OffsetSize>::new(
            Arc::new(Field::new_list_field(DataType::Utf8, true)),
            offsets,
            Arc::new(flat_values),
            validity.finish(),
        )
    }

    /// Builds a `FixedSizeListArray` (list size 2) of strings with these six
    /// entries:
    ///
    /// ```ignore
    /// [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
    /// ```
    fn make_fixed_list() -> FixedSizeListArray {
        // One pair of value slots per entry; NULL entries still occupy a pair.
        let slots: [[Option<&str>; 2]; 6] = [
            [Some("A"), Some("B")],
            [None, None],
            [Some("C"), Some("D")],
            [None, None],
            [None, Some("F")],
            [None, None],
        ];
        let flat_values = Arc::new(StringArray::from_iter(slots.into_iter().flatten()));
        let validity = NullBuffer::from(vec![true, false, true, false, true, true]);
        let item_field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        FixedSizeListArray::new(item_field, 2, flat_values, Some(validity))
    }

    /// Unnests `list_array` against `lengths` and checks that the resulting
    /// string values equal `expected`.
    fn verify_unnest_list_array(
        list_array: &dyn ListArrayType,
        lengths: Vec<i64>,
        expected: Vec<Option<&str>>,
    ) -> Result<()> {
        let unnested =
            unnest_list_array(list_array, &Int64Array::from(lengths), 3 * 6)?;
        let actual: Vec<Option<&str>> = unnested.as_string::<i32>().iter().collect();
        assert_eq!(actual, expected);
        Ok(())
    }

    #[test]
    fn test_build_batch_list_arr_recursive() -> Result<()> {
        // Input batch:
        //
        // col1                             | col2
        // [[1,2,3],null,[4,5]]             | ['a','b']
        // [[7,8,9,10], null, [11,12,13]]   | ['c','d']
        // null                             | ['e']

        // col1: list<list<int32>>, three outer rows (3, 3 and 0 inner lists),
        // the last outer row being NULL.
        let inner_lists: ArrayRef =
            Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![Some(1), Some(2), Some(3)]),
                None,
                Some(vec![Some(4), Some(5)]),
                Some(vec![Some(7), Some(8), Some(9), Some(10)]),
                None,
                Some(vec![Some(11), Some(12), Some(13)]),
            ]));
        let col1_item_field =
            Arc::new(Field::new_list_field(inner_lists.data_type().clone(), true));
        let col1_field =
            Field::new_list_field(DataType::List(Arc::clone(&col1_item_field)), true);
        let mut col1_validity = NullBufferBuilder::new(3);
        col1_validity.append_n_non_nulls(2);
        col1_validity.append_null();
        let col1 = ListArray::new(
            col1_item_field,
            OffsetBuffer::from_lengths([3, 3, 0]),
            inner_lists,
            col1_validity.finish(),
        );

        // col2: list<utf8>, three non-null rows of 2, 2 and 1 elements.
        let col2_item_field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        let col2_field =
            Field::new("col2", DataType::List(Arc::clone(&col2_item_field)), true);
        let mut col2_validity = NullBufferBuilder::new(3);
        col2_validity.append_n_non_nulls(3);
        let col2 = GenericListArray::<i32>::new(
            col2_item_field,
            OffsetBuffer::from_lengths([2, 2, 1]),
            Arc::new(StringArray::from(vec![
                Some("a"),
                Some("b"),
                Some("c"),
                Some("d"),
                Some("e"),
            ])),
            col2_validity.finish(),
        );

        // Assemble the input batch and the schema expected after unnesting.
        let in_schema = Arc::new(Schema::new(vec![col1_field, col2_field]));
        let batch = RecordBatch::try_new(
            in_schema,
            vec![Arc::new(col1) as ArrayRef, Arc::new(col2) as ArrayRef],
        )
        .unwrap();
        let out_schema = Arc::new(Schema::new(vec![
            Field::new(
                "col1_unnest_placeholder_depth_1",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new("col1_unnest_placeholder_depth_2", DataType::Int32, true),
            Field::new("col2_unnest_placeholder_depth_1", DataType::Utf8, true),
        ]));

        // col1 is unnested at depth 1 and depth 2, col2 at depth 1 only.
        let list_type_columns: Vec<ListUnnest> = [(0, 1), (0, 2), (1, 1)]
            .into_iter()
            .map(|(index_in_input_schema, depth)| ListUnnest {
                index_in_input_schema,
                depth,
            })
            .collect();
        let options = UnnestOptions {
            preserve_nulls: true,
            recursions: vec![],
        };

        let ret = build_batch(
            &batch,
            &out_schema,
            &list_type_columns,
            &HashSet::default(),
            &options,
        )?
        .unwrap();

        let expected = &[
"+---------------------------------+---------------------------------+---------------------------------+",
"| col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 |",
"+---------------------------------+---------------------------------+---------------------------------+",
"| [1, 2, 3]                       | 1                               | a                               |",
"|                                 | 2                               | b                               |",
"| [4, 5]                          | 3                               |                                 |",
"| [1, 2, 3]                       |                                 | a                               |",
"|                                 |                                 | b                               |",
"| [4, 5]                          |                                 |                                 |",
"| [1, 2, 3]                       | 4                               | a                               |",
"|                                 | 5                               | b                               |",
"| [4, 5]                          |                                 |                                 |",
"| [7, 8, 9, 10]                   | 7                               | c                               |",
"|                                 | 8                               | d                               |",
"| [11, 12, 13]                    | 9                               |                                 |",
"|                                 | 10                              |                                 |",
"| [7, 8, 9, 10]                   |                                 | c                               |",
"|                                 |                                 | d                               |",
"| [11, 12, 13]                    |                                 |                                 |",
"| [7, 8, 9, 10]                   | 11                              | c                               |",
"|                                 | 12                              | d                               |",
"| [11, 12, 13]                    | 13                              |                                 |",
"|                                 |                                 | e                               |",
"+---------------------------------+---------------------------------+---------------------------------+",
        ];
        assert_batches_eq!(expected, &[ret]);
        Ok(())
    }

    #[test]
    fn test_unnest_list_array() -> Result<()> {
        // [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let generic_list = make_generic_array::<i32>();
        let generic_expected = vec![
            Some("A"),
            Some("B"),
            Some("C"),
            None,
            None,
            None,
            Some("D"),
            None,
            None,
            Some("F"),
            None,
        ];
        verify_unnest_list_array(
            &generic_list,
            vec![3, 2, 1, 2, 0, 3],
            generic_expected,
        )?;

        // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let fixed_list = make_fixed_list();
        let fixed_expected = vec![
            Some("A"),
            Some("B"),
            None,
            None,
            Some("C"),
            Some("D"),
            None,
            Some("F"),
            None,
            None,
            None,
        ];
        verify_unnest_list_array(&fixed_list, vec![3, 1, 2, 0, 2, 3], fixed_expected)
    }

    /// Runs `find_longest_length` over `list_arrays` with the given
    /// `preserve_nulls` setting and checks the result against `expected`.
    fn verify_longest_length(
        list_arrays: &[ArrayRef],
        preserve_nulls: bool,
        expected: Vec<i64>,
    ) -> Result<()> {
        let options = UnnestOptions {
            preserve_nulls,
            recursions: vec![],
        };
        let longest_length = find_longest_length(list_arrays, &options)?;
        let actual = longest_length
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(actual, &Int64Array::from(expected));
        Ok(())
    }

    #[test]
    fn test_longest_list_length() -> Result<()> {
        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_i32 = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        let list_i64 = Arc::new(make_generic_array::<i64>()) as ArrayRef;
        //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let fixed = Arc::new(make_fixed_list()) as ArrayRef;

        // (input arrays, expected with preserve_nulls = false, expected with
        // preserve_nulls = true)
        let cases: Vec<(Vec<ArrayRef>, Vec<i64>, Vec<i64>)> = vec![
            // Single ListArray
            (
                vec![Arc::clone(&list_i32)],
                vec![3, 0, 0, 1, 0, 2],
                vec![3, 0, 1, 1, 1, 2],
            ),
            // Single LargeListArray
            (
                vec![list_i64],
                vec![3, 0, 0, 1, 0, 2],
                vec![3, 0, 1, 1, 1, 2],
            ),
            // Single FixedSizeListArray
            (
                vec![Arc::clone(&fixed)],
                vec![2, 0, 2, 0, 2, 2],
                vec![2, 1, 2, 1, 2, 2],
            ),
            // Multiple list arrays
            (
                vec![list_i32, fixed],
                vec![3, 0, 2, 1, 2, 2],
                vec![3, 1, 2, 1, 2, 2],
            ),
        ];

        for (arrays, dropping_nulls, preserving_nulls) in cases {
            verify_longest_length(&arrays, false, dropping_nulls)?;
            verify_longest_length(&arrays, true, preserving_nulls)?;
        }

        Ok(())
    }

    #[test]
    fn test_create_take_indices() -> Result<()> {
        let take_indices = create_take_indices(&Int64Array::from(vec![2, 3, 1]), 6);
        assert_eq!(take_indices, Int64Array::from(vec![0, 0, 1, 1, 1, 2]));
        Ok(())
    }
}