lance_encoding/encodings/logical/
struct.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::{BinaryHeap, VecDeque},
6    ops::Range,
7    sync::Arc,
8};
9
10use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray};
11use arrow_schema::{DataType, Field, Fields};
12use futures::{
13    future::BoxFuture,
14    stream::{FuturesOrdered, FuturesUnordered},
15    FutureExt, StreamExt, TryStreamExt,
16};
17use itertools::Itertools;
18use lance_arrow::FieldExt;
19use log::trace;
20use snafu::location;
21
22use crate::{
23    decoder::{
24        DecodeArrayTask, DecodedArray, DecoderReady, FieldScheduler, FilterExpression, LoadedPage,
25        LogicalPageDecoder, MessageType, NextDecodeTask, PageEncoding, PriorityRange,
26        ScheduledScanLine, SchedulerContext, SchedulingJob, StructuralDecodeArrayTask,
27        StructuralFieldDecoder, StructuralFieldScheduler, StructuralSchedulingJob,
28    },
29    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
30    format::pb,
31    repdef::RepDefBuilder,
32};
33use lance_core::{Error, Result};
34
35use super::{list::StructuralListDecoder, primitive::StructuralPrimitiveFieldDecoder};
36
37#[derive(Debug)]
38struct SchedulingJobWithStatus<'a> {
39    col_idx: u32,
40    col_name: &'a str,
41    job: Box<dyn SchedulingJob + 'a>,
42    rows_scheduled: u64,
43    rows_remaining: u64,
44}
45
46impl PartialEq for SchedulingJobWithStatus<'_> {
47    fn eq(&self, other: &Self) -> bool {
48        self.col_idx == other.col_idx
49    }
50}
51
52impl Eq for SchedulingJobWithStatus<'_> {}
53
54impl PartialOrd for SchedulingJobWithStatus<'_> {
55    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
56        Some(self.cmp(other))
57    }
58}
59
60impl Ord for SchedulingJobWithStatus<'_> {
61    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
62        // Note this is reversed to make it min-heap
63        other.rows_scheduled.cmp(&self.rows_scheduled)
64    }
65}
66
67#[derive(Debug)]
68struct EmptyStructDecodeTask {
69    num_rows: u64,
70}
71
72impl DecodeArrayTask for EmptyStructDecodeTask {
73    fn decode(self: Box<Self>) -> Result<ArrayRef> {
74        Ok(Arc::new(StructArray::new_empty_fields(
75            self.num_rows as usize,
76            None,
77        )))
78    }
79}
80
81#[derive(Debug)]
82struct EmptyStructDecoder {
83    num_rows: u64,
84    rows_drained: u64,
85    data_type: DataType,
86}
87
88impl EmptyStructDecoder {
89    fn new(num_rows: u64) -> Self {
90        Self {
91            num_rows,
92            rows_drained: 0,
93            data_type: DataType::Struct(Fields::from(Vec::<Field>::default())),
94        }
95    }
96}
97
98impl LogicalPageDecoder for EmptyStructDecoder {
99    fn wait_for_loaded(&mut self, _loaded_need: u64) -> BoxFuture<Result<()>> {
100        Box::pin(std::future::ready(Ok(())))
101    }
102    fn rows_loaded(&self) -> u64 {
103        self.num_rows
104    }
105    fn rows_unloaded(&self) -> u64 {
106        0
107    }
108    fn num_rows(&self) -> u64 {
109        self.num_rows
110    }
111    fn rows_drained(&self) -> u64 {
112        self.rows_drained
113    }
114    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
115        self.rows_drained += num_rows;
116        Ok(NextDecodeTask {
117            num_rows,
118            task: Box::new(EmptyStructDecodeTask { num_rows }),
119        })
120    }
121    fn data_type(&self) -> &DataType {
122        &self.data_type
123    }
124}
125
126#[derive(Debug)]
127struct EmptyStructSchedulerJob {
128    num_rows: u64,
129}
130
131impl SchedulingJob for EmptyStructSchedulerJob {
132    fn schedule_next(
133        &mut self,
134        context: &mut SchedulerContext,
135        _priority: &dyn PriorityRange,
136    ) -> Result<ScheduledScanLine> {
137        let empty_decoder = Box::new(EmptyStructDecoder::new(self.num_rows));
138        let struct_decoder = context.locate_decoder(empty_decoder);
139        Ok(ScheduledScanLine {
140            decoders: vec![MessageType::DecoderReady(struct_decoder)],
141            rows_scheduled: self.num_rows,
142        })
143    }
144
145    fn num_rows(&self) -> u64 {
146        self.num_rows
147    }
148}
149
150/// Scheduling job for struct data
151///
152/// The order in which we schedule the children is important.  We want to schedule the child
153/// with the least amount of data first.
154///
155/// This allows us to decode entire rows as quickly as possible
156#[derive(Debug)]
157struct SimpleStructSchedulerJob<'a> {
158    scheduler: &'a SimpleStructScheduler,
159    /// A min-heap whose key is the # of rows currently scheduled
160    children: BinaryHeap<SchedulingJobWithStatus<'a>>,
161    rows_scheduled: u64,
162    num_rows: u64,
163    initialized: bool,
164}
165
166impl<'a> SimpleStructSchedulerJob<'a> {
167    fn new(
168        scheduler: &'a SimpleStructScheduler,
169        children: Vec<Box<dyn SchedulingJob + 'a>>,
170        num_rows: u64,
171    ) -> Self {
172        let children = children
173            .into_iter()
174            .enumerate()
175            .map(|(idx, job)| SchedulingJobWithStatus {
176                col_idx: idx as u32,
177                col_name: scheduler.child_fields[idx].name(),
178                job,
179                rows_scheduled: 0,
180                rows_remaining: num_rows,
181            })
182            .collect::<BinaryHeap<_>>();
183        Self {
184            scheduler,
185            children,
186            rows_scheduled: 0,
187            num_rows,
188            initialized: false,
189        }
190    }
191}
192
193impl SchedulingJob for SimpleStructSchedulerJob<'_> {
194    fn schedule_next(
195        &mut self,
196        mut context: &mut SchedulerContext,
197        priority: &dyn PriorityRange,
198    ) -> Result<ScheduledScanLine> {
199        let mut decoders = Vec::new();
200        if !self.initialized {
201            // Send info to the decoder thread so it knows a struct is here.  In the future we will also
202            // send validity info here.
203            let struct_decoder = Box::new(SimpleStructDecoder::new(
204                self.scheduler.child_fields.clone(),
205                self.num_rows,
206            ));
207            let struct_decoder = context.locate_decoder(struct_decoder);
208            decoders.push(MessageType::DecoderReady(struct_decoder));
209            self.initialized = true;
210        }
211        let old_rows_scheduled = self.rows_scheduled;
212        // Schedule as many children as we need to until we have scheduled at least one
213        // complete row
214        while old_rows_scheduled == self.rows_scheduled {
215            let mut next_child = self.children.pop().unwrap();
216            trace!("Scheduling more rows for child {}", next_child.col_idx);
217            let scoped = context.push(next_child.col_name, next_child.col_idx);
218            let child_scan = next_child.job.schedule_next(scoped.context, priority)?;
219            trace!(
220                "Scheduled {} rows for child {}",
221                child_scan.rows_scheduled,
222                next_child.col_idx
223            );
224            next_child.rows_scheduled += child_scan.rows_scheduled;
225            next_child.rows_remaining -= child_scan.rows_scheduled;
226            decoders.extend(child_scan.decoders);
227            self.children.push(next_child);
228            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
229            context = scoped.pop();
230        }
231        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
232        Ok(ScheduledScanLine {
233            decoders,
234            rows_scheduled: struct_rows_scheduled,
235        })
236    }
237
238    fn num_rows(&self) -> u64 {
239        self.num_rows
240    }
241}
242
243/// A scheduler for structs
244///
245/// The implementation is actually a bit more tricky than one might initially think.  We can't just
246/// go through and schedule each column one after the other.  This would mean our decode can't start
247/// until nearly all the data has arrived (since we need data from each column)
248///
249/// Instead, we schedule in row-major fashion
250///
251/// Note: this scheduler is the starting point for all decoding.  This is because we treat the top-level
252/// record batch as a non-nullable struct.
253#[derive(Debug)]
254pub struct SimpleStructScheduler {
255    children: Vec<Arc<dyn FieldScheduler>>,
256    child_fields: Fields,
257    num_rows: u64,
258}
259
260impl SimpleStructScheduler {
261    pub fn new(
262        children: Vec<Arc<dyn FieldScheduler>>,
263        child_fields: Fields,
264        num_rows: u64,
265    ) -> Self {
266        let num_rows = children
267            .first()
268            .map(|child| child.num_rows())
269            .unwrap_or(num_rows);
270        debug_assert!(children.iter().all(|child| child.num_rows() == num_rows));
271        Self {
272            children,
273            child_fields,
274            num_rows,
275        }
276    }
277}
278
279impl FieldScheduler for SimpleStructScheduler {
280    fn schedule_ranges<'a>(
281        &'a self,
282        ranges: &[Range<u64>],
283        filter: &FilterExpression,
284    ) -> Result<Box<dyn SchedulingJob + 'a>> {
285        if self.children.is_empty() {
286            return Ok(Box::new(EmptyStructSchedulerJob {
287                num_rows: ranges.iter().map(|r| r.end - r.start).sum(),
288            }));
289        }
290        let child_schedulers = self
291            .children
292            .iter()
293            .map(|child| child.schedule_ranges(ranges, filter))
294            .collect::<Result<Vec<_>>>()?;
295        let num_rows = child_schedulers[0].num_rows();
296        Ok(Box::new(SimpleStructSchedulerJob::new(
297            self,
298            child_schedulers,
299            num_rows,
300        )))
301    }
302
303    fn num_rows(&self) -> u64 {
304        self.num_rows
305    }
306
307    fn initialize<'a>(
308        &'a self,
309        _filter: &'a FilterExpression,
310        _context: &'a SchedulerContext,
311    ) -> BoxFuture<'a, Result<()>> {
312        let futures = self
313            .children
314            .iter()
315            .map(|child| child.initialize(_filter, _context))
316            .collect::<FuturesUnordered<_>>();
317        async move {
318            futures
319                .map(|res| res.map(|_| ()))
320                .try_collect::<Vec<_>>()
321                .await?;
322            Ok(())
323        }
324        .boxed()
325    }
326}
327
328#[derive(Debug)]
329struct StructuralSchedulingJobWithStatus<'a> {
330    col_idx: u32,
331    col_name: &'a str,
332    job: Box<dyn StructuralSchedulingJob + 'a>,
333    rows_scheduled: u64,
334    rows_remaining: u64,
335}
336
337impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
338    fn eq(&self, other: &Self) -> bool {
339        self.col_idx == other.col_idx
340    }
341}
342
343impl Eq for StructuralSchedulingJobWithStatus<'_> {}
344
345impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
346    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
347        Some(self.cmp(other))
348    }
349}
350
351impl Ord for StructuralSchedulingJobWithStatus<'_> {
352    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
353        // Note this is reversed to make it min-heap
354        other.rows_scheduled.cmp(&self.rows_scheduled)
355    }
356}
357
358/// Scheduling job for struct data
359///
360/// The order in which we schedule the children is important.  We want to schedule the child
361/// with the least amount of data first.
362///
363/// This allows us to decode entire rows as quickly as possible
364#[derive(Debug)]
365struct RepDefStructSchedulingJob<'a> {
366    /// A min-heap whose key is the # of rows currently scheduled
367    children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
368    rows_scheduled: u64,
369}
370
371impl<'a> RepDefStructSchedulingJob<'a> {
372    fn new(
373        scheduler: &'a StructuralStructScheduler,
374        children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
375        num_rows: u64,
376    ) -> Self {
377        let children = children
378            .into_iter()
379            .enumerate()
380            .map(|(idx, job)| StructuralSchedulingJobWithStatus {
381                col_idx: idx as u32,
382                col_name: scheduler.child_fields[idx].name(),
383                job,
384                rows_scheduled: 0,
385                rows_remaining: num_rows,
386            })
387            .collect::<BinaryHeap<_>>();
388        Self {
389            children,
390            rows_scheduled: 0,
391        }
392    }
393}
394
395impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
396    fn schedule_next(
397        &mut self,
398        mut context: &mut SchedulerContext,
399    ) -> Result<Option<ScheduledScanLine>> {
400        let mut decoders = Vec::new();
401        let old_rows_scheduled = self.rows_scheduled;
402        // Schedule as many children as we need to until we have scheduled at least one
403        // complete row
404        while old_rows_scheduled == self.rows_scheduled {
405            let mut next_child = self.children.pop().unwrap();
406            let scoped = context.push(next_child.col_name, next_child.col_idx);
407            let child_scan = next_child.job.schedule_next(scoped.context)?;
408            // next_child is the least-scheduled child and, if it's done, that
409            // means we are completely done.
410            if child_scan.is_none() {
411                return Ok(None);
412            }
413            let child_scan = child_scan.unwrap();
414
415            trace!(
416                "Scheduled {} rows for child {}",
417                child_scan.rows_scheduled,
418                next_child.col_idx
419            );
420            next_child.rows_scheduled += child_scan.rows_scheduled;
421            next_child.rows_remaining -= child_scan.rows_scheduled;
422            decoders.extend(child_scan.decoders);
423            self.children.push(next_child);
424            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
425            context = scoped.pop();
426        }
427        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
428        Ok(Some(ScheduledScanLine {
429            decoders,
430            rows_scheduled: struct_rows_scheduled,
431        }))
432    }
433}
434
435/// A scheduler for structs
436///
437/// The implementation is actually a bit more tricky than one might initially think.  We can't just
438/// go through and schedule each column one after the other.  This would mean our decode can't start
439/// until nearly all the data has arrived (since we need data from each column to yield a batch)
440///
441/// Instead, we schedule in row-major fashion
442///
443/// Note: this scheduler is the starting point for all decoding.  This is because we treat the top-level
444/// record batch as a non-nullable struct.
445#[derive(Debug)]
446pub struct StructuralStructScheduler {
447    children: Vec<Box<dyn StructuralFieldScheduler>>,
448    child_fields: Fields,
449}
450
451impl StructuralStructScheduler {
452    pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
453        debug_assert!(!children.is_empty());
454        Self {
455            children,
456            child_fields,
457        }
458    }
459}
460
461impl StructuralFieldScheduler for StructuralStructScheduler {
462    fn schedule_ranges<'a>(
463        &'a self,
464        ranges: &[Range<u64>],
465        filter: &FilterExpression,
466    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
467        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
468
469        let child_schedulers = self
470            .children
471            .iter()
472            .map(|child| child.schedule_ranges(ranges, filter))
473            .collect::<Result<Vec<_>>>()?;
474
475        Ok(Box::new(RepDefStructSchedulingJob::new(
476            self,
477            child_schedulers,
478            num_rows,
479        )))
480    }
481
482    fn initialize<'a>(
483        &'a mut self,
484        filter: &'a FilterExpression,
485        context: &'a SchedulerContext,
486    ) -> BoxFuture<'a, Result<()>> {
487        let children_initialization = self
488            .children
489            .iter_mut()
490            .map(|child| child.initialize(filter, context))
491            .collect::<FuturesUnordered<_>>();
492        async move {
493            children_initialization
494                .map(|res| res.map(|_| ()))
495                .try_collect::<Vec<_>>()
496                .await?;
497            Ok(())
498        }
499        .boxed()
500    }
501}
502
503#[derive(Debug)]
504struct ChildState {
505    // As child decoders are scheduled they are added to this queue
506    // Once the decoder is fully drained it is popped from this queue
507    //
508    // TODO: It may be a minor perf optimization, in some rare cases, if we have a separate
509    // "fully awaited but not yet drained" queue so we don't loop through fully awaited pages
510    // during each call to wait.
511    //
512    // Note: This queue may have more than one page in it if the batch size is very large
513    // or pages are very small
514    // TODO: Test this case
515    scheduled: VecDeque<Box<dyn LogicalPageDecoder>>,
516    // Rows that have been awaited
517    rows_loaded: u64,
518    // Rows that have drained
519    rows_drained: u64,
520    // Rows that have been popped (the decoder has been completely drained and removed from `scheduled`)
521    rows_popped: u64,
522    // Total number of rows in the struct
523    num_rows: u64,
524    // The field index in the struct (used for debugging / logging)
525    field_index: u32,
526}
527
528struct CompositeDecodeTask {
529    // One per child
530    tasks: Vec<Box<dyn DecodeArrayTask>>,
531    num_rows: u64,
532    has_more: bool,
533}
534
535impl CompositeDecodeTask {
536    fn decode(self) -> Result<ArrayRef> {
537        let arrays = self
538            .tasks
539            .into_iter()
540            .map(|task| task.decode())
541            .collect::<Result<Vec<_>>>()?;
542        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
543        // TODO: If this is a primitive column we should be able to avoid this
544        // allocation + copy with "page bridging" which could save us a few CPU
545        // cycles.
546        //
547        // This optimization is probably most important for super fast storage like NVME
548        // where the page size can be smaller.
549        Ok(arrow_select::concat::concat(&array_refs)?)
550    }
551}
552
553impl ChildState {
554    fn new(num_rows: u64, field_index: u32) -> Self {
555        Self {
556            scheduled: VecDeque::new(),
557            rows_loaded: 0,
558            rows_drained: 0,
559            rows_popped: 0,
560            num_rows,
561            field_index,
562        }
563    }
564
565    // Wait for the next set of rows to arrive
566    //
567    // Wait until we have at least `loaded_need` loaded and stop as soon as we
568    // go above that limit.
569    async fn wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
570        trace!(
571            "Struct child {} waiting for more than {} rows to be loaded and {} are fully loaded already",
572            self.field_index,
573            loaded_need,
574            self.rows_loaded,
575        );
576        let mut fully_loaded = self.rows_popped;
577        for (page_idx, next_decoder) in self.scheduled.iter_mut().enumerate() {
578            if next_decoder.rows_unloaded() > 0 {
579                let mut current_need = loaded_need;
580                current_need -= fully_loaded;
581                let rows_in_page = next_decoder.num_rows();
582                let need_for_page = (rows_in_page - 1).min(current_need);
583                trace!(
584                    "Struct child {} page {} will wait until more than {} rows loaded from page with {} rows",
585                    self.field_index,
586                    page_idx,
587                    need_for_page,
588                    rows_in_page,
589                );
590                // We might only await part of a page.  This is important for things
591                // like the struct<struct<...>> case where we have one outer page, one
592                // middle page, and then a bunch of inner pages.  If we await the entire
593                // middle page then we will have to wait for all the inner pages to arrive
594                // before we can start decoding.
595                next_decoder.wait_for_loaded(need_for_page).await?;
596                let now_loaded = next_decoder.rows_loaded();
597                fully_loaded += now_loaded;
598                trace!(
599                    "Struct child {} page {} await and now has {} loaded rows and we have {} fully loaded",
600                    self.field_index,
601                    page_idx,
602                    now_loaded,
603                    fully_loaded
604                );
605            } else {
606                fully_loaded += next_decoder.num_rows();
607            }
608            if fully_loaded > loaded_need {
609                break;
610            }
611        }
612        self.rows_loaded = fully_loaded;
613        trace!(
614            "Struct child {} loaded {} new rows and now {} are loaded",
615            self.field_index,
616            fully_loaded,
617            self.rows_loaded
618        );
619        Ok(())
620    }
621
622    fn drain(&mut self, num_rows: u64) -> Result<CompositeDecodeTask> {
623        trace!("Struct draining {} rows", num_rows);
624
625        trace!(
626            "Draining {} rows from struct page with {} rows already drained",
627            num_rows,
628            self.rows_drained
629        );
630        let mut remaining = num_rows;
631        let mut composite = CompositeDecodeTask {
632            tasks: Vec::new(),
633            num_rows: 0,
634            has_more: true,
635        };
636        while remaining > 0 {
637            let next = self.scheduled.front_mut().unwrap();
638            let rows_to_take = remaining.min(next.rows_left());
639            let next_task = next.drain(rows_to_take)?;
640            if next.rows_left() == 0 {
641                trace!("Completely drained page");
642                self.rows_popped += next.num_rows();
643                self.scheduled.pop_front();
644            }
645            remaining -= rows_to_take;
646            composite.tasks.push(next_task.task);
647            composite.num_rows += next_task.num_rows;
648        }
649        self.rows_drained += num_rows;
650        composite.has_more = self.rows_drained != self.num_rows;
651        Ok(composite)
652    }
653}
654
655// Wrapper around ChildState that orders using rows_unawaited
656struct WaitOrder<'a>(&'a mut ChildState);
657
658impl Eq for WaitOrder<'_> {}
659impl PartialEq for WaitOrder<'_> {
660    fn eq(&self, other: &Self) -> bool {
661        self.0.rows_loaded == other.0.rows_loaded
662    }
663}
664impl Ord for WaitOrder<'_> {
665    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
666        // Note: this is inverted so we have a min-heap
667        other.0.rows_loaded.cmp(&self.0.rows_loaded)
668    }
669}
670impl PartialOrd for WaitOrder<'_> {
671    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
672        Some(self.cmp(other))
673    }
674}
675
676#[derive(Debug)]
677pub struct StructuralStructDecoder {
678    children: Vec<Box<dyn StructuralFieldDecoder>>,
679    data_type: DataType,
680    child_fields: Fields,
681    // The root decoder is slightly different because it cannot have nulls
682    is_root: bool,
683}
684
685impl StructuralStructDecoder {
686    pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Self {
687        let children = fields
688            .iter()
689            .map(|field| Self::field_to_decoder(field, should_validate))
690            .collect();
691        let data_type = DataType::Struct(fields.clone());
692        Self {
693            data_type,
694            children,
695            child_fields: fields,
696            is_root,
697        }
698    }
699
700    fn field_to_decoder(
701        field: &Arc<arrow_schema::Field>,
702        should_validate: bool,
703    ) -> Box<dyn StructuralFieldDecoder> {
704        match field.data_type() {
705            DataType::Struct(fields) => {
706                if field.is_packed_struct() {
707                    let decoder =
708                        StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
709                    Box::new(decoder)
710                } else {
711                    Box::new(Self::new(fields.clone(), should_validate, false))
712                }
713            }
714            DataType::List(child_field) | DataType::LargeList(child_field) => {
715                let child_decoder = Self::field_to_decoder(child_field, should_validate);
716                Box::new(StructuralListDecoder::new(
717                    child_decoder,
718                    field.data_type().clone(),
719                ))
720            }
721            DataType::RunEndEncoded(_, _) => todo!(),
722            DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
723            DataType::Map(_, _) => todo!(),
724            DataType::Union(_, _) => todo!(),
725            _ => Box::new(StructuralPrimitiveFieldDecoder::new(field, should_validate)),
726        }
727    }
728
729    pub fn drain_batch_task(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
730        let array_drain = self.drain(num_rows)?;
731        Ok(NextDecodeTask {
732            num_rows,
733            task: Box::new(array_drain),
734        })
735    }
736}
737
738impl StructuralFieldDecoder for StructuralStructDecoder {
739    fn accept_page(&mut self, mut child: LoadedPage) -> Result<()> {
740        // children with empty path should not be delivered to this method
741        let child_idx = child.path.pop_front().unwrap();
742        // This decoder is intended for one of our children
743        self.children[child_idx as usize].accept_page(child)?;
744        Ok(())
745    }
746
747    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
748        let child_tasks = self
749            .children
750            .iter_mut()
751            .map(|child| child.drain(num_rows))
752            .collect::<Result<Vec<_>>>()?;
753        Ok(Box::new(RepDefStructDecodeTask {
754            children: child_tasks,
755            child_fields: self.child_fields.clone(),
756            is_root: self.is_root,
757        }))
758    }
759
760    fn data_type(&self) -> &DataType {
761        &self.data_type
762    }
763}
764
765#[derive(Debug)]
766struct RepDefStructDecodeTask {
767    children: Vec<Box<dyn StructuralDecodeArrayTask>>,
768    child_fields: Fields,
769    is_root: bool,
770}
771
772impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
773    fn decode(self: Box<Self>) -> Result<DecodedArray> {
774        let arrays = self
775            .children
776            .into_iter()
777            .map(|task| task.decode())
778            .collect::<Result<Vec<_>>>()?;
779        let mut children = Vec::with_capacity(arrays.len());
780        let mut arrays_iter = arrays.into_iter();
781        let first_array = arrays_iter.next().unwrap();
782        let length = first_array.array.len();
783
784        // The repdef should be identical across all children at this point
785        let mut repdef = first_array.repdef;
786        children.push(first_array.array);
787
788        for array in arrays_iter {
789            debug_assert_eq!(length, array.array.len());
790            children.push(array.array);
791        }
792
793        let validity = if self.is_root {
794            None
795        } else {
796            repdef.unravel_validity(length)
797        };
798        let array = StructArray::new(self.child_fields, children, validity);
799        Ok(DecodedArray {
800            array: Arc::new(array),
801            repdef,
802        })
803    }
804}
805
806#[derive(Debug)]
807pub struct SimpleStructDecoder {
808    children: Vec<ChildState>,
809    child_fields: Fields,
810    data_type: DataType,
811    num_rows: u64,
812}
813
814impl SimpleStructDecoder {
815    pub fn new(child_fields: Fields, num_rows: u64) -> Self {
816        let data_type = DataType::Struct(child_fields.clone());
817        Self {
818            children: child_fields
819                .iter()
820                .enumerate()
821                .map(|(idx, _)| ChildState::new(num_rows, idx as u32))
822                .collect(),
823            child_fields,
824            data_type,
825            num_rows,
826        }
827    }
828
829    async fn do_wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
830        let mut wait_orders = self
831            .children
832            .iter_mut()
833            .filter_map(|child| {
834                if child.rows_loaded <= loaded_need {
835                    Some(WaitOrder(child))
836                } else {
837                    None
838                }
839            })
840            .collect::<BinaryHeap<_>>();
841        while !wait_orders.is_empty() {
842            let next_waiter = wait_orders.pop().unwrap();
843            let next_highest = wait_orders
844                .peek()
845                .map(|w| w.0.rows_loaded)
846                .unwrap_or(u64::MAX);
847            // Wait until you have the number of rows needed, or at least more than the
848            // next highest waiter
849            let limit = loaded_need.min(next_highest);
850            next_waiter.0.wait_for_loaded(limit).await?;
851            log::trace!(
852                "Struct child {} finished await pass and now {} are loaded",
853                next_waiter.0.field_index,
854                next_waiter.0.rows_loaded
855            );
856            if next_waiter.0.rows_loaded <= loaded_need {
857                wait_orders.push(next_waiter);
858            }
859        }
860        Ok(())
861    }
862}
863
864impl LogicalPageDecoder for SimpleStructDecoder {
865    fn accept_child(&mut self, mut child: DecoderReady) -> Result<()> {
866        // children with empty path should not be delivered to this method
867        let child_idx = child.path.pop_front().unwrap();
868        if child.path.is_empty() {
869            // This decoder is intended for us
870            self.children[child_idx as usize]
871                .scheduled
872                .push_back(child.decoder);
873        } else {
874            // This decoder is intended for one of our children
875            let intended = self.children[child_idx as usize].scheduled.back_mut().ok_or_else(|| Error::Internal { message: format!("Decoder scheduled for child at index {} but we don't have any child at that index yet", child_idx), location: location!() })?;
876            intended.accept_child(child)?;
877        }
878        Ok(())
879    }
880
881    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
882        self.do_wait_for_loaded(loaded_need).boxed()
883    }
884
885    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
886        let child_tasks = self
887            .children
888            .iter_mut()
889            .map(|child| child.drain(num_rows))
890            .collect::<Result<Vec<_>>>()?;
891        let num_rows = child_tasks[0].num_rows;
892        debug_assert!(child_tasks.iter().all(|task| task.num_rows == num_rows));
893        Ok(NextDecodeTask {
894            task: Box::new(SimpleStructDecodeTask {
895                children: child_tasks,
896                child_fields: self.child_fields.clone(),
897            }),
898            num_rows,
899        })
900    }
901
902    fn rows_loaded(&self) -> u64 {
903        self.children.iter().map(|c| c.rows_loaded).min().unwrap()
904    }
905
906    fn rows_drained(&self) -> u64 {
907        // All children should have the same number of rows drained
908        debug_assert!(self
909            .children
910            .iter()
911            .all(|c| c.rows_drained == self.children[0].rows_drained));
912        self.children[0].rows_drained
913    }
914
915    fn num_rows(&self) -> u64 {
916        self.num_rows
917    }
918
919    fn data_type(&self) -> &DataType {
920        &self.data_type
921    }
922}
923
924struct SimpleStructDecodeTask {
925    children: Vec<CompositeDecodeTask>,
926    child_fields: Fields,
927}
928
929impl DecodeArrayTask for SimpleStructDecodeTask {
930    fn decode(self: Box<Self>) -> Result<ArrayRef> {
931        let child_arrays = self
932            .children
933            .into_iter()
934            .map(|child| child.decode())
935            .collect::<Result<Vec<_>>>()?;
936        Ok(Arc::new(StructArray::try_new(
937            self.child_fields,
938            child_arrays,
939            None,
940        )?))
941    }
942}
943
944/// A structural encoder for struct fields
945///
946/// The struct's validity is added to the rep/def builder
947/// and the builder is cloned to all children.
948pub struct StructStructuralEncoder {
949    children: Vec<Box<dyn FieldEncoder>>,
950}
951
952impl StructStructuralEncoder {
953    pub fn new(children: Vec<Box<dyn FieldEncoder>>) -> Self {
954        Self { children }
955    }
956}
957
958impl FieldEncoder for StructStructuralEncoder {
959    fn maybe_encode(
960        &mut self,
961        array: ArrayRef,
962        external_buffers: &mut OutOfLineBuffers,
963        mut repdef: RepDefBuilder,
964        row_number: u64,
965        num_rows: u64,
966    ) -> Result<Vec<EncodeTask>> {
967        let struct_array = array.as_struct();
968        if let Some(validity) = struct_array.nulls() {
969            repdef.add_validity_bitmap(validity.clone());
970        } else {
971            repdef.add_no_null(struct_array.len());
972        }
973        let child_tasks = self
974            .children
975            .iter_mut()
976            .zip(struct_array.columns().iter())
977            .map(|(encoder, arr)| {
978                encoder.maybe_encode(
979                    arr.clone(),
980                    external_buffers,
981                    repdef.clone(),
982                    row_number,
983                    num_rows,
984                )
985            })
986            .collect::<Result<Vec<_>>>()?;
987        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
988    }
989
990    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
991        self.children
992            .iter_mut()
993            .map(|encoder| encoder.flush(external_buffers))
994            .flatten_ok()
995            .collect::<Result<Vec<_>>>()
996    }
997
998    fn num_columns(&self) -> u32 {
999        self.children
1000            .iter()
1001            .map(|child| child.num_columns())
1002            .sum::<u32>()
1003    }
1004
1005    fn finish(
1006        &mut self,
1007        external_buffers: &mut OutOfLineBuffers,
1008    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
1009        let mut child_columns = self
1010            .children
1011            .iter_mut()
1012            .map(|child| child.finish(external_buffers))
1013            .collect::<FuturesOrdered<_>>();
1014        async move {
1015            let mut encoded_columns = Vec::with_capacity(child_columns.len());
1016            while let Some(child_cols) = child_columns.next().await {
1017                encoded_columns.extend(child_cols?);
1018            }
1019            Ok(encoded_columns)
1020        }
1021        .boxed()
1022    }
1023}
1024
1025pub struct StructFieldEncoder {
1026    children: Vec<Box<dyn FieldEncoder>>,
1027    column_index: u32,
1028    num_rows_seen: u64,
1029}
1030
1031impl StructFieldEncoder {
1032    #[allow(dead_code)]
1033    pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
1034        Self {
1035            children,
1036            column_index,
1037            num_rows_seen: 0,
1038        }
1039    }
1040}
1041
1042impl FieldEncoder for StructFieldEncoder {
1043    fn maybe_encode(
1044        &mut self,
1045        array: ArrayRef,
1046        external_buffers: &mut OutOfLineBuffers,
1047        repdef: RepDefBuilder,
1048        row_number: u64,
1049        num_rows: u64,
1050    ) -> Result<Vec<EncodeTask>> {
1051        self.num_rows_seen += array.len() as u64;
1052        let struct_array = array.as_struct();
1053        let child_tasks = self
1054            .children
1055            .iter_mut()
1056            .zip(struct_array.columns().iter())
1057            .map(|(encoder, arr)| {
1058                encoder.maybe_encode(
1059                    arr.clone(),
1060                    external_buffers,
1061                    repdef.clone(),
1062                    row_number,
1063                    num_rows,
1064                )
1065            })
1066            .collect::<Result<Vec<_>>>()?;
1067        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
1068    }
1069
1070    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
1071        let child_tasks = self
1072            .children
1073            .iter_mut()
1074            .map(|encoder| encoder.flush(external_buffers))
1075            .collect::<Result<Vec<_>>>()?;
1076        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
1077    }
1078
1079    fn num_columns(&self) -> u32 {
1080        self.children
1081            .iter()
1082            .map(|child| child.num_columns())
1083            .sum::<u32>()
1084            + 1
1085    }
1086
1087    fn finish(
1088        &mut self,
1089        external_buffers: &mut OutOfLineBuffers,
1090    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
1091        let mut child_columns = self
1092            .children
1093            .iter_mut()
1094            .map(|child| child.finish(external_buffers))
1095            .collect::<FuturesOrdered<_>>();
1096        let num_rows_seen = self.num_rows_seen;
1097        let column_index = self.column_index;
1098        async move {
1099            let mut columns = Vec::new();
1100            // Add a column for the struct header
1101            let mut header = EncodedColumn::default();
1102            header.final_pages.push(EncodedPage {
1103                data: Vec::new(),
1104                description: PageEncoding::Legacy(pb::ArrayEncoding {
1105                    array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
1106                        pb::SimpleStruct {},
1107                    )),
1108                }),
1109                num_rows: num_rows_seen,
1110                column_idx: column_index,
1111                row_number: 0, // Not used by legacy encoding
1112            });
1113            columns.push(header);
1114            // Now run finish on the children
1115            while let Some(child_cols) = child_columns.next().await {
1116                columns.extend(child_cols?);
1117            }
1118            Ok(columns)
1119        }
1120        .boxed()
1121    }
1122}
1123
#[cfg(test)]
mod tests {
    // Round-trip tests for struct encoding/decoding, driven by the
    // `check_round_trip_*` helpers from `crate::testing`.

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        builder::{Int32Builder, ListBuilder},
        Array, ArrayRef, Int32Array, StructArray,
    };
    use arrow_buffer::NullBuffer;
    use arrow_schema::{DataType, Field, Fields};

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    // Random data for a non-nullable struct with two non-nullable Int32 children,
    // written as file version 2.0.
    #[test_log::test(tokio::test)]
    async fn test_simple_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let field = Field::new("", data_type, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    // Hand-written data with nulls at the outer struct, the inner struct and leaf
    // level, written as file version 2.1.
    #[test_log::test(tokio::test)]
    async fn test_nullable_struct() {
        // Test data struct<score: int32, location: struct<x: int32, y: int32>>
        // - score: null
        //   location:
        //     x: 1
        //     y: 6
        // - score: 12
        //   location:
        //     x: 2
        //     y: null
        // - score: 13
        //   location:
        //     x: 3
        //     y: 8
        // - score: 14
        //   location: null
        // - null
        //
        let inner_fields = Fields::from(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, true),
        ]);
        let inner_struct = DataType::Struct(inner_fields.clone());
        let outer_fields = Fields::from(vec![
            Field::new("score", DataType::Int32, true),
            Field::new("location", inner_struct, true),
        ]);

        // Child values under null struct slots (rows 3 and 4) are placeholders.
        let x_vals = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let y_vals = Int32Array::from(vec![Some(6), None, Some(8), Some(9), Some(10)]);
        let scores = Int32Array::from(vec![None, Some(12), Some(13), Some(14), Some(15)]);

        // Row 3's `location` is null.
        let location_validity = NullBuffer::from(vec![true, true, true, false, true]);
        let locations = StructArray::new(
            inner_fields,
            vec![Arc::new(x_vals), Arc::new(y_vals)],
            Some(location_validity),
        );

        // Row 4 is null at the outer struct level.
        let rows_validity = NullBuffer::from(vec![true, true, true, true, false]);
        let rows = StructArray::new(
            outer_fields,
            vec![Arc::new(scores), Arc::new(locations)],
            Some(rows_validity),
        );

        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![Arc::new(rows)], &test_cases, HashMap::new()).await;
    }

    // A struct that mixes a nullable list child with a nullable Int32 child
    // (random data, file version 2.0).
    #[test_log::test(tokio::test)]
    async fn test_struct_list() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new(
                "inner_list",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
            Field::new("outer_int", DataType::Int32, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_empty_struct() {
        // It's technically legal for a struct to have 0 children, need to
        // make sure we support that
        let data_type = DataType::Struct(Fields::from(Vec::<Field>::default()));
        let field = Field::new("row", data_type, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    // Nested struct (with a list inside) alongside top-level Int32 and Binary
    // fields (random data, file version 2.0).
    #[test_log::test(tokio::test)]
    async fn test_complicated_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("int", DataType::Int32, true),
            Field::new(
                "inner",
                DataType::Struct(Fields::from(vec![
                    Field::new("inner_int", DataType::Int32, true),
                    Field::new(
                        "inner_list",
                        DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                        true,
                    ),
                ])),
                true,
            ),
            Field::new("outer_binary", DataType::Binary, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_ragged_scheduling() {
        // This test covers scheduling when batches straddle page boundaries

        // Create a list with 10k nulls
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        for _ in 0..10000 {
            list_builder.append_null();
        }
        let list_array = Arc::new(list_builder.finish());
        let int_array = Arc::new(Int32Array::from_iter_values(0..10000));
        let fields = vec![
            Field::new("", list_array.data_type().clone(), true),
            Field::new("", int_array.data_type().clone(), true),
        ];
        let struct_array = Arc::new(StructArray::new(
            Fields::from(fields),
            vec![list_array, int_array],
            None,
        )) as ArrayRef;
        // 10,000 rows split into 23 batches of 437 rows; the final batch
        // (offset 9614) holds the remaining 386 rows.
        let struct_arrays = (0..10000)
            // Intentionally skip in some randomish amount to create more ragged scheduling
            .step_by(437)
            .map(|offset| struct_array.slice(offset, 437.min(10000 - offset)))
            .collect::<Vec<_>>();
        check_round_trip_encoding_of_data(struct_arrays, &TestCases::default(), HashMap::new())
            .await;
    }
}