lance_encoding/encodings/logical/
list.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::VecDeque, ops::Range, sync::Arc};
5
6use arrow_array::{
7    cast::AsArray,
8    new_empty_array,
9    types::{Int32Type, Int64Type, UInt64Type},
10    Array, ArrayRef, BooleanArray, Int32Array, Int64Array, LargeListArray, ListArray, UInt64Array,
11};
12use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer};
13use arrow_schema::{DataType, Field, Fields};
14use futures::{future::BoxFuture, FutureExt};
15use lance_arrow::list::ListArrayExt;
16use log::trace;
17use snafu::location;
18use tokio::task::JoinHandle;
19
20use lance_core::{cache::FileMetadataCache, Error, Result};
21
22use crate::{
23    buffer::LanceBuffer,
24    data::{BlockInfo, DataBlock, FixedWidthDataBlock},
25    decoder::{
26        DecodeArrayTask, DecodeBatchScheduler, DecodedArray, FieldScheduler, FilterExpression,
27        ListPriorityRange, LogicalPageDecoder, MessageType, NextDecodeTask, PageEncoding,
28        PriorityRange, ScheduledScanLine, SchedulerContext, SchedulingJob,
29        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
30        StructuralSchedulingJob,
31    },
32    encoder::{
33        ArrayEncoder, EncodeTask, EncodedArray, EncodedColumn, EncodedPage, FieldEncoder,
34        OutOfLineBuffers,
35    },
36    encodings::logical::r#struct::SimpleStructScheduler,
37    format::pb,
38    repdef::RepDefBuilder,
39    EncodingsIo,
40};
41
42use super::{primitive::AccumulationQueue, r#struct::SimpleStructDecoder};
43
44// Scheduling lists is tricky.  Imagine the following scenario:
45//
46// * There are 2000 offsets per offsets page
47// * The user requests range 8000..8500
48//
49// First, since 8000 matches the start of an offsets page, we don't need to read an extra offset.
50//
51// Since this range matches the start of a page, we know we will get an offsets array like
52// [0, ...]
53//
54// We need to restore nulls, which relies on a null offset adjustment, which is unique to each offsets
55// page.
56//
// We need to map this to [X, ...] where X is the sum of the number of items in the 0-2000, 2000-4000,
// 4000-6000, and 6000-8000 pages (every offsets page that precedes row 8000).
59//
60// This gets even trickier if a range spans multiple offsets pages.  For example, given the same
61// scenario but the user requests 7999..8500.  In this case the first page read will include an
62// extra offset (e.g. we need to read 7998..8000), the null adjustment will be different between the
63// two, and the items offset will be different.
64//
65// To handle this, we take the incoming row requests, look at the page info, and then calculate
66// list requests.
67
/// A contiguous run of requested lists that lies within a single offsets page.
#[derive(Debug)]
struct ListRequest {
    /// Number of lists this request covers
    num_lists: u64,
    /// Set when the request starts mid-page: one extra offset (the end of the preceding
    /// list) is read in front of the lists so that the first list's start is known
    includes_extra_offset: bool,
    /// The `null_offset_adjustment` of the offsets page this request falls in
    null_offset_adjustment: u64,
    /// Number of items referenced by all earlier offsets pages; added to the page-relative
    /// item positions to obtain absolute item positions
    items_offset: u64,
}
79
80#[derive(Debug)]
81struct ListRequestsIter {
82    // The bool triggers whether we need to skip an offset or not
83    list_requests: VecDeque<ListRequest>,
84    offsets_requests: Vec<Range<u64>>,
85}
86
87impl ListRequestsIter {
88    // TODO: This logic relies on row_ranges being ordered and may be a problem when we
89    // add proper support for out-of-order take
90    fn new(row_ranges: &[Range<u64>], page_infos: &[OffsetPageInfo]) -> Self {
91        let mut items_offset = 0;
92        let mut offsets_offset = 0;
93        let mut page_infos_iter = page_infos.iter();
94        let mut cur_page_info = page_infos_iter.next().unwrap();
95        let mut list_requests = VecDeque::new();
96        let mut offsets_requests = Vec::new();
97
98        // Each row range maps to at least one list request.  It may map to more if the
99        // range spans multiple offsets pages.
100        for range in row_ranges {
101            let mut range = range.clone();
102
103            // Skip any offsets pages that are before the range
104            while offsets_offset + (cur_page_info.offsets_in_page) <= range.start {
105                trace!("Skipping null offset adjustment chunk {:?}", offsets_offset);
106                offsets_offset += cur_page_info.offsets_in_page;
107                items_offset += cur_page_info.num_items_referenced_by_page;
108                cur_page_info = page_infos_iter.next().unwrap();
109            }
110
111            // If the range starts at the beginning of an offsets page we don't need
112            // to read an extra offset
113            let mut includes_extra_offset = range.start != offsets_offset;
114            if includes_extra_offset {
115                offsets_requests.push(range.start - 1..range.end);
116            } else {
117                offsets_requests.push(range.clone());
118            }
119
120            // At this point our range overlaps the current page (cur_page_info) and
121            // we can start slicing it into list requests
122            while !range.is_empty() {
123                // The end of the list request is the min of the end of the range
124                // and the end of the current page
125                let end = offsets_offset + cur_page_info.offsets_in_page;
126                let last = end >= range.end;
127                let end = end.min(range.end);
128                list_requests.push_back(ListRequest {
129                    num_lists: end - range.start,
130                    includes_extra_offset,
131                    null_offset_adjustment: cur_page_info.null_offset_adjustment,
132                    items_offset,
133                });
134
135                includes_extra_offset = false;
136                range.start = end;
137                // If there is still more data in the range, we need to move to the
138                // next page
139                if !last {
140                    offsets_offset += cur_page_info.offsets_in_page;
141                    items_offset += cur_page_info.num_items_referenced_by_page;
142                    cur_page_info = page_infos_iter.next().unwrap();
143                }
144            }
145        }
146        Self {
147            list_requests,
148            offsets_requests,
149        }
150    }
151
152    // Given a page of offset data, grab the corresponding list requests
153    fn next(&mut self, mut num_offsets: u64) -> Vec<ListRequest> {
154        let mut list_requests = Vec::new();
155        while num_offsets > 0 {
156            let req = self.list_requests.front_mut().unwrap();
157            // If the request did not start at zero then we need to read an extra offset
158            if req.includes_extra_offset {
159                num_offsets -= 1;
160                debug_assert_ne!(num_offsets, 0);
161            }
162            if num_offsets >= req.num_lists {
163                num_offsets -= req.num_lists;
164                list_requests.push(self.list_requests.pop_front().unwrap());
165            } else {
166                let sub_req = ListRequest {
167                    num_lists: num_offsets,
168                    includes_extra_offset: req.includes_extra_offset,
169                    null_offset_adjustment: req.null_offset_adjustment,
170                    items_offset: req.items_offset,
171                };
172
173                list_requests.push(sub_req);
174                req.includes_extra_offset = false;
175                req.num_lists -= num_offsets;
176                num_offsets = 0;
177            }
178        }
179        list_requests
180    }
181}
182
183/// Given a list of offsets and a list of requested list row ranges we need to rewrite the offsets so that
184/// they appear as expected for a list array.  This involves a number of tasks:
185///
186///  * Nulls in the offsets are represented by oversize values and these need to be converted to
187///    the appropriate length
188///  * For each range we (usually) load N + 1 offsets, so if we have 5 ranges we have 5 extra values
189///    and we need to drop 4 of those.
190///  * Ranges may not start at 0 and, while we don't strictly need to, we want to go ahead and normalize
191///    the offsets so that the first offset is 0.
192///
193/// Throughout the comments we will consider the following example case:
194///
195/// The user requests the following ranges of lists (list_row_ranges): [0..3, 5..6]
196///
197/// This is a total of 4 lists.  The loaded offsets are [10, 20, 120, 150, 60].  The last valid offset is 99.
198/// The null_offset_adjustment will be 100.
199///
200/// Our desired output offsets are going to be [0, 10, 20, 20, 30] and the item ranges are [0..20] and [50..60]
201/// The validity array is [true, true, false, true]
202fn decode_offsets(
203    offsets: &dyn Array,
204    list_requests: &[ListRequest],
205    null_offset_adjustment: u64,
206) -> (VecDeque<Range<u64>>, Vec<u64>, BooleanBuffer) {
207    // In our example this is [10, 20, 120, 50, 60]
208    let numeric_offsets = offsets.as_primitive::<UInt64Type>();
209    // In our example there are 4 total lists
210    let total_num_lists = list_requests.iter().map(|req| req.num_lists).sum::<u64>() as u32;
211    let mut normalized_offsets = Vec::with_capacity(total_num_lists as usize);
212    let mut validity_buffer = BooleanBufferBuilder::new(total_num_lists as usize);
213    // The first output offset is always 0 no matter what
214    normalized_offsets.push(0);
215    let mut last_normalized_offset = 0;
216    let offsets_values = numeric_offsets.values();
217
218    let mut item_ranges = VecDeque::new();
219    let mut offsets_offset: u32 = 0;
220    // All ranges should be non-empty
221    debug_assert!(list_requests.iter().all(|r| r.num_lists > 0));
222    for req in list_requests {
223        // The # of lists in this particular range
224        let num_lists = req.num_lists;
225
226        // Because we know the first offset is always 0 we don't store that.  This means we have special
227        // logic if a range starts at 0 (we didn't need to read an extra offset value in that case)
228        // In our example we enter this special case on the first range (0..3) but not the second (5..6)
229        // This means the first range, which has 3 lists, maps to 3 values in our offsets array [10, 20, 120]
230        // However, the second range, which has 1 list, maps to 2 values in our offsets array [150, 60]
231        let (items_range, offsets_to_norm_start, num_offsets_to_norm) =
232            if !req.includes_extra_offset {
233                // In our example items start is 0 and items_end is 20
234                let first_offset_idx = 0_usize;
235                let num_offsets = num_lists as usize;
236                let items_start = 0;
237                let items_end = offsets_values[num_offsets - 1] % null_offset_adjustment;
238                let items_range = items_start..items_end;
239                (items_range, first_offset_idx, num_offsets)
240            } else {
241                // In our example, offsets_offset will be 3, items_start will be 50, and items_end will
242                // be 60
243                let first_offset_idx = offsets_offset as usize;
244                let num_offsets = num_lists as usize + 1;
245                let items_start = offsets_values[first_offset_idx] % null_offset_adjustment;
246                let items_end =
247                    offsets_values[first_offset_idx + num_offsets - 1] % null_offset_adjustment;
248                let items_range = items_start..items_end;
249                (items_range, first_offset_idx, num_offsets)
250            };
251
252        // TODO: Maybe consider writing whether there are nulls or not as part of the
253        // page description.  Then we can skip all validity work.  Not clear if that will
254        // be any benefit though.
255
256        // We calculate validity from all elements but the first (or all elements
257        // if this is the special zero-start case)
258        //
259        // So, in our first pass through, we consider [10, 20, 120] (1 null)
260        // In our second pass through we only consider [60] (0 nulls)
261        // Note that the 150 is null but we only loaded it to know where the 50-60 list started
262        // and it doesn't actually correspond to a list (e.g. list 4 is null but we aren't loading it
263        // here)
264        let validity_start = if !req.includes_extra_offset {
265            0
266        } else {
267            offsets_to_norm_start + 1
268        };
269        for off in offsets_values
270            .slice(validity_start, num_lists as usize)
271            .iter()
272        {
273            validity_buffer.append(*off < null_offset_adjustment);
274        }
275
276        // In our special case we need to account for the offset 0-first_item
277        if !req.includes_extra_offset {
278            let first_item = offsets_values[0] % null_offset_adjustment;
279            normalized_offsets.push(first_item);
280            last_normalized_offset = first_item;
281        }
282
283        // Finally, we go through and shift the offsets.  If we just returned them as is (taking care of
284        // nulls) we would get [0, 10, 20, 20, 60] but our last list only has 10 items, not 40 and so we
285        // need to shift that 60 to a 40.
286        normalized_offsets.extend(
287                offsets_values
288                    .slice(offsets_to_norm_start, num_offsets_to_norm)
289                    .windows(2)
290                    .map(|w| {
291                        let start = w[0] % null_offset_adjustment;
292                        let end = w[1] % null_offset_adjustment;
293                        if end < start {
294                            panic!("End is less than start in window {:?} with null_offset_adjustment={} we get start={} and end={}", w, null_offset_adjustment, start, end);
295                        }
296                        let length = end - start;
297                        last_normalized_offset += length;
298                        last_normalized_offset
299                    }),
300            );
301        trace!(
302            "List offsets range of {} lists maps to item range {:?}",
303            num_lists,
304            items_range
305        );
306        offsets_offset += num_offsets_to_norm as u32;
307        if !items_range.is_empty() {
308            let items_range =
309                items_range.start + req.items_offset..items_range.end + req.items_offset;
310            item_ranges.push_back(items_range);
311        }
312    }
313
314    let validity = validity_buffer.finish();
315    (item_ranges, normalized_offsets, validity)
316}
317
318/// After scheduling the offsets we immediately launch this task as a new tokio task
319/// This task waits for the offsets to arrive, decodes them, and then schedules the I/O
320/// for the items.
321///
322/// This task does not wait for the items data.  That happens on the main decode loop (unless
323/// we have list of list of ... in which case it happens in the outer indirect decode loop)
324#[allow(clippy::too_many_arguments)]
325async fn indirect_schedule_task(
326    mut offsets_decoder: Box<dyn LogicalPageDecoder>,
327    list_requests: Vec<ListRequest>,
328    null_offset_adjustment: u64,
329    items_scheduler: Arc<dyn FieldScheduler>,
330    items_type: DataType,
331    io: Arc<dyn EncodingsIo>,
332    cache: Arc<FileMetadataCache>,
333    priority: Box<dyn PriorityRange>,
334) -> Result<IndirectlyLoaded> {
335    let num_offsets = offsets_decoder.num_rows();
336    // We know the offsets are a primitive array and thus will not need additional
337    // pages.  We can use a dummy receiver to match the decoder API
338    offsets_decoder.wait_for_loaded(num_offsets - 1).await?;
339    let decode_task = offsets_decoder.drain(num_offsets)?;
340    let offsets = decode_task.task.decode()?;
341
342    let (item_ranges, offsets, validity) =
343        decode_offsets(offsets.as_ref(), &list_requests, null_offset_adjustment);
344
345    trace!(
346        "Indirectly scheduling items ranges {:?} from list items column with {} rows (and priority {:?})",
347        item_ranges,
348        items_scheduler.num_rows(),
349        priority
350    );
351    let offsets: Arc<[u64]> = offsets.into();
352
353    // All requested lists are empty
354    if item_ranges.is_empty() {
355        debug_assert!(item_ranges.iter().all(|r| r.start == r.end));
356        return Ok(IndirectlyLoaded {
357            root_decoder: None,
358            offsets,
359            validity,
360        });
361    }
362    let item_ranges = item_ranges.into_iter().collect::<Vec<_>>();
363    let num_items = item_ranges.iter().map(|r| r.end - r.start).sum::<u64>();
364
365    // Create a new root scheduler, which has one column, which is our items data
366    let root_fields = Fields::from(vec![Field::new("item", items_type, true)]);
367    let indirect_root_scheduler =
368        SimpleStructScheduler::new(vec![items_scheduler], root_fields.clone(), num_items);
369    let mut indirect_scheduler = DecodeBatchScheduler::from_scheduler(
370        Arc::new(indirect_root_scheduler),
371        root_fields.clone(),
372        cache,
373    );
374    let mut root_decoder = SimpleStructDecoder::new(root_fields, num_items);
375
376    let priority = Box::new(ListPriorityRange::new(priority, offsets.clone()));
377
378    let indirect_messages = indirect_scheduler.schedule_ranges_to_vec(
379        &item_ranges,
380        // Can't push filters into list items
381        &FilterExpression::no_filter(),
382        io,
383        Some(priority),
384    )?;
385
386    for message in indirect_messages {
387        for decoder in message.decoders {
388            let decoder = decoder.into_legacy();
389            if !decoder.path.is_empty() {
390                root_decoder.accept_child(decoder)?;
391            }
392        }
393    }
394
395    Ok(IndirectlyLoaded {
396        offsets,
397        validity,
398        root_decoder: Some(root_decoder),
399    })
400}
401
402#[derive(Debug)]
403struct ListFieldSchedulingJob<'a> {
404    scheduler: &'a ListFieldScheduler,
405    offsets: Box<dyn SchedulingJob + 'a>,
406    num_rows: u64,
407    list_requests_iter: ListRequestsIter,
408}
409
410impl<'a> ListFieldSchedulingJob<'a> {
411    fn try_new(
412        scheduler: &'a ListFieldScheduler,
413        ranges: &[Range<u64>],
414        filter: &FilterExpression,
415    ) -> Result<Self> {
416        let list_requests_iter = ListRequestsIter::new(ranges, &scheduler.offset_page_info);
417        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
418        let offsets = scheduler
419            .offsets_scheduler
420            .schedule_ranges(&list_requests_iter.offsets_requests, filter)?;
421        Ok(Self {
422            scheduler,
423            offsets,
424            list_requests_iter,
425            num_rows,
426        })
427    }
428}
429
430impl SchedulingJob for ListFieldSchedulingJob<'_> {
431    fn schedule_next(
432        &mut self,
433        context: &mut SchedulerContext,
434        priority: &dyn PriorityRange,
435    ) -> Result<ScheduledScanLine> {
436        let next_offsets = self.offsets.schedule_next(context, priority)?;
437        let offsets_scheduled = next_offsets.rows_scheduled;
438        let list_reqs = self.list_requests_iter.next(offsets_scheduled);
439        trace!(
440            "Scheduled {} offsets which maps to list requests: {:?}",
441            offsets_scheduled,
442            list_reqs
443        );
444        let null_offset_adjustment = list_reqs[0].null_offset_adjustment;
445        // It shouldn't be possible for `list_reqs` to span more than one offsets page and so it shouldn't
446        // be possible for the null_offset_adjustment to change
447        debug_assert!(list_reqs
448            .iter()
449            .all(|req| req.null_offset_adjustment == null_offset_adjustment));
450        let num_rows = list_reqs.iter().map(|req| req.num_lists).sum::<u64>();
451        // offsets is a uint64 which is guaranteed to create one decoder on each call to schedule_next
452        let next_offsets_decoder = next_offsets
453            .decoders
454            .into_iter()
455            .next()
456            .unwrap()
457            .into_legacy()
458            .decoder;
459
460        let items_scheduler = self.scheduler.items_scheduler.clone();
461        let items_type = self.scheduler.items_field.data_type().clone();
462        let io = context.io().clone();
463        let cache = context.cache().clone();
464
465        // Immediately spawn the indirect scheduling
466        let indirect_fut = tokio::spawn(indirect_schedule_task(
467            next_offsets_decoder,
468            list_reqs,
469            null_offset_adjustment,
470            items_scheduler,
471            items_type,
472            io,
473            cache,
474            priority.box_clone(),
475        ));
476
477        // Return a decoder
478        let decoder = Box::new(ListPageDecoder {
479            offsets: Arc::new([]),
480            validity: BooleanBuffer::new(Buffer::from_vec(Vec::<u8>::default()), 0, 0),
481            item_decoder: None,
482            rows_drained: 0,
483            rows_loaded: 0,
484            items_field: self.scheduler.items_field.clone(),
485            num_rows,
486            unloaded: Some(indirect_fut),
487            offset_type: self.scheduler.offset_type.clone(),
488            data_type: self.scheduler.list_type.clone(),
489        });
490        let decoder = context.locate_decoder(decoder);
491        Ok(ScheduledScanLine {
492            decoders: vec![MessageType::DecoderReady(decoder)],
493            rows_scheduled: num_rows,
494        })
495    }
496
497    fn num_rows(&self) -> u64 {
498        self.num_rows
499    }
500}
501
502/// A page scheduler for list fields that encodes offsets in one field and items in another
503///
504/// The list scheduler is somewhat unique because it requires indirect I/O.  We cannot know the
505/// ranges we need simply by looking at the metadata.  This means that list scheduling doesn't
506/// fit neatly into the two-thread schedule-loop / decode-loop model.  To handle this, when a
507/// list page is scheduled, we only schedule the I/O for the offsets and then we immediately
508/// launch a new tokio task.  This new task waits for the offsets, decodes them, and then
509/// schedules the I/O for the items.  Keep in mind that list items can be lists themselves.  If
510/// that is the case then this indirection will continue.  The decode task that is returned will
511/// only finish `wait`ing when all of the I/O has completed.
512///
513/// Whenever we schedule follow-up I/O like this the priority is based on the top-level row
514/// index.  This helps ensure that earlier rows get finished completely (including follow up
515/// tasks) before we perform I/O for later rows.
516#[derive(Debug)]
517pub struct ListFieldScheduler {
518    offsets_scheduler: Arc<dyn FieldScheduler>,
519    items_scheduler: Arc<dyn FieldScheduler>,
520    items_field: Arc<Field>,
521    offset_type: DataType,
522    list_type: DataType,
523    offset_page_info: Vec<OffsetPageInfo>,
524}
525
/// Supplementary per-page metadata for the uint64 offsets column.
///
/// One entry is kept for each offsets page; the list scheduler needs these to interpret the
/// offsets it reads.
#[derive(Debug)]
pub struct OffsetPageInfo {
    /// Number of offsets (one per list row) stored in the page
    pub offsets_in_page: u64,
    /// Offsets at or above this value mark null lists; taking the offset modulo this value
    /// recovers the real offset
    pub null_offset_adjustment: u64,
    /// Number of items referenced by the page's lists; used to turn page-relative item
    /// positions into absolute ones
    pub num_items_referenced_by_page: u64,
}
535
536impl ListFieldScheduler {
537    // Create a new ListPageScheduler
538    pub fn new(
539        offsets_scheduler: Arc<dyn FieldScheduler>,
540        items_scheduler: Arc<dyn FieldScheduler>,
541        items_field: Arc<Field>,
542        // Should be int32 or int64
543        offset_type: DataType,
544        offset_page_info: Vec<OffsetPageInfo>,
545    ) -> Self {
546        let list_type = match &offset_type {
547            DataType::Int32 => DataType::List(items_field.clone()),
548            DataType::Int64 => DataType::LargeList(items_field.clone()),
549            _ => panic!("Unexpected offset type {}", offset_type),
550        };
551        Self {
552            offsets_scheduler,
553            items_scheduler,
554            items_field,
555            offset_type,
556            offset_page_info,
557            list_type,
558        }
559    }
560}
561
562impl FieldScheduler for ListFieldScheduler {
563    fn schedule_ranges<'a>(
564        &'a self,
565        ranges: &[Range<u64>],
566        filter: &FilterExpression,
567    ) -> Result<Box<dyn SchedulingJob + 'a>> {
568        Ok(Box::new(ListFieldSchedulingJob::try_new(
569            self, ranges, filter,
570        )?))
571    }
572
573    fn num_rows(&self) -> u64 {
574        self.offsets_scheduler.num_rows()
575    }
576
577    fn initialize<'a>(
578        &'a self,
579        _filter: &'a FilterExpression,
580        _context: &'a SchedulerContext,
581    ) -> BoxFuture<'a, Result<()>> {
582        // 2.0 schedulers do not need to initialize
583        std::future::ready(Ok(())).boxed()
584    }
585}
586
587/// As soon as the first call to decode comes in we wait for all indirect I/O to
588/// complete.
589///
590/// Once the indirect I/O is finished we pull items out of `unawaited`, wait them
591/// (this wait should return immediately) and then push them into `item_decoders`.
592///
593/// We then drain from `item_decoders`, popping item pages off as we finish with
594/// them.
595///
596/// TODO: Test the case where a single list page has multiple items pages
597#[derive(Debug)]
598struct ListPageDecoder {
599    unloaded: Option<JoinHandle<Result<IndirectlyLoaded>>>,
600    // offsets and validity will have already been decoded as part of the indirect I/O
601    offsets: Arc<[u64]>,
602    validity: BooleanBuffer,
603    item_decoder: Option<SimpleStructDecoder>,
604    num_rows: u64,
605    rows_drained: u64,
606    rows_loaded: u64,
607    items_field: Arc<Field>,
608    offset_type: DataType,
609    data_type: DataType,
610}
611
612struct ListDecodeTask {
613    offsets: Vec<u64>,
614    validity: BooleanBuffer,
615    // Will be None if there are no items (all empty / null lists)
616    items: Option<Box<dyn DecodeArrayTask>>,
617    items_field: Arc<Field>,
618    offset_type: DataType,
619}
620
621impl DecodeArrayTask for ListDecodeTask {
622    fn decode(self: Box<Self>) -> Result<ArrayRef> {
623        let items = self
624            .items
625            .map(|items| {
626                // When we run the indirect I/O we wrap things in a struct array with a single field
627                // named "item".  We can unwrap that now.
628                let wrapped_items = items.decode()?;
629                Result::Ok(wrapped_items.as_struct().column(0).clone())
630            })
631            .unwrap_or_else(|| Ok(new_empty_array(self.items_field.data_type())))?;
632
633        // The offsets are already decoded but they need to be shifted back to 0 and cast
634        // to the appropriate type
635        //
636        // Although, in some cases, the shift IS strictly required since the unshifted offsets
637        // may cross i32::MAX even though the shifted offsets do not
638        let offsets = UInt64Array::from(self.offsets);
639        let validity = NullBuffer::new(self.validity);
640        let validity = if validity.null_count() == 0 {
641            None
642        } else {
643            Some(validity)
644        };
645        let min_offset = UInt64Array::new_scalar(offsets.value(0));
646        let offsets = arrow_arith::numeric::sub(&offsets, &min_offset)?;
647        match &self.offset_type {
648            DataType::Int32 => {
649                let offsets = arrow_cast::cast(&offsets, &DataType::Int32)?;
650                let offsets_i32 = offsets.as_primitive::<Int32Type>();
651                let offsets = OffsetBuffer::new(offsets_i32.values().clone());
652
653                Ok(Arc::new(ListArray::try_new(
654                    self.items_field.clone(),
655                    offsets,
656                    items,
657                    validity,
658                )?))
659            }
660            DataType::Int64 => {
661                let offsets = arrow_cast::cast(&offsets, &DataType::Int64)?;
662                let offsets_i64 = offsets.as_primitive::<Int64Type>();
663                let offsets = OffsetBuffer::new(offsets_i64.values().clone());
664
665                Ok(Arc::new(LargeListArray::try_new(
666                    self.items_field.clone(),
667                    offsets,
668                    items,
669                    validity,
670                )?))
671            }
672            _ => panic!("ListDecodeTask with data type that is not i32 or i64"),
673        }
674    }
675}
676
/// Returns the index of the last element of `to_search` that is `<= target`.
///
/// `to_search` must be sorted in ascending order (e.g. list offsets).  When `target` matches a
/// run of duplicates, the index of the *last* duplicate is returned.  E.g. if the input list is
/// `[0, 3, 5, 5, 5, 7]` then this will only return 0, 1, 4, or 5.
///
/// Returns 0 when `to_search` is empty or every element is greater than `target`.
///
/// With offsets as input this converts a count of loaded items into a count of fully loaded
/// lists: for `[0, 3, 5, 5, 5, 7]` and 5 loaded items, the first 4 lists are complete.
fn binary_search_to_end(to_search: &[u64], target: u64) -> u64 {
    // `partition_point` finds the first element greater than `target` in O(log n), no matter
    // how long the run of duplicates (empty lists) before it is.  `saturating_sub` keeps the
    // empty / too-small-target cases from underflowing.
    let first_greater = to_search.partition_point(|&value| value <= target);
    first_greater.saturating_sub(1) as u64
}
691
692impl LogicalPageDecoder for ListPageDecoder {
693    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
694        async move {
695            // wait for the indirect I/O to finish, run the scheduler for the indirect
696            // I/O and then wait for enough items to arrive
697            if self.unloaded.is_some() {
698                trace!("List scheduler needs to wait for indirect I/O to complete");
699                let indirectly_loaded = self.unloaded.take().unwrap().await;
700                if indirectly_loaded.is_err() {
701                    match indirectly_loaded.unwrap_err().try_into_panic() {
702                        Ok(err) => std::panic::resume_unwind(err),
703                        Err(err) => panic!("{:?}", err),
704                    };
705                }
706                let indirectly_loaded = indirectly_loaded.unwrap()?;
707
708                self.offsets = indirectly_loaded.offsets;
709                self.validity = indirectly_loaded.validity;
710                self.item_decoder = indirectly_loaded.root_decoder;
711            }
712            if self.rows_loaded > loaded_need {
713                return Ok(());
714            }
715
716            let boundary = loaded_need as usize;
717            debug_assert!(boundary < self.num_rows as usize);
718            // We need more than X lists which means we need at least X+1 lists which means
719            // we need at least offsets[X+1] items which means we need more than offsets[X+1]-1 items.
720            let items_needed = self.offsets[boundary + 1].saturating_sub(1);
721            trace!(
722                "List decoder is waiting for more than {} rows to be loaded and {}/{} are already loaded.  To satisfy this we need more than {} loaded items",
723                loaded_need,
724                self.rows_loaded,
725                self.num_rows,
726                items_needed,
727            );
728
729            let items_loaded = if let Some(item_decoder) = self.item_decoder.as_mut() {
730                item_decoder.wait_for_loaded(items_needed).await?;
731                item_decoder.rows_loaded()
732            } else {
733                0
734            };
735
736            self.rows_loaded = binary_search_to_end(&self.offsets, items_loaded);
737            trace!("List decoder now has {} loaded rows", self.rows_loaded);
738
739            Ok(())
740        }
741        .boxed()
742    }
743
744    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
745        // We already have the offsets but need to drain the item pages
746        let mut actual_num_rows = num_rows;
747        let item_start = self.offsets[self.rows_drained as usize];
748        if self.offset_type != DataType::Int64 {
749            // We might not be able to drain `num_rows` because that request might contain more than 2^31 items
750            // so we need to figure out how many rows we can actually drain.
751            while actual_num_rows > 0 {
752                let num_items =
753                    self.offsets[(self.rows_drained + actual_num_rows) as usize] - item_start;
754                if num_items <= i32::MAX as u64 {
755                    break;
756                }
757                // TODO: This could be slow.  Maybe faster to start from zero or do binary search.  Investigate when
758                // actually adding support for smaller than requested batches
759                actual_num_rows -= 1;
760            }
761        }
762        if actual_num_rows < num_rows {
763            // TODO: We should be able to automatically
764            // shrink the read batch size if we detect the batches are going to be huge (maybe
765            // even achieve this with a read_batch_bytes parameter, though some estimation may
766            // still be required)
767            return Err(Error::NotSupported { source: format!("loading a batch of {} lists would require creating an array with over i32::MAX items and we don't yet support returning smaller than requested batches", num_rows).into(), location: location!() });
768        }
769        let offsets = self.offsets
770            [self.rows_drained as usize..(self.rows_drained + actual_num_rows + 1) as usize]
771            .to_vec();
772        let validity = self
773            .validity
774            .slice(self.rows_drained as usize, actual_num_rows as usize);
775        let start = offsets[0];
776        let end = offsets[offsets.len() - 1];
777        let num_items_to_drain = end - start;
778
779        let item_decode = if num_items_to_drain == 0 {
780            None
781        } else {
782            self.item_decoder
783                .as_mut()
784                .map(|item_decoder| Result::Ok(item_decoder.drain(num_items_to_drain)?.task))
785                .transpose()?
786        };
787
788        self.rows_drained += num_rows;
789        Ok(NextDecodeTask {
790            num_rows,
791            task: Box::new(ListDecodeTask {
792                offsets,
793                validity,
794                items_field: self.items_field.clone(),
795                items: item_decode,
796                offset_type: self.offset_type.clone(),
797            }) as Box<dyn DecodeArrayTask>,
798        })
799    }
800
801    fn num_rows(&self) -> u64 {
802        self.num_rows
803    }
804
805    fn rows_loaded(&self) -> u64 {
806        self.rows_loaded
807    }
808
809    fn rows_drained(&self) -> u64 {
810        self.rows_drained
811    }
812
813    fn data_type(&self) -> &DataType {
814        &self.data_type
815    }
816}
817
818struct IndirectlyLoaded {
819    offsets: Arc<[u64]>,
820    validity: BooleanBuffer,
821    root_decoder: Option<SimpleStructDecoder>,
822}
823
824impl std::fmt::Debug for IndirectlyLoaded {
825    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
826        f.debug_struct("IndirectlyLoaded")
827            .field("offsets", &self.offsets)
828            .field("validity", &self.validity)
829            .finish()
830    }
831}
832
833/// An encoder for list offsets that "stitches" offsets and encodes nulls into the offsets
834///
835/// If we need to encode several list arrays into a single page then we need to "stitch" the offsets
836/// For example, imagine we have list arrays [[0, 1], [2]] and [[3, 4, 5]].
837///
838/// We will have offset arrays [0, 2, 3] and [0, 3].  We don't want to encode [0, 2, 3, 0, 3].  What
839/// we want is [0, 2, 3, 6]
840///
841/// This encoder also handles validity by converting a null value into an oversized offset.  For example,
842/// if we have four lists with offsets [0, 20, 20, 20, 30] and the list at index 2 is null (note that
843/// the list at index 1 is empty) then we turn this into offsets [0, 20, 20, 51, 30].  We replace a null
844/// offset with previous_offset + max_offset + 1.  This makes it possible to load a single item from the
845/// list array.
846///
847/// These offsets are always stored on disk as a u64 array.  First, this is because its simply much more
848/// likely than one expects that this is needed, even if our lists are not massive.  This is because we
849/// only write an offsets page when we have enough data.  This means we will probably accumulate a million
850/// offsets or more before we bother to write a page. If our lists have a few thousand items a piece then
851/// we end up passing the u32::MAX boundary.
852///
853/// The second reason is that list offsets are very easily compacted with delta + bit packing and so those
854/// u64 offsets should easily be shrunk down before being put on disk.
855///
856/// This encoder can encode both lists and large lists.  It can decode the resulting column into either type
857/// as well. (TODO: Test and enable large lists)
858///
859/// You can even write as a large list and decode as a regular list (as long as no single list has more than
860/// 2^31 items) or vice versa.  You could even encode a mixed stream of list and large list (but unclear that
861/// would ever be useful)
862#[derive(Debug)]
863struct ListOffsetsEncoder {
864    // An accumulation queue, we insert both offset arrays and validity arrays into this queue
865    accumulation_queue: AccumulationQueue,
866    // The inner encoder of offset values
867    inner_encoder: Arc<dyn ArrayEncoder>,
868    column_index: u32,
869}
870
871impl ListOffsetsEncoder {
872    fn new(
873        cache_bytes: u64,
874        keep_original_array: bool,
875        column_index: u32,
876        inner_encoder: Arc<dyn ArrayEncoder>,
877    ) -> Self {
878        Self {
879            accumulation_queue: AccumulationQueue::new(
880                cache_bytes,
881                column_index,
882                keep_original_array,
883            ),
884            inner_encoder,
885            column_index,
886        }
887    }
888
889    /// Given a list array, return the offsets as a standalone ArrayRef (either an Int32Array or Int64Array)
890    fn extract_offsets(list_arr: &dyn Array) -> ArrayRef {
891        match list_arr.data_type() {
892            DataType::List(_) => {
893                let offsets = list_arr.as_list::<i32>().offsets().clone();
894                Arc::new(Int32Array::new(offsets.into_inner(), None))
895            }
896            DataType::LargeList(_) => {
897                let offsets = list_arr.as_list::<i64>().offsets().clone();
898                Arc::new(Int64Array::new(offsets.into_inner(), None))
899            }
900            _ => panic!(),
901        }
902    }
903
904    /// Converts the validity of a list array into a boolean array.  If there is no validity information
905    /// then this is an empty boolean array.
906    fn extract_validity(list_arr: &dyn Array) -> ArrayRef {
907        if let Some(validity) = list_arr.nulls() {
908            Arc::new(BooleanArray::new(validity.inner().clone(), None))
909        } else {
910            // We convert None validity into an empty array because the accumulation queue can't
911            // handle Option<ArrayRef>
912            new_empty_array(&DataType::Boolean)
913        }
914    }
915
916    fn make_encode_task(&self, arrays: Vec<ArrayRef>) -> EncodeTask {
917        let inner_encoder = self.inner_encoder.clone();
918        let column_idx = self.column_index;
919        // At this point we should have 2*N arrays where the even-indexed arrays are integer offsets
920        // and the odd-indexed arrays are boolean validity bitmaps
921        let offset_arrays = arrays.iter().step_by(2).cloned().collect::<Vec<_>>();
922        let validity_arrays = arrays.into_iter().skip(1).step_by(2).collect::<Vec<_>>();
923
924        tokio::task::spawn(async move {
925            let num_rows =
926                offset_arrays.iter().map(|arr| arr.len()).sum::<usize>() - offset_arrays.len();
927            let num_rows = num_rows as u64;
928            let mut buffer_index = 0;
929            let array = Self::do_encode(
930                offset_arrays,
931                validity_arrays,
932                &mut buffer_index,
933                num_rows,
934                inner_encoder,
935            )?;
936            let (data, description) = array.into_buffers();
937            Ok(EncodedPage {
938                data,
939                description: PageEncoding::Legacy(description),
940                num_rows,
941                column_idx,
942                row_number: 0, // Legacy encoders do not use
943            })
944        })
945        .map(|res_res| res_res.unwrap())
946        .boxed()
947    }
948
949    fn maybe_encode_offsets_and_validity(&mut self, list_arr: &dyn Array) -> Option<EncodeTask> {
950        let offsets = Self::extract_offsets(list_arr);
951        let validity = Self::extract_validity(list_arr);
952        let num_rows = offsets.len() as u64;
953        // Either inserting the offsets OR inserting the validity could cause the
954        // accumulation queue to fill up
955        if let Some(mut arrays) = self
956            .accumulation_queue
957            .insert(offsets, /*row_number=*/ 0, num_rows)
958        {
959            arrays.0.push(validity);
960            Some(self.make_encode_task(arrays.0))
961        } else if let Some(arrays) = self
962            .accumulation_queue
963            .insert(validity, /*row_number=*/ 0, num_rows)
964        {
965            Some(self.make_encode_task(arrays.0))
966        } else {
967            None
968        }
969    }
970
971    fn flush(&mut self) -> Option<EncodeTask> {
972        if let Some(arrays) = self.accumulation_queue.flush() {
973            Some(self.make_encode_task(arrays.0))
974        } else {
975            None
976        }
977    }
978
979    // Get's the total number of items covered by an array of offsets (keeping in
980    // mind that the first offset may not be zero)
981    fn get_offset_span(array: &dyn Array) -> u64 {
982        match array.data_type() {
983            DataType::Int32 => {
984                let arr_i32 = array.as_primitive::<Int32Type>();
985                (arr_i32.value(arr_i32.len() - 1) - arr_i32.value(0)) as u64
986            }
987            DataType::Int64 => {
988                let arr_i64 = array.as_primitive::<Int64Type>();
989                (arr_i64.value(arr_i64.len() - 1) - arr_i64.value(0)) as u64
990            }
991            _ => panic!(),
992        }
993    }
994
995    // This is where we do the work to actually shift the offsets and encode nulls
996    // Note that the output is u64 and the input could be i32 OR i64.
997    fn extend_offsets_vec_u64(
998        dest: &mut Vec<u64>,
999        offsets: &dyn Array,
1000        validity: Option<&BooleanArray>,
1001        // The offset of this list into the destination
1002        base: u64,
1003        null_offset_adjustment: u64,
1004    ) {
1005        match offsets.data_type() {
1006            DataType::Int32 => {
1007                let offsets_i32 = offsets.as_primitive::<Int32Type>();
1008                let start = offsets_i32.value(0) as u64;
1009                // If we want to take a list from start..X and change it into
1010                // a list from end..X then we need to add (base - start) to all elements
1011                // Note that `modifier` may be negative but (item + modifier) will always be >= 0
1012                let modifier = base as i64 - start as i64;
1013                if let Some(validity) = validity {
1014                    dest.extend(
1015                        offsets_i32
1016                            .values()
1017                            .iter()
1018                            .skip(1)
1019                            .zip(validity.values().iter())
1020                            .map(|(&off, valid)| {
1021                                (off as i64 + modifier) as u64
1022                                    + (!valid as u64 * null_offset_adjustment)
1023                            }),
1024                    );
1025                } else {
1026                    dest.extend(
1027                        offsets_i32
1028                            .values()
1029                            .iter()
1030                            .skip(1)
1031                            // Subtract by `start` so offsets start at 0
1032                            .map(|&v| (v as i64 + modifier) as u64),
1033                    );
1034                }
1035            }
1036            DataType::Int64 => {
1037                let offsets_i64 = offsets.as_primitive::<Int64Type>();
1038                let start = offsets_i64.value(0) as u64;
1039                // If we want to take a list from start..X and change it into
1040                // a list from end..X then we need to add (base - start) to all elements
1041                // Note that `modifier` may be negative but (item + modifier) will always be >= 0
1042                let modifier = base as i64 - start as i64;
1043                if let Some(validity) = validity {
1044                    dest.extend(
1045                        offsets_i64
1046                            .values()
1047                            .iter()
1048                            .skip(1)
1049                            .zip(validity.values().iter())
1050                            .map(|(&off, valid)| {
1051                                (off + modifier) as u64 + (!valid as u64 * null_offset_adjustment)
1052                            }),
1053                    )
1054                } else {
1055                    dest.extend(
1056                        offsets_i64
1057                            .values()
1058                            .iter()
1059                            .skip(1)
1060                            .map(|&v| (v + modifier) as u64),
1061                    );
1062                }
1063            }
1064            _ => panic!("Invalid list offsets data type {:?}", offsets.data_type()),
1065        }
1066    }
1067
1068    fn do_encode_u64(
1069        offset_arrays: Vec<ArrayRef>,
1070        validity: Vec<Option<&BooleanArray>>,
1071        num_offsets: u64,
1072        null_offset_adjustment: u64,
1073        buffer_index: &mut u32,
1074        inner_encoder: Arc<dyn ArrayEncoder>,
1075    ) -> Result<EncodedArray> {
1076        let mut offsets = Vec::with_capacity(num_offsets as usize);
1077        for (offsets_arr, validity_arr) in offset_arrays.iter().zip(validity) {
1078            let last_prev_offset = offsets.last().copied().unwrap_or(0) % null_offset_adjustment;
1079            Self::extend_offsets_vec_u64(
1080                &mut offsets,
1081                &offsets_arr,
1082                validity_arr,
1083                last_prev_offset,
1084                null_offset_adjustment,
1085            );
1086        }
1087        let offsets_data = DataBlock::FixedWidth(FixedWidthDataBlock {
1088            bits_per_value: 64,
1089            data: LanceBuffer::reinterpret_vec(offsets),
1090            num_values: num_offsets,
1091            block_info: BlockInfo::new(),
1092        });
1093        inner_encoder.encode(offsets_data, &DataType::UInt64, buffer_index)
1094    }
1095
1096    fn do_encode(
1097        offset_arrays: Vec<ArrayRef>,
1098        validity_arrays: Vec<ArrayRef>,
1099        buffer_index: &mut u32,
1100        num_offsets: u64,
1101        inner_encoder: Arc<dyn ArrayEncoder>,
1102    ) -> Result<EncodedArray> {
1103        let validity_arrays = validity_arrays
1104            .iter()
1105            .map(|v| {
1106                if v.is_empty() {
1107                    None
1108                } else {
1109                    Some(v.as_boolean())
1110                }
1111            })
1112            .collect::<Vec<_>>();
1113        debug_assert_eq!(offset_arrays.len(), validity_arrays.len());
1114        let total_span = offset_arrays
1115            .iter()
1116            .map(|arr| Self::get_offset_span(arr.as_ref()))
1117            .sum::<u64>();
1118        // See encodings.proto for reasoning behind this value
1119        let null_offset_adjustment = total_span + 1;
1120        let encoded_offsets = Self::do_encode_u64(
1121            offset_arrays,
1122            validity_arrays,
1123            num_offsets,
1124            null_offset_adjustment,
1125            buffer_index,
1126            inner_encoder,
1127        )?;
1128        Ok(EncodedArray {
1129            data: encoded_offsets.data,
1130            encoding: pb::ArrayEncoding {
1131                array_encoding: Some(pb::array_encoding::ArrayEncoding::List(Box::new(
1132                    pb::List {
1133                        offsets: Some(Box::new(encoded_offsets.encoding)),
1134                        null_offset_adjustment,
1135                        num_items: total_span,
1136                    },
1137                ))),
1138            },
1139        })
1140    }
1141}
1142
1143pub struct ListFieldEncoder {
1144    offsets_encoder: ListOffsetsEncoder,
1145    items_encoder: Box<dyn FieldEncoder>,
1146}
1147
1148impl ListFieldEncoder {
1149    pub fn new(
1150        items_encoder: Box<dyn FieldEncoder>,
1151        inner_offsets_encoder: Arc<dyn ArrayEncoder>,
1152        cache_bytes_per_columns: u64,
1153        keep_original_array: bool,
1154        column_index: u32,
1155    ) -> Self {
1156        Self {
1157            offsets_encoder: ListOffsetsEncoder::new(
1158                cache_bytes_per_columns,
1159                keep_original_array,
1160                column_index,
1161                inner_offsets_encoder,
1162            ),
1163            items_encoder,
1164        }
1165    }
1166
1167    fn combine_tasks(
1168        offsets_tasks: Vec<EncodeTask>,
1169        item_tasks: Vec<EncodeTask>,
1170    ) -> Result<Vec<EncodeTask>> {
1171        let mut all_tasks = offsets_tasks;
1172        let item_tasks = item_tasks;
1173        all_tasks.extend(item_tasks);
1174        Ok(all_tasks)
1175    }
1176}
1177
1178impl FieldEncoder for ListFieldEncoder {
1179    fn maybe_encode(
1180        &mut self,
1181        array: ArrayRef,
1182        external_buffers: &mut OutOfLineBuffers,
1183        repdef: RepDefBuilder,
1184        row_number: u64,
1185        num_rows: u64,
1186    ) -> Result<Vec<EncodeTask>> {
1187        // The list may have an offset / shorter length which means the underlying
1188        // values array could be longer than what we need to encode and so we need
1189        // to slice down to the region of interest.
1190        let items = match array.data_type() {
1191            DataType::List(_) => {
1192                let list_arr = array.as_list::<i32>();
1193                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
1194                let items_end =
1195                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
1196                list_arr
1197                    .values()
1198                    .slice(items_start, items_end - items_start)
1199            }
1200            DataType::LargeList(_) => {
1201                let list_arr = array.as_list::<i64>();
1202                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
1203                let items_end =
1204                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
1205                list_arr
1206                    .values()
1207                    .slice(items_start, items_end - items_start)
1208            }
1209            _ => panic!(),
1210        };
1211        let offsets_tasks = self
1212            .offsets_encoder
1213            .maybe_encode_offsets_and_validity(array.as_ref())
1214            .map(|task| vec![task])
1215            .unwrap_or_default();
1216        let mut item_tasks = self.items_encoder.maybe_encode(
1217            items,
1218            external_buffers,
1219            repdef,
1220            row_number,
1221            num_rows,
1222        )?;
1223        if !offsets_tasks.is_empty() && item_tasks.is_empty() {
1224            // An items page cannot currently be shared by two different offsets pages.  This is
1225            // a limitation in the current scheduler and could be addressed in the future.  As a result
1226            // we always need to encode the items page if we encode the offsets page.
1227            //
1228            // In practice this isn't usually too bad unless we are targeting very small pages.
1229            item_tasks = self.items_encoder.flush(external_buffers)?;
1230        }
1231        Self::combine_tasks(offsets_tasks, item_tasks)
1232    }
1233
1234    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
1235        let offsets_tasks = self
1236            .offsets_encoder
1237            .flush()
1238            .map(|task| vec![task])
1239            .unwrap_or_default();
1240        let item_tasks = self.items_encoder.flush(external_buffers)?;
1241        Self::combine_tasks(offsets_tasks, item_tasks)
1242    }
1243
1244    fn num_columns(&self) -> u32 {
1245        self.items_encoder.num_columns() + 1
1246    }
1247
1248    fn finish(
1249        &mut self,
1250        external_buffers: &mut OutOfLineBuffers,
1251    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
1252        let inner_columns = self.items_encoder.finish(external_buffers);
1253        async move {
1254            let mut columns = vec![EncodedColumn::default()];
1255            let inner_columns = inner_columns.await?;
1256            columns.extend(inner_columns);
1257            Ok(columns)
1258        }
1259        .boxed()
1260    }
1261}
1262
1263/// A structural encoder for list fields
1264///
1265/// The list's offsets are added to the rep/def builder
1266/// and the list array's values are passed to the child encoder
1267///
1268/// The values will have any garbage values removed and will be trimmed
1269/// to only include the values that are actually used.
1270pub struct ListStructuralEncoder {
1271    child: Box<dyn FieldEncoder>,
1272}
1273
1274impl ListStructuralEncoder {
1275    pub fn new(child: Box<dyn FieldEncoder>) -> Self {
1276        Self { child }
1277    }
1278}
1279
1280impl FieldEncoder for ListStructuralEncoder {
1281    fn maybe_encode(
1282        &mut self,
1283        array: ArrayRef,
1284        external_buffers: &mut OutOfLineBuffers,
1285        mut repdef: RepDefBuilder,
1286        row_number: u64,
1287        num_rows: u64,
1288    ) -> Result<Vec<EncodeTask>> {
1289        let values = if let Some(list_arr) = array.as_list_opt::<i32>() {
1290            let has_garbage_values =
1291                repdef.add_offsets(list_arr.offsets().clone(), array.nulls().cloned());
1292            if has_garbage_values {
1293                list_arr.filter_garbage_nulls().trimmed_values()
1294            } else {
1295                list_arr.trimmed_values()
1296            }
1297        } else if let Some(list_arr) = array.as_list_opt::<i64>() {
1298            let has_garbage_values =
1299                repdef.add_offsets(list_arr.offsets().clone(), array.nulls().cloned());
1300            if has_garbage_values {
1301                list_arr.filter_garbage_nulls().trimmed_values()
1302            } else {
1303                list_arr.trimmed_values()
1304            }
1305        } else {
1306            panic!("List encoder used for non-list data")
1307        };
1308        self.child
1309            .maybe_encode(values, external_buffers, repdef, row_number, num_rows)
1310    }
1311
1312    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
1313        self.child.flush(external_buffers)
1314    }
1315
1316    fn num_columns(&self) -> u32 {
1317        self.child.num_columns()
1318    }
1319
1320    fn finish(
1321        &mut self,
1322        external_buffers: &mut OutOfLineBuffers,
1323    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
1324        self.child.finish(external_buffers)
1325    }
1326}
1327
1328#[derive(Debug)]
1329pub struct StructuralListScheduler {
1330    child: Box<dyn StructuralFieldScheduler>,
1331}
1332
1333impl StructuralListScheduler {
1334    pub fn new(child: Box<dyn StructuralFieldScheduler>) -> Self {
1335        Self { child }
1336    }
1337}
1338
1339impl StructuralFieldScheduler for StructuralListScheduler {
1340    fn schedule_ranges<'a>(
1341        &'a self,
1342        ranges: &[Range<u64>],
1343        filter: &FilterExpression,
1344    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
1345        let child = self.child.schedule_ranges(ranges, filter)?;
1346
1347        Ok(Box::new(StructuralListSchedulingJob::new(child)))
1348    }
1349
1350    fn initialize<'a>(
1351        &'a mut self,
1352        filter: &'a FilterExpression,
1353        context: &'a SchedulerContext,
1354    ) -> BoxFuture<'a, Result<()>> {
1355        self.child.initialize(filter, context)
1356    }
1357}
1358
1359/// Scheduling job for list data
1360///
1361/// Scheduling is handled by the primitive encoder and nothing special
1362/// happens here.
1363#[derive(Debug)]
1364struct StructuralListSchedulingJob<'a> {
1365    child: Box<dyn StructuralSchedulingJob + 'a>,
1366}
1367
1368impl<'a> StructuralListSchedulingJob<'a> {
1369    fn new(child: Box<dyn StructuralSchedulingJob + 'a>) -> Self {
1370        Self { child }
1371    }
1372}
1373
1374impl StructuralSchedulingJob for StructuralListSchedulingJob<'_> {
1375    fn schedule_next(
1376        &mut self,
1377        context: &mut SchedulerContext,
1378    ) -> Result<Option<ScheduledScanLine>> {
1379        self.child.schedule_next(context)
1380    }
1381}
1382
1383#[derive(Debug)]
1384pub struct StructuralListDecoder {
1385    child: Box<dyn StructuralFieldDecoder>,
1386    data_type: DataType,
1387}
1388
1389impl StructuralListDecoder {
1390    pub fn new(child: Box<dyn StructuralFieldDecoder>, data_type: DataType) -> Self {
1391        Self { child, data_type }
1392    }
1393}
1394
1395impl StructuralFieldDecoder for StructuralListDecoder {
1396    fn accept_page(&mut self, child: crate::decoder::LoadedPage) -> Result<()> {
1397        self.child.accept_page(child)
1398    }
1399
1400    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
1401        let child_task = self.child.drain(num_rows)?;
1402        Ok(Box::new(StructuralListDecodeTask::new(
1403            child_task,
1404            self.data_type.clone(),
1405        )))
1406    }
1407
1408    fn data_type(&self) -> &DataType {
1409        &self.data_type
1410    }
1411}
1412
1413#[derive(Debug)]
1414struct StructuralListDecodeTask {
1415    child_task: Box<dyn StructuralDecodeArrayTask>,
1416    data_type: DataType,
1417}
1418
1419impl StructuralListDecodeTask {
1420    fn new(child_task: Box<dyn StructuralDecodeArrayTask>, data_type: DataType) -> Self {
1421        Self {
1422            child_task,
1423            data_type,
1424        }
1425    }
1426}
1427
1428impl StructuralDecodeArrayTask for StructuralListDecodeTask {
1429    fn decode(self: Box<Self>) -> Result<DecodedArray> {
1430        let DecodedArray { array, mut repdef } = self.child_task.decode()?;
1431        match &self.data_type {
1432            DataType::List(child_field) => {
1433                let (offsets, validity) = repdef.unravel_offsets::<i32>()?;
1434                let list_array = ListArray::try_new(child_field.clone(), offsets, array, validity)?;
1435                Ok(DecodedArray {
1436                    array: Arc::new(list_array),
1437                    repdef,
1438                })
1439            }
1440            DataType::LargeList(child_field) => {
1441                let (offsets, validity) = repdef.unravel_offsets::<i64>()?;
1442                let list_array =
1443                    LargeListArray::try_new(child_field.clone(), offsets, array, validity)?;
1444                Ok(DecodedArray {
1445                    array: Arc::new(list_array),
1446                    repdef,
1447                })
1448            }
1449            _ => panic!("List decoder did not have a list field"),
1450        }
1451    }
1452}
1453
1454#[cfg(test)]
1455mod tests {
1456
1457    use std::{collections::HashMap, sync::Arc};
1458
1459    use arrow::array::{Int64Builder, LargeListBuilder, StringBuilder};
1460    use arrow_array::{
1461        builder::{Int32Builder, ListBuilder},
1462        Array, ArrayRef, BooleanArray, DictionaryArray, LargeStringArray, ListArray, StructArray,
1463        UInt64Array, UInt8Array,
1464    };
1465    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
1466    use arrow_schema::{DataType, Field, Fields};
1467    use lance_core::datatypes::{
1468        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
1469    };
1470    use rstest::rstest;
1471
1472    use crate::{
1473        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
1474        version::LanceFileVersion,
1475    };
1476
1477    fn make_list_type(inner_type: DataType) -> DataType {
1478        DataType::List(Arc::new(Field::new("item", inner_type, true)))
1479    }
1480
1481    fn make_large_list_type(inner_type: DataType) -> DataType {
1482        DataType::LargeList(Arc::new(Field::new("item", inner_type, true)))
1483    }
1484
1485    #[rstest]
1486    #[test_log::test(tokio::test)]
1487    async fn test_list(
1488        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1489        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
1490        structural_encoding: &str,
1491    ) {
1492        let mut field_metadata = HashMap::new();
1493        field_metadata.insert(
1494            STRUCTURAL_ENCODING_META_KEY.to_string(),
1495            structural_encoding.into(),
1496        );
1497        let field =
1498            Field::new("", make_list_type(DataType::Int32), true).with_metadata(field_metadata);
1499        check_round_trip_encoding_random(field, version).await;
1500    }
1501
1502    #[test_log::test(tokio::test)]
1503    async fn test_large_list() {
1504        let field = Field::new("", make_large_list_type(DataType::Int32), true);
1505        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
1506    }
1507
1508    #[test_log::test(tokio::test)]
1509    async fn test_nested_strings() {
1510        let field = Field::new("", make_list_type(DataType::Utf8), true);
1511        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
1512    }
1513
1514    #[test_log::test(tokio::test)]
1515    async fn test_nested_list() {
1516        let field = Field::new("", make_list_type(make_list_type(DataType::Int32)), true);
1517        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
1518    }
1519
1520    #[test_log::test(tokio::test)]
1521    async fn test_list_struct_list() {
1522        let struct_type = DataType::Struct(Fields::from(vec![Field::new(
1523            "inner_str",
1524            DataType::Utf8,
1525            false,
1526        )]));
1527
1528        let field = Field::new("", make_list_type(struct_type), true);
1529        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
1530    }
1531
1532    #[test_log::test(tokio::test)]
1533    async fn test_list_struct_empty() {
1534        let fields = Fields::from(vec![Field::new("inner", DataType::UInt64, true)]);
1535        let items = UInt64Array::from(Vec::<u64>::new());
1536        let structs = StructArray::new(fields, vec![Arc::new(items)], None);
1537        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0; 2 * 1024 * 1024 + 1]));
1538        let lists = ListArray::new(
1539            Arc::new(Field::new("item", structs.data_type().clone(), true)),
1540            offsets,
1541            Arc::new(structs),
1542            None,
1543        );
1544
1545        check_round_trip_encoding_of_data(
1546            vec![Arc::new(lists)],
1547            &TestCases::default(),
1548            HashMap::new(),
1549        )
1550        .await;
1551    }
1552
1553    #[rstest]
1554    #[test_log::test(tokio::test)]
1555    async fn test_simple_list(
1556        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1557        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
1558        structural_encoding: &str,
1559    ) {
1560        let items_builder = Int32Builder::new();
1561        let mut list_builder = ListBuilder::new(items_builder);
1562        list_builder.append_value([Some(1), Some(2), Some(3)]);
1563        list_builder.append_value([Some(4), Some(5)]);
1564        list_builder.append_null();
1565        list_builder.append_value([Some(6), Some(7), Some(8)]);
1566        let list_array = list_builder.finish();
1567
1568        let mut field_metadata = HashMap::new();
1569        field_metadata.insert(
1570            STRUCTURAL_ENCODING_META_KEY.to_string(),
1571            structural_encoding.into(),
1572        );
1573
1574        let test_cases = TestCases::default()
1575            .with_range(0..2)
1576            .with_range(0..3)
1577            .with_range(1..3)
1578            .with_indices(vec![1, 3])
1579            .with_indices(vec![2])
1580            .with_file_version(version);
1581        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
1582            .await;
1583    }
1584
1585    #[rstest]
1586    #[test_log::test(tokio::test)]
1587    async fn test_simple_string_list(
1588        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
1589        structural_encoding: &str,
1590    ) {
1591        let items_builder = StringBuilder::new();
1592        let mut list_builder = ListBuilder::new(items_builder);
1593        list_builder.append_value([Some("a"), Some("bc"), Some("def")]);
1594        list_builder.append_value([Some("gh"), None]);
1595        list_builder.append_null();
1596        list_builder.append_value([Some("ijk"), Some("lmnop"), Some("qrs")]);
1597        let list_array = list_builder.finish();
1598
1599        let mut field_metadata = HashMap::new();
1600        field_metadata.insert(
1601            STRUCTURAL_ENCODING_META_KEY.to_string(),
1602            structural_encoding.into(),
1603        );
1604
1605        let test_cases = TestCases::default()
1606            .with_range(0..2)
1607            .with_range(0..3)
1608            .with_range(1..3)
1609            .with_indices(vec![1, 3])
1610            .with_indices(vec![2])
1611            .with_file_version(LanceFileVersion::V2_1);
1612        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
1613            .await;
1614    }
1615
1616    #[rstest]
1617    #[test_log::test(tokio::test)]
1618    async fn test_simple_sliced_list(
1619        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
1620        structural_encoding: &str,
1621    ) {
1622        let items_builder = Int32Builder::new();
1623        let mut list_builder = ListBuilder::new(items_builder);
1624        list_builder.append_value([Some(1), Some(2), Some(3)]);
1625        list_builder.append_value([Some(4), Some(5)]);
1626        list_builder.append_null();
1627        list_builder.append_value([Some(6), Some(7), Some(8)]);
1628        let list_array = list_builder.finish();
1629
1630        let list_array = list_array.slice(1, 2);
1631
1632        let mut field_metadata = HashMap::new();
1633        field_metadata.insert(
1634            STRUCTURAL_ENCODING_META_KEY.to_string(),
1635            structural_encoding.into(),
1636        );
1637
1638        let test_cases = TestCases::default()
1639            .with_range(0..2)
1640            .with_range(1..2)
1641            .with_indices(vec![0])
1642            .with_indices(vec![1])
1643            .with_file_version(LanceFileVersion::V2_1);
1644        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
1645            .await;
1646    }
1647
1648    #[test_log::test(tokio::test)]
1649    async fn test_simple_list_dict() {
1650        let values = LargeStringArray::from_iter_values(["a", "bb", "ccc"]);
1651        let indices = UInt8Array::from(vec![0, 1, 2, 0, 1, 2, 0, 1, 2]);
1652        let dict_array = DictionaryArray::new(indices, Arc::new(values));
1653        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 5, 6, 9]));
1654        let list_array = ListArray::new(
1655            Arc::new(Field::new("item", dict_array.data_type().clone(), true)),
1656            offsets,
1657            Arc::new(dict_array),
1658            None,
1659        );
1660
1661        let test_cases = TestCases::default()
1662            .with_range(0..2)
1663            .with_range(1..3)
1664            .with_range(2..4)
1665            .with_indices(vec![1])
1666            .with_indices(vec![2]);
1667        check_round_trip_encoding_of_data(
1668            vec![Arc::new(list_array)],
1669            &test_cases,
1670            HashMap::default(),
1671        )
1672        .await;
1673    }
1674
1675    #[rstest]
1676    #[test_log::test(tokio::test)]
1677    async fn test_list_with_garbage_nulls(
1678        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
1679        structural_encoding: &str,
1680    ) {
1681        // In Arrow, list nulls are allowed to be non-empty, with masked garbage values
1682        // Here we make a list with a null row in the middle with 3 garbage values
1683        let items = UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1684        let offsets = ScalarBuffer::<i32>::from(vec![0, 5, 8, 10]);
1685        let offsets = OffsetBuffer::new(offsets);
1686        let list_validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
1687        let list_arr = ListArray::new(
1688            Arc::new(Field::new("item", DataType::UInt64, true)),
1689            offsets,
1690            Arc::new(items),
1691            Some(list_validity),
1692        );
1693
1694        let mut field_metadata = HashMap::new();
1695        field_metadata.insert(
1696            STRUCTURAL_ENCODING_META_KEY.to_string(),
1697            structural_encoding.into(),
1698        );
1699
1700        let test_cases = TestCases::default()
1701            .with_range(0..3)
1702            .with_range(1..2)
1703            .with_indices(vec![1])
1704            .with_indices(vec![2])
1705            .with_file_version(LanceFileVersion::V2_1);
1706        check_round_trip_encoding_of_data(vec![Arc::new(list_arr)], &test_cases, field_metadata)
1707            .await;
1708    }
1709
1710    #[rstest]
1711    #[test_log::test(tokio::test)]
1712    async fn test_simple_two_page_list(
1713        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
1714        structural_encoding: &str,
1715    ) {
1716        // This is a simple pre-defined list that spans two pages.  This test is useful for
1717        // debugging the repetition index
1718        let items_builder = Int64Builder::new();
1719        let mut list_builder = ListBuilder::new(items_builder);
1720        for i in 0..512 {
1721            list_builder.append_value([Some(i), Some(i * 2)]);
1722        }
1723        let list_array_1 = list_builder.finish();
1724
1725        let items_builder = Int64Builder::new();
1726        let mut list_builder = ListBuilder::new(items_builder);
1727        for i in 0..512 {
1728            let i = i + 512;
1729            list_builder.append_value([Some(i), Some(i * 2)]);
1730        }
1731        let list_array_2 = list_builder.finish();
1732
1733        let mut metadata = HashMap::new();
1734        metadata.insert(
1735            STRUCTURAL_ENCODING_META_KEY.to_string(),
1736            structural_encoding.into(),
1737        );
1738
1739        let test_cases = TestCases::default()
1740            .with_file_version(LanceFileVersion::V2_1)
1741            .with_page_sizes(vec![100])
1742            .with_range(800..900);
1743        check_round_trip_encoding_of_data(
1744            vec![Arc::new(list_array_1), Arc::new(list_array_2)],
1745            &test_cases,
1746            metadata,
1747        )
1748        .await;
1749    }
1750
1751    #[test_log::test(tokio::test)]
1752    async fn test_simple_large_list() {
1753        let items_builder = Int32Builder::new();
1754        let mut list_builder = LargeListBuilder::new(items_builder);
1755        list_builder.append_value([Some(1), Some(2), Some(3)]);
1756        list_builder.append_value([Some(4), Some(5)]);
1757        list_builder.append_null();
1758        list_builder.append_value([Some(6), Some(7), Some(8)]);
1759        let list_array = list_builder.finish();
1760
1761        let test_cases = TestCases::default()
1762            .with_range(0..2)
1763            .with_range(0..3)
1764            .with_range(1..3)
1765            .with_indices(vec![1, 3]);
1766        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
1767            .await;
1768    }
1769
1770    #[test_log::test(tokio::test)]
1771    async fn test_empty_lists() {
1772        // Scenario 1: Some lists are empty
1773
1774        let values = [vec![Some(1), Some(2), Some(3)], vec![], vec![None]];
1775        // Test empty list at beginning, middle, and end
1776        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
1777            let items_builder = Int32Builder::new();
1778            let mut list_builder = ListBuilder::new(items_builder);
1779            for idx in order {
1780                list_builder.append_value(values[idx].clone());
1781            }
1782            let list_array = Arc::new(list_builder.finish());
1783            let test_cases = TestCases::default()
1784                .with_indices(vec![1])
1785                .with_indices(vec![0])
1786                .with_indices(vec![2])
1787                .with_indices(vec![0, 1]);
1788            check_round_trip_encoding_of_data(
1789                vec![list_array.clone()],
1790                &test_cases,
1791                HashMap::new(),
1792            )
1793            .await;
1794            let test_cases = test_cases.with_batch_size(1);
1795            check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
1796        }
1797
1798        // Scenario 2: All lists are empty
1799
1800        // When encoding a list of empty lists there are no items to encode
1801        // which is strange and we want to ensure we handle it
1802        let items_builder = Int32Builder::new();
1803        let mut list_builder = ListBuilder::new(items_builder);
1804        list_builder.append(true);
1805        list_builder.append_null();
1806        list_builder.append(true);
1807        let list_array = Arc::new(list_builder.finish());
1808
1809        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
1810        check_round_trip_encoding_of_data(vec![list_array.clone()], &test_cases, HashMap::new())
1811            .await;
1812        let test_cases = test_cases.with_batch_size(1);
1813        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
1814
1815        // Scenario 2: All lists are empty (but now with strings)
1816
1817        // When encoding a list of empty lists there are no items to encode
1818        // which is strange and we want to ensure we handle it
1819        let items_builder = StringBuilder::new();
1820        let mut list_builder = ListBuilder::new(items_builder);
1821        list_builder.append(true);
1822        list_builder.append_null();
1823        list_builder.append(true);
1824        let list_array = Arc::new(list_builder.finish());
1825
1826        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
1827        check_round_trip_encoding_of_data(vec![list_array.clone()], &test_cases, HashMap::new())
1828            .await;
1829        let test_cases = test_cases.with_batch_size(1);
1830        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
1831    }
1832
1833    #[test_log::test(tokio::test)]
1834    #[ignore] // This test is quite slow in debug mode
1835    async fn test_jumbo_list() {
1836        // This is an overflow test.  We have a list of lists where each list
1837        // has 1Mi items.  We encode 5000 of these lists and so we have over 4Gi in the
1838        // offsets range
1839        let items = BooleanArray::new_null(1024 * 1024);
1840        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1024 * 1024]));
1841        let list_arr = Arc::new(ListArray::new(
1842            Arc::new(Field::new("item", DataType::Boolean, true)),
1843            offsets,
1844            Arc::new(items),
1845            None,
1846        )) as ArrayRef;
1847        let arrs = vec![list_arr; 5000];
1848
1849        // We can't validate because our validation relies on concatenating all input arrays
1850        let test_cases = TestCases::default().without_validation();
1851        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
1852    }
1853}