lance_encoding/encodings/logical/primitive.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{
    any::Any,
    collections::{HashMap, VecDeque},
    fmt::Debug,
    iter,
    ops::Range,
    sync::Arc,
    vec,
};

use arrow::array::AsArray;
use arrow_array::{
    make_array, types::UInt64Type, Array, ArrayRef, FixedSizeListArray, PrimitiveArray,
};
use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field as ArrowField};
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, TryStreamExt};
use itertools::Itertools;
use lance_arrow::deepcopy::deep_copy_array;
use lance_core::{
    cache::{Context, DeepSizeOf},
    datatypes::{
        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
    },
    error::Error,
    utils::bit::pad_bytes,
    utils::hash::U8SliceKey,
};
use log::{debug, trace};
use snafu::location;

use crate::repdef::{
    build_control_word_iterator, CompositeRepDefUnraveler, ControlWordIterator, ControlWordParser,
    DefinitionInterpretation, RepDefSlicer,
};
use crate::statistics::{ComputeStat, GetStat, Stat};
use crate::utils::bytepack::ByteUnpacker;
use crate::{
    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
    utils::bytepack::BytepackedIntegerEncoder,
};
use crate::{
    decoder::{FixedPerValueDecompressor, VariablePerValueDecompressor},
    encoder::PerValueDataBlock,
};
use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};

use crate::{
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
    decoder::{
        BlockDecompressor, ColumnInfo, DecodeArrayTask, DecodePageTask, DecodedArray, DecodedPage,
        DecompressorStrategy, FieldScheduler, FilterExpression, LoadedPage, LogicalPageDecoder,
        MessageType, MiniBlockDecompressor, NextDecodeTask, PageEncoding, PageInfo, PageScheduler,
        PrimitivePageDecoder, PriorityRange, ScheduledScanLine, SchedulerContext, SchedulingJob,
        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPage,
    },
    encoder::{
        ArrayEncodingStrategy, CompressionStrategy, EncodeTask, EncodedColumn, EncodedPage,
        EncodingOptions, FieldEncoder, MiniBlockChunk, MiniBlockCompressed, OutOfLineBuffers,
    },
    encodings::physical::{decoder_from_array_encoding, ColumnBuffers, PageBuffers},
    format::{pb, ProtobufUtils},
    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
    EncodingsIo,
};

#[derive(Debug)]
struct PrimitivePage {
    scheduler: Box<dyn PageScheduler>,
    num_rows: u64,
    page_index: u32,
}

/// A field scheduler for primitive fields
///
/// This maps to exactly one column and it assumes that the top-level
/// encoding of each page is "basic".  The basic encoding decodes into an
/// optional buffer of validity and a fixed-width buffer of values
/// which is exactly what we need to create a primitive array.
///
/// Note: we consider booleans and fixed-size-lists of primitive types to be
/// primitive types.  This is slightly different from arrow-rs's definition.
#[derive(Debug)]
pub struct PrimitiveFieldScheduler {
    data_type: DataType,
    page_schedulers: Vec<PrimitivePage>,
    num_rows: u64,
    should_validate: bool,
    column_index: u32,
}

impl PrimitiveFieldScheduler {
    pub fn new(
        column_index: u32,
        data_type: DataType,
        pages: Arc<[PageInfo]>,
        buffers: ColumnBuffers,
        should_validate: bool,
    ) -> Self {
        let page_schedulers = pages
            .iter()
            .enumerate()
            // Buggy versions of Lance could sometimes create empty pages
            .filter(|(page_index, page)| {
                if page.num_rows == 0 {
                    log::trace!("Skipping empty page with index {}", page_index);
                    return false;
                }
                true
            })
            .map(|(page_index, page)| {
                let page_buffers = PageBuffers {
                    column_buffers: buffers,
                    positions_and_sizes: &page.buffer_offsets_and_sizes,
                };
                let scheduler = decoder_from_array_encoding(
                    page.encoding.as_legacy(),
                    &page_buffers,
                    &data_type,
                );
                PrimitivePage {
                    scheduler,
                    num_rows: page.num_rows,
                    page_index: page_index as u32,
                }
            })
            .collect::<Vec<_>>();
        let num_rows = page_schedulers.iter().map(|p| p.num_rows).sum();
        Self {
            data_type,
            page_schedulers,
            num_rows,
            should_validate,
            column_index,
        }
    }
}

#[derive(Debug)]
struct PrimitiveFieldSchedulingJob<'a> {
    scheduler: &'a PrimitiveFieldScheduler,
    ranges: Vec<Range<u64>>,
    page_idx: usize,
    range_idx: usize,
    range_offset: u64,
    global_row_offset: u64,
}

impl<'a> PrimitiveFieldSchedulingJob<'a> {
    pub fn new(scheduler: &'a PrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
        Self {
            scheduler,
            ranges,
            page_idx: 0,
            range_idx: 0,
            range_offset: 0,
            global_row_offset: 0,
        }
    }
}

impl SchedulingJob for PrimitiveFieldSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        debug_assert!(self.range_idx < self.ranges.len());
        // Get our current range
        let mut range = self.ranges[self.range_idx].clone();
        range.start += self.range_offset;

        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range,
            cur_page.num_rows
        );
        // Skip entire pages until we have some overlap with our next range
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }

        // Now the cur_page has overlap with range.  Continue looping through ranges
        // until we find a range that exceeds the current page
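        //
        // For example (an illustrative walk-through): with two 100-row pages and
        // ranges [50..150, 160..170], the first call to schedule_next emits the
        // page-relative range [50..100] from page 0 and the next call emits
        // [0..50, 60..70] from page 1.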

        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;

            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                break;
            }
        }

        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            num_rows_in_next,
            ranges_in_page.len(),
            cur_page.num_rows,
            priority.current_priority(),
            self.scheduler.column_index,
            cur_page.page_index,
        );

        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;

        let physical_decoder = cur_page.scheduler.schedule_ranges(
            &ranges_in_page,
            context.io(),
            priority.current_priority(),
        );

        let logical_decoder = PrimitiveFieldDecoder {
            data_type: self.scheduler.data_type.clone(),
            column_index: self.scheduler.column_index,
            unloaded_physical_decoder: Some(physical_decoder),
            physical_decoder: None,
            rows_drained: 0,
            num_rows: num_rows_in_next,
            should_validate: self.scheduler.should_validate,
            page_index: cur_page.page_index,
        };

        let decoder = Box::new(logical_decoder);
        let decoder_ready = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder_ready)],
            rows_scheduled: num_rows_in_next,
        })
    }

    fn num_rows(&self) -> u64 {
        self.ranges.iter().map(|r| r.end - r.start).sum()
    }
}

impl FieldScheduler for PrimitiveFieldScheduler {
    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[std::ops::Range<u64>],
        // TODO: Could potentially use filter to simplify decode, something of a micro-optimization probably
        _filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        Ok(Box::new(PrimitiveFieldSchedulingJob::new(
            self,
            ranges.to_vec(),
        )))
    }

    fn initialize<'a>(
        &'a self,
        _filter: &'a FilterExpression,
        _context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        // 2.0 schedulers do not need to initialize
        std::future::ready(Ok(())).boxed()
    }
}

/// A trait for figuring out how to schedule the data within
/// a single page.
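///
/// The expected lifecycle (as suggested by the method docs below): `initialize` runs
/// once per page to fetch any metadata, the returned [`CachedPageData`] may be cached,
/// `load` restores previously cached metadata into a fresh scheduler, and
/// `schedule_ranges` then issues the actual reads.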
trait StructuralPageScheduler: std::fmt::Debug + Send {
    /// Fetches any metadata required for the page
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
    /// Loads metadata from a previous initialize call
    fn load(&mut self, data: &Arc<dyn CachedPageData>);
    /// Schedules the read of the given ranges in the page
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>>;
}

/// Metadata describing the decoded size of a mini-block
#[derive(Debug)]
struct ChunkMeta {
    num_values: u64,
    chunk_size_bytes: u64,
    offset_bytes: u64,
}

/// A mini-block chunk that has been decoded and decompressed
#[derive(Debug)]
struct DecodedMiniBlockChunk {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    values: DataBlock,
}

/// A task to decode one or more mini-blocks of data into an output batch
///
/// Note: Two batches might share the same mini-block of data.  When this happens,
/// each batch gets a copy of the block and decodes the block independently.
///
/// This means we have duplicated work but it is necessary to avoid having to synchronize
/// the decoding of the block. (TODO: test this theory)
#[derive(Debug)]
struct DecodeMiniBlockTask {
    // The decompressors for the rep, def, and value buffers
    rep_decompressor: Arc<dyn BlockDecompressor>,
    def_decompressor: Arc<dyn BlockDecompressor>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    dictionary_data: Option<Arc<DataBlock>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    max_visible_level: u16,
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
}

impl DecodeMiniBlockTask {
    fn decode_levels(
        rep_decompressor: &dyn BlockDecompressor,
        levels: LanceBuffer,
    ) -> Result<Option<ScalarBuffer<u16>>> {
        let rep = rep_decompressor.decompress(levels)?;
        match rep {
            DataBlock::FixedWidth(mut rep) => Ok(Some(rep.data.borrow_to_typed_slice::<u16>())),
            DataBlock::Constant(constant) => {
                assert_eq!(constant.data.len(), 2);
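                // The constant is a single little-endian u16 level value; a
                // constant of zero means every level is zero (all valid, no
                // repetition), which we represent as `None`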
                if constant.data[0] == 0 && constant.data[1] == 0 {
                    Ok(None)
                } else {
                    // Maybe in the future we will encode all-null def or
                    // constant rep (all 1-item lists?) in a constant encoding
                    // but that doesn't happen today so we don't need to worry.
                    todo!()
                }
            }
            _ => unreachable!(),
        }
    }

    // We are building a LevelBuffer (`levels`) and want to copy into it the
    // values of `level_buf` within `range`, writing at `dest_offset`.
    //
    // We need to handle both the case where `levels` is None (no nulls encountered
    // yet) and the case where `level_buf` is None (the input we are copying from has
    // no nulls)
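    //
    // For example (illustrative values): copying range 2..4 from
    // level_buf = [0, 0, 1, 1, 0] at dest_offset 3 while `levels` is still None
    // first materializes [0, 0, 0] and then appends [1, 1], yielding
    // [0, 0, 0, 1, 1].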
    fn extend_levels(
        range: Range<u64>,
        levels: &mut Option<LevelBuffer>,
        level_buf: &Option<impl AsRef<[u16]>>,
        dest_offset: usize,
    ) {
        if let Some(level_buf) = level_buf {
            if levels.is_none() {
                // This is the first non-empty def buf we've hit, fill in the past
                // with 0 (valid)
                let mut new_levels_vec =
                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
                new_levels_vec.extend(iter::repeat(0).take(dest_offset));
                *levels = Some(new_levels_vec);
            }
            levels.as_mut().unwrap().extend(
                level_buf.as_ref()[range.start as usize..range.end as usize]
                    .iter()
                    .copied(),
            );
        } else if let Some(levels) = levels {
            let num_values = (range.end - range.start) as usize;
            // This is an all-valid level_buf but we had nulls earlier and so we
            // need to materialize it
            levels.extend(iter::repeat(0).take(num_values));
        }
    }

    /// Maps a range of rows to a range of items and a range of levels
    ///
    /// If there is no repetition information this just returns the range as-is.
    ///
    /// If there is repetition information then we need to do some work to figure out what
    /// range of items corresponds to the requested range of rows.
    ///
    /// For example, if the data is [[1, 2, 3], [4, 5], [6, 7]] and the range is 1..2 (i.e. just row
    /// 1) then the user actually wants items 3..5.  In the above case the rep levels would be:
    ///
    /// Idx: 0 1 2 3 4 5 6
    /// Rep: 1 0 0 1 0 1 0
    ///
    /// So the start (1) maps to the second 1 (idx=3) and the end (2) maps to the third 1 (idx=5)
    ///
    /// If there are invisible items then we don't count them when calculating the range of items we
    /// are interested in but we do count them when calculating the range of levels we are interested
    /// in.  As a result we have to return both the item range (first return value) and the level range
    /// (second return value).
    ///
    /// For example, if the data is [[1, 2, 3], [4, 5], NULL, [6, 7, 8]] and the range is 2..4 then the
    /// user wants items 5..8 but they want levels 5..9.  In the above case the rep/def levels would be:
    ///
    /// Idx: 0 1 2 3 4 5 6 7 8
    /// Rep: 1 0 0 1 0 1 1 0 0
    /// Def: 0 0 0 0 0 1 0 0 0
    /// Itm: 1 2 3 4 5   6 7 8
    ///
    /// Finally, we have to contend with the fact that chunks may or may not start with a "preamble" of
    /// trailing values that finish up a list from the previous chunk.  In this case the first item does
    /// not start at max_rep because it is a continuation of the previous chunk.  For our purposes we do
    /// not consider this a "row" and so the range 0..1 will refer to the first row AFTER the preamble.
    ///
    /// We have a separate parameter (`preamble_action`) to control whether we want the preamble or not.
    ///
    /// Note that the "trailer" is considered a "row" and if we want it we should include it in the range.
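    ///
    /// For example (an illustrative sketch): if a chunk's rep levels are
    ///
    /// Idx: 0 1 2 3 4 5
    /// Rep: 0 0 1 0 1 0
    ///
    /// then indices 0..2 are a preamble continuing the previous chunk's list and, with
    /// `preamble_action=Skip` and no def levels, the range 0..1 (the first row after the
    /// preamble) maps to items 2..4.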
    fn map_range(
        range: Range<u64>,
        rep: Option<&impl AsRef<[u16]>>,
        def: Option<&impl AsRef<[u16]>>,
        max_rep: u16,
        max_visible_def: u16,
        // The total number of items (not rows) in the chunk.  This is not quite the same as
        // rep.len() / def.len() because it doesn't count invisible items
        total_items: u64,
        preamble_action: PreambleAction,
    ) -> (Range<u64>, Range<u64>) {
        if let Some(rep) = rep {
            let mut rep = rep.as_ref();
            // If there is a preamble and we need to skip it then do that first.  The work is the same
            // whether there is def information or not
            let mut items_in_preamble = 0;
            let first_row_start = match preamble_action {
                PreambleAction::Skip | PreambleAction::Take => {
                    let first_row_start = if let Some(def) = def.as_ref() {
                        let mut first_row_start = None;
                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
                            if *rep == max_rep {
                                first_row_start = Some(idx);
                                break;
                            }
                            if *def <= max_visible_def {
                                items_in_preamble += 1;
                            }
                        }
                        first_row_start
                    } else {
                        let first_row_start = rep.iter().position(|&r| r == max_rep);
                        items_in_preamble = first_row_start.unwrap_or(rep.len());
                        first_row_start
                    };
                    // It is possible for a chunk to be entirely partial values but if it is then it
                    // should never show up as a preamble to skip
                    if first_row_start.is_none() {
                        assert!(preamble_action == PreambleAction::Take);
                        return (0..total_items, 0..rep.len() as u64);
                    }
                    let first_row_start = first_row_start.unwrap() as u64;
                    rep = &rep[first_row_start as usize..];
                    first_row_start
                }
                PreambleAction::Absent => {
                    debug_assert!(rep[0] == max_rep);
                    0
                }
            };

            // We hit this case when all we needed was the preamble
            if range.start == range.end {
                debug_assert!(preamble_action == PreambleAction::Take);
                return (0..items_in_preamble as u64, 0..first_row_start);
            }
            assert!(range.start < range.end);

            let mut rows_seen = 0;
            let mut new_start = 0;
            let mut new_levels_start = 0;

            if let Some(def) = def {
                let def = &def.as_ref()[first_row_start as usize..];

                // range.start == 0 always maps to 0 (even with invis items), otherwise we need to walk
                let mut lead_invis_seen = 0;

                if range.start > 0 {
                    if def[0] > max_visible_def {
                        lead_invis_seen += 1;
                    }
                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1 - lead_invis_seen;
                                new_levels_start = idx as u64 + 1;
                                break;
                            }
                            if *def > max_visible_def {
                                lead_invis_seen += 1;
                            }
                        }
                    }
                }

                rows_seen += 1;

                let mut new_end = u64::MAX;
                let mut new_levels_end = rep.len() as u64;
                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
                    .iter()
                    .zip(&def[(new_levels_start + 1) as usize..])
                    .enumerate()
                {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.end + 1 {
                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
                            new_levels_end = idx as u64 + new_levels_start + 1;
                            break;
                        }
                        if *def > max_visible_def {
                            tail_invis_seen += 1;
                        }
                    }
                }

                if new_end == u64::MAX {
                    new_levels_end = rep.len() as u64;
                    // This is the total number of visible items (minus any items in the preamble)
                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
                    new_end = rep.len() as u64 - total_invis_seen;
                }

                assert_ne!(new_end, u64::MAX);

                // Adjust for any skipped preamble
                if preamble_action == PreambleAction::Skip {
                    // TODO: Should this be items_in_preamble?  If so, add a
                    // unit test for this case
                    new_start += first_row_start;
                    new_end += first_row_start;
                    new_levels_start += first_row_start;
                    new_levels_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    debug_assert_eq!(new_levels_start, 0);
                    new_end += first_row_start;
                    new_levels_end += first_row_start;
                }

                (new_start..new_end, new_levels_start..new_levels_end)
            } else {
                // Easy case, there are no invisible items, so we don't need to check for them
                // The items range and levels range will be the same.  We do still need to walk
                // the rep levels to find the row boundaries

                // range.start == 0 always maps to 0, otherwise we need to walk
                if range.start > 0 {
                    for (idx, rep) in rep.iter().skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1;
                                break;
                            }
                        }
                    }
                }
                let mut new_end = rep.len() as u64;
                // range.end == max_items always maps to rep.len(), otherwise we need to walk
                if range.end < total_items {
                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.end {
                                new_end = idx as u64 + new_start + 1;
                                break;
                            }
                        }
                    }
                }

                // Adjust for any skipped preamble
                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    new_end += first_row_start;
                }

                (new_start..new_end, new_start..new_end)
            }
        } else {
            // No repetition info, easy case, just use the range as-is and the item
            // and level ranges are the same
            (range.clone(), range)
        }
    }

    // Unwraps a miniblock chunk's "envelope" into the rep, def, and data buffers
    fn decode_miniblock_chunk(
        &self,
        buf: &LanceBuffer,
        items_in_chunk: u64,
    ) -> Result<DecodedMiniBlockChunk> {
        // The first 6 bytes describe the size of the remaining buffers
        let bytes_rep = u16::from_le_bytes([buf[0], buf[1]]) as usize;
        let bytes_def = u16::from_le_bytes([buf[2], buf[3]]) as usize;
        let bytes_val = u16::from_le_bytes([buf[4], buf[5]]) as usize;
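
        // A sketch of the chunk envelope implied by the header above and the slicing
        // below (P1 pads rep to 2-byte alignment; P2/P3 pad to MINIBLOCK_ALIGNMENT):
        //
        // | bytes_rep | bytes_def | bytes_val | rep | P1 | def | P2 | values | P3 |
        // |    u16    |    u16    |    u16    | ... |    | ... |    |  ...   |    |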

        debug_assert!(buf.len() >= bytes_rep + bytes_def + bytes_val + 6);
        debug_assert!(
            buf.len()
                <= bytes_rep
                    + bytes_def
                    + bytes_val
                    + 6
                    + 1 // P1
                    + (2 * MINIBLOCK_MAX_PADDING) // P2/P3
        );
        let p1 = bytes_rep % 2;
        let rep = buf.slice_with_length(6, bytes_rep);
        let def = buf.slice_with_length(6 + bytes_rep + p1, bytes_def);
        let p2 = pad_bytes::<MINIBLOCK_ALIGNMENT>(6 + bytes_rep + p1 + bytes_def);
        // The values offset must include `p1`, matching the padded offset that `p2`
        // was computed from above
        let values = buf.slice_with_length(6 + bytes_rep + p1 + bytes_def + p2, bytes_val);

        let values = self.value_decompressor.decompress(values, items_in_chunk)?;

        let rep = Self::decode_levels(self.rep_decompressor.as_ref(), rep)?;
        let def = Self::decode_levels(self.def_decompressor.as_ref(), def)?;

        Ok(DecodedMiniBlockChunk { rep, def, values })
    }
}

impl DecodePageTask for DecodeMiniBlockTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        // First, we create output buffers for the rep and def and data
        let mut repbuf: Option<LevelBuffer> = None;
        let mut defbuf: Option<LevelBuffer> = None;

        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;

        // This is probably an over-estimate but it's quick and easy to calculate
        let estimated_size_bytes = self
            .instructions
            .iter()
            .map(|(_, chunk)| chunk.data.len())
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);

        // We need to keep track of the offset into repbuf/defbuf that we are building up
        let mut level_offset = 0;
        // Now we iterate through each instruction and process it
        for (instructions, chunk) in self.instructions.iter() {
            // TODO: It's very possible that we have duplicate `buf` in self.instructions and we
            // don't want to decode the buf again and again on the same thread.

            let DecodedMiniBlockChunk { rep, def, values } =
                self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;

            // Our instructions tell us which rows we want to take from this chunk
            let row_range_start =
                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
            let row_range_end = row_range_start + instructions.rows_to_take;

            // We use the rep info to map the row range to an item range / levels range
            let (item_range, level_range) = Self::map_range(
                row_range_start..row_range_end,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                self.max_visible_level,
                chunk.items_in_chunk,
                instructions.preamble_action,
            );

            // Now we append the data to the output buffers
            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
            level_offset += (level_range.end - level_range.start) as usize;
            data_builder.append(&values, item_range);
        }

        let data = data_builder.finish();

        let unraveler = RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone());

        // if dictionary encoding is applied, do dictionary decode here.
        if let Some(dictionary) = &self.dictionary_data {
            // assume the indices are uniformly distributed.
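            // i.e. estimated_size_bytes = data_size * ceil(num_values / dictionary.num_values()),
            // scaling the total dictionary size by the average number of times each
            // entry is referenced (rounded up)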
            let estimated_size_bytes = dictionary.data_size()
                * (data.num_values() + dictionary.num_values() - 1)
                / dictionary.num_values();
            let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);

            // if dictionary encoding is applied, indices are of type `UInt8`
            if let DataBlock::FixedWidth(mut fixed_width_data_block) = data {
                let indices = fixed_width_data_block.data.borrow_to_typed_slice::<u8>();
                let indices = indices.as_ref();

                indices.iter().for_each(|&idx| {
                    data_builder.append(dictionary, idx as u64..idx as u64 + 1);
                });

                let data = data_builder.finish();
                return Ok(DecodedPage {
                    data,
                    repdef: unraveler,
                });
            }
        }

        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

/// A chunk that has been loaded by the miniblock scheduler (but not
/// yet decoded)
#[derive(Debug)]
struct LoadedChunk {
    data: LanceBuffer,
    items_in_chunk: u64,
    byte_range: Range<u64>,
    chunk_idx: usize,
}

impl Clone for LoadedChunk {
    fn clone(&self) -> Self {
        Self {
            // Safe as we always create borrowed buffers here
            data: self.data.try_clone().unwrap(),
            items_in_chunk: self.items_in_chunk,
            byte_range: self.byte_range.clone(),
            chunk_idx: self.chunk_idx,
        }
    }
}

/// Decodes mini-block formatted data.  See [`PrimitiveStructuralEncoder`] for more
/// details on the different layouts.
#[derive(Debug)]
struct MiniBlockDecoder {
    rep_decompressor: Arc<dyn BlockDecompressor>,
    def_decompressor: Arc<dyn BlockDecompressor>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    loaded_chunks: VecDeque<LoadedChunk>,
    instructions: VecDeque<ChunkInstructions>,
    offset_in_current_chunk: u64,
    num_rows: u64,
    items_per_row: u64,
    dictionary: Option<Arc<DataBlock>>,
}

/// See [`MiniBlockScheduler`] for more details on the scheduling and decoding
/// process for miniblock encoded data.
impl StructuralPageDecoder for MiniBlockDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut items_desired = num_rows * self.items_per_row;
        let mut need_preamble = false;
        let mut skip_in_chunk = self.offset_in_current_chunk;
        let mut drain_instructions = Vec::new();
        while items_desired > 0 || need_preamble {
            let (instructions, consumed) = self
                .instructions
                .front()
                .unwrap()
                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);

            while self.loaded_chunks.front().unwrap().chunk_idx
                != instructions.chunk_instructions.chunk_idx
            {
                self.loaded_chunks.pop_front();
            }
            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
            if consumed {
                self.instructions.pop_front();
            }
        }
        // We can throw away need_preamble here because it must be false.  If it were true it would mean
        // we were still in the middle of loading rows.  We do need to latch skip_in_chunk though.
        self.offset_in_current_chunk = skip_in_chunk;

        let max_visible_level = self
            .def_meaning
            .iter()
            .take_while(|l| !l.is_list())
            .map(|l| l.num_def_levels())
            .sum::<u16>();

        Ok(Box::new(DecodeMiniBlockTask {
            instructions: drain_instructions,
            def_decompressor: self.def_decompressor.clone(),
            rep_decompressor: self.rep_decompressor.clone(),
            value_decompressor: self.value_decompressor.clone(),
            dictionary_data: self.dictionary.clone(),
            def_meaning: self.def_meaning.clone(),
            max_visible_level,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
struct CachedComplexAllNullState {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
}

impl DeepSizeOf for CachedComplexAllNullState {
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
    }
}

impl CachedPageData for CachedComplexAllNullState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

/// A scheduler for all-null data that has repetition and definition levels
///
/// We still need to do some I/O in this case because we need to figure out what kind of null we
/// are dealing with (null list, null struct, what level null struct, etc.)
///
/// TODO: Right now we just load the entire rep/def at initialization time and cache it.  This is a touch
/// RAM aggressive and maybe we want something more lazy in the future.  On the other hand, it's simple
/// and fast so...maybe not :)
#[derive(Debug)]
pub struct ComplexAllNullScheduler {
    // Set from protobuf
    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    items_per_row: u64,
    repdef: Option<Arc<CachedComplexAllNullState>>,
}

impl ComplexAllNullScheduler {
    pub fn new(
        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
        def_meaning: Arc<[DefinitionInterpretation]>,
        items_per_row: u64,
    ) -> Self {
        Self {
            buffer_offsets_and_sizes,
            def_meaning,
            items_per_row,
            repdef: None,
        }
    }
}

impl StructuralPageScheduler for ComplexAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        // Fully load the rep & def buffers, as needed
        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
        let has_rep = rep_size > 0;
        let has_def = def_size > 0;

        let mut reads = Vec::with_capacity(2);
        if has_rep {
            reads.push(rep_pos..rep_pos + rep_size);
        }
        if has_def {
            reads.push(def_pos..def_pos + def_size);
        }

        let data = io.submit_request(reads, 0);

        async move {
            let data = data.await?;
            let mut data_iter = data.into_iter();

            let rep = if has_rep {
                let rep = data_iter.next().unwrap();
                let mut rep = LanceBuffer::from_bytes(rep, 2);
                let rep = rep.borrow_to_typed_slice::<u16>();
                Some(rep)
            } else {
                None
            };

            let def = if has_def {
                let def = data_iter.next().unwrap();
                let mut def = LanceBuffer::from_bytes(def, 2);
                let def = def.borrow_to_typed_slice::<u16>();
                Some(def)
            } else {
                None
            };

            let repdef = Arc::new(CachedComplexAllNullState { rep, def });

            self.repdef = Some(repdef.clone());

            Ok(repdef as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.repdef = Some(
            data.clone()
                .as_arc_any()
                .downcast::<CachedComplexAllNullState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let ranges = VecDeque::from_iter(ranges.iter().cloned());
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let item_ranges = ranges
            .iter()
            .map(|r| r.start * self.items_per_row..r.end * self.items_per_row)
            .collect();
        Ok(std::future::ready(Ok(Box::new(ComplexAllNullPageDecoder {
            ranges: item_ranges,
            rep: self.repdef.as_ref().unwrap().rep.clone(),
            def: self.repdef.as_ref().unwrap().def.clone(),
            items_per_row: self.items_per_row,
            num_rows,
            def_meaning: self.def_meaning.clone(),
        }) as Box<dyn StructuralPageDecoder>))
        .boxed())
    }
}

#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
    ranges: VecDeque<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    num_rows: u64,
    items_per_row: u64,
    def_meaning: Arc<[DefinitionInterpretation]>,
}

impl ComplexAllNullPageDecoder {
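    // Splits the first `num_rows` rows off the front of `self.ranges`.
    //
    // An illustrative example: with ranges [0..10, 20..25] queued, draining 12
    // rows returns [0..10, 20..22] and leaves [22..25] queued.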
    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
        let mut rows_desired = num_rows;
        let mut ranges = Vec::with_capacity(self.ranges.len());
        while rows_desired > 0 {
            let front = self.ranges.front_mut().unwrap();
            let avail = front.end - front.start;
            if avail > rows_desired {
                ranges.push(front.start..front.start + rows_desired);
                front.start += rows_desired;
                rows_desired = 0;
            } else {
                ranges.push(self.ranges.pop_front().unwrap());
                rows_desired -= avail;
            }
        }
        ranges
    }
}

impl StructuralPageDecoder for ComplexAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        // TODO: This is going to need to be more complicated to deal with nested lists of nulls
        // because the row ranges might not map directly to item ranges
        //
        // We should add test cases and handle this later
        let num_items = num_rows * self.items_per_row;
        let drained_ranges = self.drain_ranges(num_items);
        Ok(Box::new(DecodeComplexAllNullTask {
            ranges: drained_ranges,
            rep: self.rep.clone(),
            def: self.def.clone(),
            def_meaning: self.def_meaning.clone(),
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

/// We use `ranges` to slice into `rep` and `def` and create rep/def buffers
/// for the null data.
#[derive(Debug)]
pub struct DecodeComplexAllNullTask {
    ranges: Vec<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
}

impl DecodeComplexAllNullTask {
    fn decode_level(
        &self,
        levels: &Option<ScalarBuffer<u16>>,
        num_values: u64,
    ) -> Option<Vec<u16>> {
        levels.as_ref().map(|levels| {
            let mut referenced_levels = Vec::with_capacity(num_values as usize);
            for range in &self.ranges {
                referenced_levels.extend(
                    levels[range.start as usize..range.end as usize]
                        .iter()
                        .copied(),
                );
            }
            referenced_levels
        })
    }
}

impl DecodePageTask for DecodeComplexAllNullTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let num_values = self.ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
        let rep = self.decode_level(&self.rep, num_values);
        let def = self.decode_level(&self.def, num_values);
        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning);
        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

/// A scheduler for simple all-null data
///
/// "simple" all-null data is data that is all null and only has a single level of definition and
/// no repetition.  We don't need to read any data at all in this case.
#[derive(Debug, Default)]
pub struct SimpleAllNullScheduler {}

impl StructuralPageScheduler for SimpleAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        _io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
    }

    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        Ok(std::future::ready(Ok(
            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>
        ))
        .boxed())
    }
}

/// A page decode task for all-null data without any
/// repetition and only a single level of definition
#[derive(Debug)]
struct SimpleAllNullDecodePageTask {
    num_values: u64,
}
impl DecodePageTask for SimpleAllNullDecodePageTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let unraveler = RepDefUnraveler::new(
            None,
            Some(vec![1; self.num_values as usize]),
            Arc::new([DefinitionInterpretation::NullableItem]),
        );
        Ok(DecodedPage {
            data: DataBlock::AllNull(AllNullDataBlock {
                num_values: self.num_values,
            }),
            repdef: unraveler,
        })
    }
}

#[derive(Debug)]
pub struct SimpleAllNullPageDecoder {
    num_rows: u64,
}

impl StructuralPageDecoder for SimpleAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        Ok(Box::new(SimpleAllNullDecodePageTask {
            num_values: num_rows,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    // These come from the protobuf
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    dictionary_buf_position_and_size: (u64, u64),
    dictionary_data_alignment: u64,
}

#[derive(Debug)]
struct RepIndexBlock {
    // The index of the first row that starts after the beginning of this block.  If the block
    // has a preamble this will be the row after the preamble.  If the block is entirely preamble
    // then this will be a row that starts in some future block.
    first_row: u64,
    // The number of rows in the block, including the trailer but not the preamble.
    // Can be 0 if the block is entirely preamble
    starts_including_trailer: u64,
    // Whether the block has a preamble
    has_preamble: bool,
    // Whether the block has a trailer
    has_trailer: bool,
}

impl DeepSizeOf for RepIndexBlock {
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        0
    }
}

#[derive(Debug)]
struct RepetitionIndex {
    blocks: Vec<RepIndexBlock>,
}

impl DeepSizeOf for RepetitionIndex {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.blocks.deep_size_of_children(context)
    }
}

impl RepetitionIndex {
    fn decode(rep_index: &[Vec<u64>]) -> Self {
        let mut chunk_has_preamble = false;
        let mut offset = 0;
        let mut blocks = Vec::with_capacity(rep_index.len());
        for chunk_rep in rep_index {
            let ends_count = chunk_rep[0];
            let partial_count = chunk_rep[1];

            let chunk_has_trailer = partial_count > 0;
            let mut starts_including_trailer = ends_count;
            if chunk_has_trailer {
                starts_including_trailer += 1;
            }
            if chunk_has_preamble {
                starts_including_trailer -= 1;
            }

            blocks.push(RepIndexBlock {
                first_row: offset,
                starts_including_trailer,
                has_preamble: chunk_has_preamble,
                has_trailer: chunk_has_trailer,
            });

            chunk_has_preamble = chunk_has_trailer;
            offset += starts_including_trailer;
        }

        Self { blocks }
    }
}
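
// An illustrative decode (values assumed for the sketch): with
// rep_index = [[50, 3], [50, 0], [10, 0]], where each entry is
// [ends_count, partial_count], `RepetitionIndex::decode` produces:
//
//  - chunk 0: 50 rows end here and a trailer starts row 50
//      -> { first_row: 0, starts_including_trailer: 51, has_preamble: false, has_trailer: true }
//  - chunk 1: a preamble finishes row 50, then 49 more rows start
//      -> { first_row: 51, starts_including_trailer: 49, has_preamble: true, has_trailer: false }
//  - chunk 2: 10 plain rows
//      -> { first_row: 100, starts_including_trailer: 10, has_preamble: false, has_trailer: false }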

/// State that is loaded once and cached for future lookups
#[derive(Debug)]
struct MiniBlockCacheableState {
    /// Metadata that describes each chunk in the page
    chunk_meta: Vec<ChunkMeta>,
    /// The decoded repetition index
    rep_index: RepetitionIndex,
    /// The dictionary for the page, if any
    dictionary: Option<Arc<DataBlock>>,
}

impl DeepSizeOf for MiniBlockCacheableState {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.rep_index.deep_size_of_children(context)
            + self
                .dictionary
                .as_ref()
                .map(|dict| dict.data_size() as usize)
                .unwrap_or(0)
    }
}

impl CachedPageData for MiniBlockCacheableState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

/// A scheduler for a page that has been encoded with the mini-block layout
///
/// Scheduling mini-block encoded data is simple in concept and somewhat complex
/// in practice.
///
/// First, during initialization, we load the chunk metadata, the repetition index,
/// and the dictionary (these last two may not be present)
///
/// Then, during scheduling, we use the user's requested row ranges and the repetition
/// index to determine which chunks we need and which rows we need from those chunks.
///
/// For example, if the repetition index is: [50, 3], [50, 0], [10, 0] and the range
/// from the user is 40..60 then we need to:
///
///  - Read the first chunk and skip the first 40 rows, then read 10 full rows, and
///    then read 3 items for the 11th row of our range.
///  - Read the second chunk and read the remaining items in our 11th row and then read
///    the remaining 9 full rows.
///
/// Then, if we are going to decode that in batches of 5, we need to make decode tasks.
/// The first two decode tasks will just need the first chunk.  The third decode task will
/// need the first chunk (for the trailer which has the 11th row in our range) and the second
/// chunk.  The final decode task will just need the second chunk.
///
/// The above prose descriptions are what are represented by [`ChunkInstructions`] and
/// [`ChunkDrainInstructions`].
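///
/// As a rough sketch (assuming the field semantics documented on [`ChunkInstructions`]),
/// the schedule for the 40..60 example above would be:
///
///  - chunk 0: preamble=Absent, rows_to_skip=40, rows_to_take=10, take_trailer=true
///  - chunk 1: preamble=Take, rows_to_skip=0, rows_to_take=9, take_trailer=false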
#[derive(Debug)]
pub struct MiniBlockScheduler {
    // These come from the protobuf
    buffer_offsets_and_sizes: Vec<(u64, u64)>,
    priority: u64,
    items_in_page: u64,
    items_per_row: u64,
    repetition_index_depth: u16,
    rep_decompressor: Arc<dyn BlockDecompressor>,
    def_decompressor: Arc<dyn BlockDecompressor>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    dictionary: Option<MiniBlockSchedulerDictionary>,
    // This is set after initialization
    page_meta: Option<Arc<MiniBlockCacheableState>>,
}

impl MiniBlockScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        items_in_page: u64,
        items_per_row: u64,
        layout: &pb::MiniBlockLayout,
        decompressors: &dyn DecompressorStrategy,
    ) -> Result<Self> {
        let rep_decompressor =
            decompressors.create_block_decompressor(layout.rep_compression.as_ref().unwrap())?;
        let def_decompressor =
            decompressors.create_block_decompressor(layout.def_compression.as_ref().unwrap())?;
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();
        let value_decompressor = decompressors
            .create_miniblock_decompressor(layout.value_compression.as_ref().unwrap())?;
        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
            match dictionary_encoding.array_encoding.as_ref().unwrap() {
                pb::array_encoding::ArrayEncoding::Variable(_) => {
                    Some(MiniBlockSchedulerDictionary {
                        dictionary_decompressor: decompressors
                            .create_block_decompressor(dictionary_encoding)?
                            .into(),
                        dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                        dictionary_data_alignment: 4,
                    })
                }
                pb::array_encoding::ArrayEncoding::Flat(_) => Some(MiniBlockSchedulerDictionary {
                    dictionary_decompressor: decompressors
                        .create_block_decompressor(dictionary_encoding)?
                        .into(),
                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                    dictionary_data_alignment: 16,
                }),
                _ => {
                    unreachable!("Currently only encodings `BinaryBlock` and `Flat` used for encoding MiniBlock dictionary.")
                }
            }
        } else {
            None
        };

        Ok(Self {
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
            rep_decompressor: rep_decompressor.into(),
            def_decompressor: def_decompressor.into(),
            value_decompressor: value_decompressor.into(),
            repetition_index_depth: layout.repetition_index_depth as u16,
            priority,
            items_in_page,
            items_per_row,
            dictionary,
            def_meaning: def_meaning.into(),
            page_meta: None,
        })
    }

    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
        let page_meta = self.page_meta.as_ref().unwrap();
        chunk_indices
            .iter()
            .map(|&chunk_idx| {
                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
                let bytes_start = chunk_meta.offset_bytes;
                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
                LoadedChunk {
                    byte_range: bytes_start..bytes_end,
                    items_in_chunk: chunk_meta.num_values,
                    chunk_idx,
                    data: LanceBuffer::empty(),
                }
            })
            .collect()
    }
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
    Take,
    Skip,
    Absent,
}

// TODO: Add test cases for the all-preamble and all-trailer cases

// When we schedule a chunk we use the repetition index (or, if none exists, just the # of items
// in each chunk) to map a user requested range into a set of ChunkInstruction objects which tell
// us how exactly to read from the chunk.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChunkInstructions {
    // The index of the chunk to read
    chunk_idx: usize,
    // A "preamble" is when a chunk begins with a continuation of the previous chunk's list.  If there
    // is no repetition index there is never a preamble.
    //
    // It's possible for a chunk to be entirely preamble.  For example, if there is a really large list
    // that spans several chunks.
    preamble: PreambleAction,
    // How many complete rows (not including the preamble or trailer) to skip
    //
    // If this is non-zero then preamble must not be Take
1375    rows_to_skip: u64,
1376    // How many complete (non-preamble / non-trailer) rows to take
1377    rows_to_take: u64,
1378    // A "trailer" is when a chunk ends with a partial list.  If there is no repetition index there is
1379    // never a trailer.
1380    //
1381    // It's possible for a chunk to be entirely trailer.  This would mean the chunk starts with the beginning
1382    // of a list and that list is continued in the next chunk.
1383    //
1384    // If this is true then we want to include the trailer
1385    take_trailer: bool,
1386}
1387
1388// First, we schedule a bunch of [`ChunkInstructions`] based on the user's ranges.  Then we
1389// start decoding them, based on a batch size, which might not align with what we scheduled.
1390//
1391// This results in `ChunkDrainInstructions` which targets a contiguous slice of a `ChunkInstructions`
1392//
1393// So if `ChunkInstructions` is "skip preamble, skip 10, take 50, take trailer" and we are decoding in
1394// batches of size 10 we might have a `ChunkDrainInstructions` that targets that chunk and has its own
1395// skip of 17 and take of 10.  This would mean we decode the chunk, skip the preamble and 27 rows, and
1396// then take 10 rows.
1397//
1398// One very confusing bit is that `rows_to_take` includes the trailer.  So if we have two chunks:
1399//  -no preamble, skip 5, take 10, take trailer
1400//  -take preamble, skip 0, take 50, no trailer
1401//
1402// and we are draining 20 rows then the drain instructions for the first batch will be:
1403//  - no preamble, skip 0 (from chunk 0), take 11 (from chunk 0)
1404//  - take preamble (from chunk 1), skip 0 (from chunk 1), take 9 (from chunk 1)
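// (this walk-through is traced concretely in the illustrative test sketch following `impl ChunkInstructions` below)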
1405#[derive(Debug, PartialEq, Eq)]
1406struct ChunkDrainInstructions {
1407    chunk_instructions: ChunkInstructions,
1408    rows_to_skip: u64,
1409    rows_to_take: u64,
1410    preamble_action: PreambleAction,
1411}
1412
1413impl ChunkInstructions {
1414    // Given a repetition index and a set of user ranges we need to figure out how to read from the chunks
1415    //
1416    // We assume that `user_ranges` are in sorted order and non-overlapping
1417    //
1418    // The output will be a set of `ChunkInstructions` which tell us how to read from the chunks
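    // For example (values assumed purely for illustration): with chunks of 100 rows
    // each (no lists, so no preambles or trailers) and a user range of 250..260, the
    // binary search lands on the chunk whose first_row is 200 and we emit a single
    // instruction: skip 50 rows, take 10 rows, no trailer.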
1419    fn schedule_instructions(rep_index: &RepetitionIndex, user_ranges: &[Range<u64>]) -> Vec<Self> {
1420        // This is an inexact capacity guess, but a pretty good one.  The actual capacity can be
1421        // smaller if instructions are merged.  It can be larger if there are multiple instructions
1422        // per row which can happen with lists.
1423        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());
1424
1425        for user_range in user_ranges {
1426            let mut rows_needed = user_range.end - user_range.start;
1427            let mut need_preamble = false;
1428
1429            // Need to find the first chunk with a first row >= user_range.start.  If there are
1430            // multiple chunks with the same first row we need to take the first one.
1431            let mut block_index = match rep_index
1432                .blocks
1433                .binary_search_by_key(&user_range.start, |block| block.first_row)
1434            {
1435                Ok(idx) => {
1436                    // Slightly tricky case, we may need to walk backwards a bit to make sure we
1437                    // are grabbing the first eligible chunk
1438                    let mut idx = idx;
1439                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
1440                        idx -= 1;
1441                    }
1442                    idx
1443                }
1444                // Easy case.  idx is greater, and idx - 1 is smaller, so idx - 1 contains the start
1445                Err(idx) => idx - 1,
1446            };
1447
1448            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;
1449
1450            while rows_needed > 0 || need_preamble {
1451                let chunk = &rep_index.blocks[block_index];
1452                let rows_avail = chunk.starts_including_trailer - to_skip;
1453                debug_assert!(rows_avail > 0);
1454
1455                let rows_to_take = rows_avail.min(rows_needed);
1456                rows_needed -= rows_to_take;
1457
1458                let mut take_trailer = false;
1459                let preamble = if chunk.has_preamble {
1460                    if need_preamble {
1461                        PreambleAction::Take
1462                    } else {
1463                        PreambleAction::Skip
1464                    }
1465                } else {
1466                    PreambleAction::Absent
1467                };
1468                let mut rows_to_take_no_trailer = rows_to_take;
1469
1470                // Are we taking the trailer?  If so, make sure we mark that we need the preamble
1471                if rows_to_take == rows_avail && chunk.has_trailer {
1472                    take_trailer = true;
1473                    need_preamble = true;
1474                    rows_to_take_no_trailer -= 1;
1475                } else {
1476                    need_preamble = false;
1477                };
1478
1479                chunk_instructions.push(Self {
1480                    preamble,
1481                    chunk_idx: block_index,
1482                    rows_to_skip: to_skip,
1483                    rows_to_take: rows_to_take_no_trailer,
1484                    take_trailer,
1485                });
1486
1487                to_skip = 0;
1488                block_index += 1;
1489            }
1490        }
1491
1492        // If there were multiple ranges we may have multiple instructions for a single chunk.  Merge them now if they
1493        // are _adjacent_ (i.e. don't merge "take first row of chunk 0" and "take third row of chunk 0" into "take 2
1494        // rows of chunk 0 starting at 0")
1495        if user_ranges.len() > 1 {
1496            // TODO: Could probably optimize this allocation away
1497            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
1498            let mut instructions_iter = chunk_instructions.into_iter();
1499            merged_instructions.push(instructions_iter.next().unwrap());
1500            for instruction in instructions_iter {
1501                let last = merged_instructions.last_mut().unwrap();
1502                if last.chunk_idx == instruction.chunk_idx
1503                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
1504                {
1505                    last.rows_to_take += instruction.rows_to_take;
1506                    last.take_trailer |= instruction.take_trailer;
1507                } else {
1508                    merged_instructions.push(instruction);
1509                }
1510            }
1511            merged_instructions
1512        } else {
1513            chunk_instructions
1514        }
1515    }
1516
1517    fn drain_from_instruction(
1518        &self,
1519        rows_desired: &mut u64,
1520        need_preamble: &mut bool,
1521        skip_in_chunk: &mut u64,
1522    ) -> (ChunkDrainInstructions, bool) {
1523        // If we need the preamble then we shouldn't be skipping anything
1524        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
1525        let mut rows_avail = self.rows_to_take - *skip_in_chunk;
1526        let has_preamble = self.preamble != PreambleAction::Absent;
1527        let preamble_action = match (*need_preamble, has_preamble) {
1528            (true, true) => PreambleAction::Take,
1529            (true, false) => panic!("Need preamble but there isn't one"),
1530            (false, true) => PreambleAction::Skip,
1531            (false, false) => PreambleAction::Absent,
1532        };
1533
1534        // Did the scheduled chunk have a trailer?  If so, we have one extra row available
1535        if self.take_trailer {
1536            rows_avail += 1;
1537        }
1538
1539        // How many rows are we actually taking in this take step (the trailer counts as its own
1540        // row; the preamble does not, since it was already counted as the previous chunk's trailer)
1541        let rows_taking = if *rows_desired >= rows_avail {
1542            // We want all the rows.  If there is a trailer we are grabbing it and will need
1543            // the preamble of the next chunk
1544            *need_preamble = self.take_trailer;
1545            rows_avail
1546        } else {
1547            // We aren't taking all the rows.  Even if there is a trailer we aren't taking
1548            // it so we will not need the preamble
1549            *need_preamble = false;
1550            *rows_desired
1551        };
1552        let rows_skipped = *skip_in_chunk;
1553
1554        // Update the state for the next iteration
1555        let consumed_chunk = if *rows_desired >= rows_avail {
1556            *rows_desired -= rows_avail;
1557            *skip_in_chunk = 0;
1558            true
1559        } else {
1560            *skip_in_chunk += *rows_desired;
1561            *rows_desired = 0;
1562            false
1563        };
1564
1565        (
1566            ChunkDrainInstructions {
1567                chunk_instructions: self.clone(),
1568                rows_to_skip: rows_skipped,
1569                rows_to_take: rows_taking,
1570                preamble_action,
1571            },
1572            consumed_chunk,
1573        )
1574    }
1575}
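// A minimal sketch of the drain walk-through documented above, written as a
// hypothetical test (values assumed for illustration; not part of the existing
// test suite): chunk 0 yields its 10 rows plus the trailer, then chunk 1
// supplies its preamble and the remaining 9 rows.
#[cfg(test)]
mod chunk_drain_example {
    use super::*;

    #[test]
    fn drain_spans_trailer_and_preamble() {
        let chunk0 = ChunkInstructions {
            chunk_idx: 0,
            preamble: PreambleAction::Absent,
            rows_to_skip: 5,
            rows_to_take: 10,
            take_trailer: true,
        };
        let chunk1 = ChunkInstructions {
            chunk_idx: 1,
            preamble: PreambleAction::Take,
            rows_to_skip: 0,
            rows_to_take: 50,
            take_trailer: false,
        };

        let mut rows_desired = 20;
        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

        // Chunk 0 is fully consumed: 10 complete rows plus the trailer row, and
        // taking the trailer means we now need the next chunk's preamble.
        let (drain0, consumed0) = chunk0.drain_from_instruction(
            &mut rows_desired,
            &mut need_preamble,
            &mut skip_in_chunk,
        );
        assert!(consumed0);
        assert_eq!(drain0.rows_to_take, 11);
        assert_eq!(drain0.preamble_action, PreambleAction::Absent);
        assert!(need_preamble);

        // Chunk 1 supplies its preamble plus the remaining 9 rows and is only
        // partially consumed.
        let (drain1, consumed1) = chunk1.drain_from_instruction(
            &mut rows_desired,
            &mut need_preamble,
            &mut skip_in_chunk,
        );
        assert!(!consumed1);
        assert_eq!(drain1.rows_to_take, 9);
        assert_eq!(drain1.preamble_action, PreambleAction::Take);
        assert_eq!(rows_desired, 0);
    }
}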
1576
1577impl StructuralPageScheduler for MiniBlockScheduler {
1578    fn initialize<'a>(
1579        &'a mut self,
1580        io: &Arc<dyn EncodingsIo>,
1581    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1582        // We always need to fetch chunk metadata.  We may also need to fetch a dictionary and
1583        // we may also need to fetch the repetition index.  Here, we gather what buffers we
1584        // need.
1585        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
1586        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
1587        let mut bufs_needed = 1;
1588        if self.dictionary.is_some() {
1589            bufs_needed += 1;
1590        }
1591        if self.repetition_index_depth > 0 {
1592            bufs_needed += 1;
1593        }
1594        let mut required_ranges = Vec::with_capacity(bufs_needed);
1595        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
1596        if let Some(ref dictionary) = self.dictionary {
1597            required_ranges.push(
1598                dictionary.dictionary_buf_position_and_size.0
1599                    ..dictionary.dictionary_buf_position_and_size.0
1600                        + dictionary.dictionary_buf_position_and_size.1,
1601            );
1602        }
1603        if self.repetition_index_depth > 0 {
1604            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
1605            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
1606        }
1607        let io_req = io.submit_request(required_ranges, 0);
1608
1609        async move {
1610            let mut buffers = io_req.await?.into_iter().fuse();
1611            let meta_bytes = buffers.next().unwrap();
1612            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
1613            let rep_index_bytes = buffers.next();
1614
1615            // Parse the metadata and build the chunk meta
1616            assert!(meta_bytes.len() % 2 == 0);
1617            let mut bytes = LanceBuffer::from_bytes(meta_bytes, 2);
1618            let words = bytes.borrow_to_typed_slice::<u16>();
1619            let words = words.as_ref();
1620
1621            let mut chunk_meta = Vec::with_capacity(words.len());
1622
1623            let mut rows_counter = 0;
1624            let mut offset_bytes = value_buf_position;
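            // Each metadata word packs two fields: the low 4 bits hold
            // log2(num_values) (for all but the last chunk) and the high 12 bits
            // hold (chunk_size_bytes / MINIBLOCK_ALIGNMENT) - 1.  For example (word
            // value assumed for illustration), 0x00F3 describes a chunk of
            // 1 << 3 = 8 values occupying (0x0F + 1) * MINIBLOCK_ALIGNMENT bytes.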
1625            for (word_idx, word) in words.iter().enumerate() {
1626                let log_num_values = word & 0x0F;
1627                let divided_bytes = word >> 4;
1628                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
1629                debug_assert!(num_bytes > 0);
1630                let num_values = if word_idx < words.len() - 1 {
1631                    debug_assert!(log_num_values > 0);
1632                    1 << log_num_values
1633                } else {
1634                    debug_assert_eq!(log_num_values, 0);
1635                    self.items_in_page - rows_counter
1636                };
1637                rows_counter += num_values;
1638
1639                chunk_meta.push(ChunkMeta {
1640                    num_values,
1641                    chunk_size_bytes: num_bytes as u64,
1642                    offset_bytes,
1643                });
1644                offset_bytes += num_bytes as u64;
1645            }
1646
1647            // Build the repetition index
1648            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
1649                // If we have a repetition index then we use that
1650                // TODO: Compress the repetition index :)
1651                assert!(rep_index_data.len() % 8 == 0);
1652                let mut repetition_index_vals = LanceBuffer::from_bytes(rep_index_data, 8);
1653                let repetition_index_vals = repetition_index_vals.borrow_to_typed_slice::<u64>();
1654                // Unflatten
1655                repetition_index_vals
1656                    .as_ref()
1657                    .chunks_exact(self.repetition_index_depth as usize + 1)
1658                    .map(|c| c.to_vec())
1659                    .collect::<Vec<_>>()
1660            } else {
1661                // Default rep index is just the number of items in each chunk
1662                // with 0 partials/leftovers
1663                chunk_meta
1664                    .iter()
1665                    .map(|c| vec![c.num_values, 0])
1666                    .collect::<Vec<_>>()
1667            };
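            // e.g. (illustrative) with no repetition index, chunks of 100, 100, and
            // 50 items produce the rep index [[100, 0], [100, 0], [50, 0]]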
1668
1669            let mut page_meta = MiniBlockCacheableState {
1670                chunk_meta,
1671                rep_index: RepetitionIndex::decode(&rep_index),
1672                dictionary: None,
1673            };
1674
1675            // decode dictionary
1676            if let Some(ref mut dictionary) = self.dictionary {
1677                let dictionary_data = dictionary_bytes.unwrap();
1678                page_meta.dictionary =
1679                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
1680                        LanceBuffer::from_bytes(
1681                            dictionary_data,
1682                            dictionary.dictionary_data_alignment,
1683                        ),
1684                    )?));
1685            };
1686            let page_meta = Arc::new(page_meta);
1687            self.page_meta = Some(page_meta.clone());
1688            Ok(page_meta as Arc<dyn CachedPageData>)
1689        }
1690        .boxed()
1691    }
1692
1693    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
1694        self.page_meta = Some(
1695            data.clone()
1696                .as_arc_any()
1697                .downcast::<MiniBlockCacheableState>()
1698                .unwrap(),
1699        );
1700    }
1701
1702    fn schedule_ranges(
1703        &self,
1704        ranges: &[Range<u64>],
1705        io: &Arc<dyn EncodingsIo>,
1706    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
1707        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
1708        let ranges = ranges
1709            .iter()
1710            .map(|r| r.start * self.items_per_row..r.end * self.items_per_row)
1711            .collect::<Vec<_>>();
1712
1713        let page_meta = self.page_meta.as_ref().unwrap();
1714
1715        let chunk_instructions =
1716            ChunkInstructions::schedule_instructions(&page_meta.rep_index, &ranges);
1717
1718        debug_assert_eq!(
1719            num_rows * self.items_per_row,
1720            chunk_instructions
1721                .iter()
1722                .map(|ci| {
1723                    let taken = ci.rows_to_take;
1724                    if ci.take_trailer {
1725                        taken + 1
1726                    } else {
1727                        taken
1728                    }
1729                })
1730                .sum::<u64>()
1731        );
1732
1733        let chunks_needed = chunk_instructions
1734            .iter()
1735            .map(|ci| ci.chunk_idx)
1736            .unique()
1737            .collect::<Vec<_>>();
1738        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
1739        let chunk_ranges = loaded_chunks
1740            .iter()
1741            .map(|c| c.byte_range.clone())
1742            .collect::<Vec<_>>();
1743        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);
1744
1745        let rep_decompressor = self.rep_decompressor.clone();
1746        let def_decompressor = self.def_decompressor.clone();
1747        let value_decompressor = self.value_decompressor.clone();
1748        let dictionary = page_meta.dictionary.clone();
1752        let def_meaning = self.def_meaning.clone();
1753        let items_per_row = self.items_per_row;
1754
1755        let res = async move {
1756            let loaded_chunk_data = loaded_chunk_data.await?;
1757            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
1758                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
1759            }
1760
1761            Ok(Box::new(MiniBlockDecoder {
1762                rep_decompressor,
1763                def_decompressor,
1764                value_decompressor,
1765                def_meaning,
1766                loaded_chunks: VecDeque::from_iter(loaded_chunks),
1767                instructions: VecDeque::from(chunk_instructions),
1768                offset_in_current_chunk: 0,
1769                dictionary,
1770                num_rows,
1771                items_per_row,
1772            }) as Box<dyn StructuralPageDecoder>)
1773        }
1774        .boxed();
1775        Ok(res)
1776    }
1777}
1778
1779#[derive(Debug)]
1780struct FullZipRepIndexDetails {
1781    buf_position: u64,
1782    bytes_per_value: u64, // Will be 1, 2, 4, or 8
1783}
1784
1785#[derive(Debug)]
1786enum PerValueDecompressor {
1787    Fixed(Arc<dyn FixedPerValueDecompressor>),
1788    Variable(Arc<dyn VariablePerValueDecompressor>),
1789}
1790
1791#[derive(Debug)]
1792struct FullZipDecodeDetails {
1793    value_decompressor: PerValueDecompressor,
1794    def_meaning: Arc<[DefinitionInterpretation]>,
1795    ctrl_word_parser: ControlWordParser,
1796    max_rep: u16,
1797    max_visible_def: u16,
1798    items_per_row: u64,
1799}
1800
1801/// A scheduler for full-zip encoded data
1802///
1803/// When the data type has a fixed width then we simply need to map from
1804/// row ranges to byte ranges using the fixed width of the data type.
1805///
1806/// When the data type is variable-width or has any repetition then a
1807/// repetition index is required.
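///
/// For example (figures assumed for illustration, with one item per row): with
/// 8-byte values and a 2-byte control word each row occupies 10 bytes, so rows
/// 100..200 map to bytes 1000..2000 from the start of the data buffer.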
1808#[derive(Debug)]
1809pub struct FullZipScheduler {
1810    data_buf_position: u64,
1811    rep_index: Option<FullZipRepIndexDetails>,
1812    priority: u64,
1813    rows_in_page: u64,
1814    bits_per_offset: u8,
1815    details: Arc<FullZipDecodeDetails>,
1816}
1817
1818impl FullZipScheduler {
1819    fn try_new(
1820        buffer_offsets_and_sizes: &[(u64, u64)],
1821        priority: u64,
1822        rows_in_page: u64,
1823        items_per_row: u64,
1824        layout: &pb::FullZipLayout,
1825        decompressors: &dyn DecompressorStrategy,
1826        bits_per_offset: u8,
1827    ) -> Result<Self> {
1828        // We don't need the data_buf_size because either the data type is
1829        // fixed-width (and we can tell size from rows_in_page) or it is not
1830        // and we have a repetition index.
1831        let (data_buf_position, _) = buffer_offsets_and_sizes[0];
1832        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
1833            let num_reps = (items_per_row * rows_in_page) + 1;
1834            let bytes_per_rep = len / num_reps;
1835            debug_assert_eq!(len % num_reps, 0);
1836            debug_assert!(
1837                bytes_per_rep == 1
1838                    || bytes_per_rep == 2
1839                    || bytes_per_rep == 4
1840                    || bytes_per_rep == 8
1841            );
1842            FullZipRepIndexDetails {
1843                buf_position: *pos,
1844                bytes_per_value: bytes_per_rep,
1845            }
1846        });
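        // e.g. (illustrative) a page of 100 rows with items_per_row = 1 has
        // 101 rep-index entries, so a 404-byte rep-index buffer implies
        // 4 bytes per entry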
1847
1848        let value_decompressor = match layout.details {
1849            Some(pb::full_zip_layout::Details::BitsPerValue(_)) => {
1850                let decompressor = decompressors.create_fixed_per_value_decompressor(
1851                    layout.value_compression.as_ref().unwrap(),
1852                )?;
1853                PerValueDecompressor::Fixed(decompressor.into())
1854            }
1855            Some(pb::full_zip_layout::Details::BitsPerOffset(_)) => {
1856                let decompressor = decompressors.create_variable_per_value_decompressor(
1857                    layout.value_compression.as_ref().unwrap(),
1858                )?;
1859                PerValueDecompressor::Variable(decompressor.into())
1860            }
1861            None => {
1862                panic!("Full-zip layout must have a `details` field");
1863            }
1864        };
1865        let ctrl_word_parser = ControlWordParser::new(
1866            layout.bits_rep.try_into().unwrap(),
1867            layout.bits_def.try_into().unwrap(),
1868        );
1869        let def_meaning = layout
1870            .layers
1871            .iter()
1872            .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
1873            .collect::<Vec<_>>();
1874
1875        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
1876        let max_visible_def = def_meaning
1877            .iter()
1878            .filter(|d| !d.is_list())
1879            .map(|d| d.num_def_levels())
1880            .sum();
1881
1882        let details = Arc::new(FullZipDecodeDetails {
1883            value_decompressor,
1884            def_meaning: def_meaning.into(),
1885            ctrl_word_parser,
1886            items_per_row,
1887            max_rep,
1888            max_visible_def,
1889        });
1890        Ok(Self {
1891            data_buf_position,
1892            rep_index,
1893            details,
1894            priority,
1895            rows_in_page,
1896            bits_per_offset,
1897        })
1898    }
1899
1900    /// Schedules indirectly by first fetching the data ranges from the
1901    /// repetition index and then fetching the data
1902    ///
1903    /// This approach is needed whenever we have a repetition index and
1904    /// the data has a variable length.
1905    #[allow(clippy::too_many_arguments)]
1906    async fn indirect_schedule_ranges(
1907        data_buffer_pos: u64,
1908        item_ranges: Vec<Range<u64>>,
1909        rep_index_ranges: Vec<Range<u64>>,
1910        bytes_per_rep: u64,
1911        io: Arc<dyn EncodingsIo>,
1912        priority: u64,
1913        bits_per_offset: u8,
1914        details: Arc<FullZipDecodeDetails>,
1915    ) -> Result<Box<dyn StructuralPageDecoder>> {
1916        let byte_ranges = io
1917            .submit_request(rep_index_ranges, priority)
1918            .await?
1919            .into_iter()
1920            .map(|d| LanceBuffer::from_bytes(d, 1))
1921            .collect::<Vec<_>>();
1922        let byte_ranges = LanceBuffer::concat(&byte_ranges);
1923        let byte_ranges = ByteUnpacker::new(byte_ranges, bytes_per_rep as usize)
1924            .chunks(2)
1925            .into_iter()
1926            .map(|mut c| {
1927                let start = c.next().unwrap() + data_buffer_pos;
1928                let end = c.next().unwrap() + data_buffer_pos;
1929                start..end
1930            })
1931            .collect::<Vec<_>>();
1932
1933        let data = io.submit_request(byte_ranges, priority);
1934
1935        let data = data.await?;
1936        let data = data
1937            .into_iter()
1938            .map(|d| LanceBuffer::from_bytes(d, 1))
1939            .collect();
1940        let num_rows = item_ranges.into_iter().map(|r| r.end - r.start).sum();
1941
1942        match &details.value_decompressor {
1943            PerValueDecompressor::Fixed(decompressor) => {
1944                let bits_per_value = decompressor.bits_per_value();
1945                assert!(bits_per_value > 0);
1946                if bits_per_value % 8 != 0 {
1947                    // Unlikely we will ever want this since full-zip values are so large that the
1948                    // few bits we'd shave off wouldn't make much difference.
1949                    unimplemented!("Bit-packed full-zip");
1950                }
1951                let bytes_per_value = bits_per_value / 8;
1952                let total_bytes_per_value =
1953                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
1954                Ok(Box::new(FixedFullZipDecoder {
1955                    details,
1956                    data,
1957                    num_rows,
1958                    offset_in_current: 0,
1959                    bytes_per_value: bytes_per_value as usize,
1960                    total_bytes_per_value,
1961                }) as Box<dyn StructuralPageDecoder>)
1962            }
1963            PerValueDecompressor::Variable(_decompressor) => {
1964                // Variable full-zip
1965
1966                Ok(Box::new(VariableFullZipDecoder::new(
1967                    details,
1968                    data,
1969                    num_rows,
1970                    bits_per_offset,
1971                    bits_per_offset,
1972                )))
1973            }
1974        }
1975    }
1976
1977    /// Schedules ranges in the presence of a repetition index
1978    fn schedule_ranges_rep(
1979        &self,
1980        ranges: &[Range<u64>],
1981        io: &Arc<dyn EncodingsIo>,
1982        rep_index: &FullZipRepIndexDetails,
1983    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
1984        // Convert row ranges to item ranges (i.e. multiply by items per row)
1985        let item_ranges = ranges
1986            .iter()
1987            .map(|r| r.start * self.details.items_per_row..r.end * self.details.items_per_row)
1988            .collect::<Vec<_>>();
1989
1990        let rep_index_ranges = item_ranges
1991            .iter()
1992            .flat_map(|r| {
1993                let first_val_start =
1994                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
1995                let first_val_end = first_val_start + rep_index.bytes_per_value;
1996                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
1997                let last_val_end = last_val_start + rep_index.bytes_per_value;
1998                [first_val_start..first_val_end, last_val_start..last_val_end]
1999            })
2000            .collect::<Vec<_>>();
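        // e.g. (illustrative) with bytes_per_value = 4 and an item range 10..20 we
        // read rep-index entries 10 and 20 (two 4-byte reads); they later unpack to
        // the start and end byte offsets of the requested data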
2001
2002        // Kick off the indirect schedule; the decoder is created once the rep index and data arrive
2003
2004        Ok(Self::indirect_schedule_ranges(
2005            self.data_buf_position,
2006            item_ranges,
2007            rep_index_ranges,
2008            rep_index.bytes_per_value,
2009            io.clone(),
2010            self.priority,
2011            self.bits_per_offset,
2012            self.details.clone(),
2013        )
2014        .boxed())
2015    }
2016
2017    // In the simple case there is no repetition and we just have large fixed-width
2018    // rows of data.  We can just map row ranges to byte ranges directly using the
2019    // fixed width of the data type.
2020    fn schedule_ranges_simple(
2021        &self,
2022        ranges: &[Range<u64>],
2023        io: &dyn EncodingsIo,
2024    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
2025        // Convert row ranges to item ranges (i.e. multiply by items per row)
2026        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2027        let item_ranges = ranges
2028            .iter()
2029            .map(|r| r.start * self.details.items_per_row..r.end * self.details.items_per_row)
2030            .collect::<Vec<_>>();
2031
2032        let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
2033            unreachable!()
2034        };
2035
2036        // Convert item ranges to byte ranges (i.e. multiply by bytes per item)
2037        let bits_per_value = decompressor.bits_per_value();
2038        assert_eq!(bits_per_value % 8, 0);
2039        let bytes_per_value = bits_per_value / 8;
2040        let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
2041        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
2042        let byte_ranges = item_ranges.iter().map(|r| {
2043            debug_assert!(r.end <= self.rows_in_page * self.details.items_per_row);
2044            let start = self.data_buf_position + r.start * total_bytes_per_value;
2045            let end = self.data_buf_position + r.end * total_bytes_per_value;
2046            start..end
2047        });
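        // e.g. (illustrative) with 4-byte values and a 2-byte control word,
        // total_bytes_per_value is 6, so the item range 10..20 maps to bytes
        // data_buf_position + 60 .. data_buf_position + 120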
2048
2049        // Request byte ranges
2050        let data = io.submit_request(byte_ranges.collect(), self.priority);
2051
2052        let details = self.details.clone();
2053
2054        Ok(async move {
2055            let data = data.await?;
2056            let data = data
2057                .into_iter()
2058                .map(|d| LanceBuffer::from_bytes(d, 1))
2059                .collect();
2060            Ok(Box::new(FixedFullZipDecoder {
2061                details,
2062                data,
2063                num_rows,
2064                offset_in_current: 0,
2065                bytes_per_value: bytes_per_value as usize,
2066                total_bytes_per_value: total_bytes_per_value as usize,
2067            }) as Box<dyn StructuralPageDecoder>)
2068        }
2069        .boxed())
2070    }
2071}
2072
2073impl StructuralPageScheduler for FullZipScheduler {
2074    // TODO: Add opt-in caching of repetition index
2075    fn initialize<'a>(
2076        &'a mut self,
2077        _io: &Arc<dyn EncodingsIo>,
2078    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
2079        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2080    }
2081
2082    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}
2083
2084    fn schedule_ranges(
2085        &self,
2086        ranges: &[Range<u64>],
2087        io: &Arc<dyn EncodingsIo>,
2088    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
2089        if let Some(rep_index) = self.rep_index.as_ref() {
2090            self.schedule_ranges_rep(ranges, io, rep_index)
2091        } else {
2092            self.schedule_ranges_simple(ranges, io.as_ref())
2093        }
2094    }
2095}
2096
2097/// A decoder for full-zip encoded data when the data has a fixed width
2098///
2099/// Here we need to unzip the control words from the values themselves and
2100/// then decompress the requested values.
2101///
2102/// We use a PerValueDecompressor because we will only be decompressing the
2103/// requested data.  This decoder / scheduler does not do any read amplification.
2104#[derive(Debug)]
2105struct FixedFullZipDecoder {
2106    details: Arc<FullZipDecodeDetails>,
2107    data: VecDeque<LanceBuffer>,
2108    offset_in_current: usize,
2109    bytes_per_value: usize,
2110    total_bytes_per_value: usize,
2111    num_rows: u64,
2112}
2113
2114impl FixedFullZipDecoder {
2115    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
2116        debug_assert!(num_rows > 0);
2117        let cur_buf = self.data.front_mut().unwrap();
2118        let start = self.offset_in_current;
2119        if self.details.ctrl_word_parser.has_rep() {
2120            // This is a slightly slower path.  In order to figure out where to split we need to
2121            // examine the control words so we can convert a count of rows into a count of items
2122            let mut rows_started = 0;
2123            // Loop through the values until we have started `num_rows` rows and reached the start
2124            // of the next row (or exhausted the buffer)
2125            let mut num_items = 0;
2126            while self.offset_in_current < cur_buf.len() {
2127                let control = self.details.ctrl_word_parser.parse_desc(
2128                    &cur_buf[self.offset_in_current..],
2129                    self.details.max_rep,
2130                    self.details.max_visible_def,
2131                );
2132                if control.is_new_row {
2133                    if rows_started == num_rows {
2134                        break;
2135                    }
2136                    rows_started += 1;
2137                }
2138                num_items += 1;
2139                if control.is_visible {
2140                    self.offset_in_current += self.total_bytes_per_value;
2141                } else {
2142                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
2143                }
2144            }
2145
2146            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
2147            if self.offset_in_current == cur_buf.len() {
2148                self.data.pop_front();
2149                self.offset_in_current = 0;
2150            }
2151
2152            FullZipDecodeTaskItem {
2153                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2154                    data: task_slice,
2155                    bits_per_value: self.bytes_per_value as u64 * 8,
2156                    num_values: num_items,
2157                    block_info: BlockInfo::new(),
2158                }),
2159                rows_in_buf: rows_started,
2160                items_in_buf: num_items,
2161            }
2162        } else {
2163            // If there's no repetition we can calculate the slicing point by just multiplying
2164            // the number of rows by the total bytes per value
2165            let cur_buf = self.data.front_mut().unwrap();
2166            let bytes_avail = cur_buf.len() - self.offset_in_current;
2167            let offset_in_cur = self.offset_in_current;
2168
2169            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
2170            let mut rows_taken = num_rows;
2171            let task_slice = if bytes_needed >= bytes_avail {
2172                self.offset_in_current = 0;
2173                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
2174                self.data
2175                    .pop_front()
2176                    .unwrap()
2177                    .slice_with_length(offset_in_cur, bytes_avail)
2178            } else {
2179                self.offset_in_current += bytes_needed;
2180                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
2181            };
2182            FullZipDecodeTaskItem {
2183                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2184                    data: task_slice,
2185                    bits_per_value: self.bytes_per_value as u64 * 8,
2186                    num_values: rows_taken,
2187                    block_info: BlockInfo::new(),
2188                }),
2189                rows_in_buf: rows_taken,
2190                items_in_buf: rows_taken,
2191            }
2192        }
2193    }
2194}
2195
2196impl StructuralPageDecoder for FixedFullZipDecoder {
2197    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2198        let mut task_data = Vec::with_capacity(self.data.len());
2199        let mut remaining = num_rows * self.details.items_per_row;
2200        while remaining > 0 {
2201            let task_item = self.slice_next_task(remaining);
2202            remaining -= task_item.rows_in_buf;
2203            task_data.push(task_item);
2204        }
2205        let num_items = task_data.iter().map(|td| td.items_in_buf).sum::<u64>() as usize;
2206        Ok(Box::new(FixedFullZipDecodeTask {
2207            details: self.details.clone(),
2208            data: task_data,
2209            bytes_per_value: self.bytes_per_value,
2210            num_items,
2211        }))
2212    }
2213
2214    fn num_rows(&self) -> u64 {
2215        self.num_rows
2216    }
2217}
2218
2219/// A decoder for full-zip encoded data when the data has a variable width
2220///
2221/// Here we need to unzip the control words AND lengths from the values and
2222/// then decompress the requested values.
2223#[derive(Debug)]
2224struct VariableFullZipDecoder {
2225    details: Arc<FullZipDecodeDetails>,
2226    decompressor: Arc<dyn VariablePerValueDecompressor>,
2227    data: LanceBuffer,
2228    offsets: LanceBuffer,
2229    rep: ScalarBuffer<u16>,
2230    def: ScalarBuffer<u16>,
2231    repdef_starts: Vec<usize>,
2232    data_starts: Vec<usize>,
2233    offset_starts: Vec<usize>,
2234    visible_item_counts: Vec<u64>,
2235    bits_per_offset: u8,
2236    current_idx: usize,
2237    num_rows: u64,
2238}
2239
2240impl VariableFullZipDecoder {
2241    fn new(
2242        details: Arc<FullZipDecodeDetails>,
2243        data: VecDeque<LanceBuffer>,
2244        num_rows: u64,
2245        in_bits_per_length: u8,
2246        out_bits_per_offset: u8,
2247    ) -> Self {
2248        let decompressor = match details.value_decompressor {
2249            PerValueDecompressor::Variable(ref d) => d.clone(),
2250            _ => unreachable!(),
2251        };
2252
2253        assert_eq!(in_bits_per_length % 8, 0);
2254        assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2255
2256        let mut decoder = Self {
2257            details,
2258            decompressor,
2259            data: LanceBuffer::empty(),
2260            offsets: LanceBuffer::empty(),
2261            rep: LanceBuffer::empty().borrow_to_typed_slice(),
2262            def: LanceBuffer::empty().borrow_to_typed_slice(),
2263            bits_per_offset: out_bits_per_offset,
2264            repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2265            data_starts: Vec::with_capacity(num_rows as usize + 1),
2266            offset_starts: Vec::with_capacity(num_rows as usize + 1),
2267            visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2268            current_idx: 0,
2269            num_rows,
2270        };
2271
2272        // There's no great time to do this and this is the least worst time.  If we don't unzip then
2273        // we can't slice the data during the decode phase.  This is because we need the offsets to be
2274        // unpacked to know where the values start and end.
2275        //
2276        // We don't want to unzip on the decode thread because that is a single-threaded path
2277        // We don't want to unzip on the scheduling thread because that is a single-threaded path
2278        //
2279        // Fortunately, we know variable length data will always be read indirectly and so we can do it
2280        // here, which should be on the indirect thread.  The primary disadvantage to doing it here is that
2281        // we load all the data into memory and then throw it away only to load it all into memory again during
2282        // the decode.
2283        //
2284        // There are some alternatives to investigate:
2285        //   - Instead of just reading the beginning and end of the rep index we could read the entire
2286        //     range in between.  This will give us the break points that we need for slicing and won't increase
2287        //     the number of IOPs but it will mean we are doing more total I/O and we need to load the rep index
2288        //     even when doing a full scan.
2289        //   - We could force each decode task to do a full unzip of all the data.  Each decode task now
2290        //     has to do more work but the work is all fused.
2291        //   - We could just try doing this work on the decode thread and see if it is a problem.
2292        decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2293
2294        decoder
2295    }
2296
2297    unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2298        match bits_per_offset {
2299            8 => *data.get_unchecked(0) as u64,
2300            16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2301            32 => u32::from_le_bytes([
2302                *data.get_unchecked(0),
2303                *data.get_unchecked(1),
2304                *data.get_unchecked(2),
2305                *data.get_unchecked(3),
2306            ]) as u64,
2307            64 => u64::from_le_bytes([
2308                *data.get_unchecked(0),
2309                *data.get_unchecked(1),
2310                *data.get_unchecked(2),
2311                *data.get_unchecked(3),
2312                *data.get_unchecked(4),
2313                *data.get_unchecked(5),
2314                *data.get_unchecked(6),
2315                *data.get_unchecked(7),
2316            ]),
2317            _ => unreachable!(),
2318        }
2319    }
2320
2321    fn unzip(
2322        &mut self,
2323        data: VecDeque<LanceBuffer>,
2324        in_bits_per_length: u8,
2325        out_bits_per_offset: u8,
2326        num_rows: u64,
2327    ) {
2328        // This undercounts if there are lists but, at this point, we don't really know how many items we have
2329        let mut rep = Vec::with_capacity(num_rows as usize);
2330        let mut def = Vec::with_capacity(num_rows as usize);
2331        let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2332
2333        // This undercounts if there are lists
2334        // It can also overcount if there are invisible items
2335        let bytes_per_offset = out_bits_per_offset as usize / 8;
2336        let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2337        let mut offsets_data = Vec::with_capacity(bytes_offsets);
2338
2339        let bytes_per_length = in_bits_per_length as usize / 8;
2340        let bytes_lengths = bytes_per_length * num_rows as usize;
2341
2342        let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2343        // This overcounts since bytes_lengths and bytes_cw are undercounts
2344        // It can also undercount if there are invisible items (hence the saturating_sub)
2345        let mut unzipped_data =
2346            Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2347
2348        let mut current_offset = 0_u64;
2349        let mut visible_item_count = 0_u64;
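        // The zipped stream interleaves, per item: a control word, then (for valid,
        // visible items) the length followed by the value bytes.  Visible-but-null
        // items carry only a control word but still receive an offset entry;
        // invisible items carry only a control word and no offset.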
2350        for databuf in data.into_iter() {
2351            let mut databuf = databuf.as_ref();
2352            while !databuf.is_empty() {
2353                let data_start = unzipped_data.len();
2354                let offset_start = offsets_data.len();
2355                // We might have only-rep, only-def, neither, or both.  They advance at the same
2356                // rate though, so we only need one index into them
2357                let repdef_start = rep.len().max(def.len());
2358                // TODO: It's somewhat inefficient that we parse the control word twice here
2359                let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2360                    databuf,
2361                    self.details.max_rep,
2362                    self.details.max_visible_def,
2363                );
2364                self.details
2365                    .ctrl_word_parser
2366                    .parse(databuf, &mut rep, &mut def);
2367                databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2368
2369                if ctrl_desc.is_new_row {
2370                    self.repdef_starts.push(repdef_start);
2371                    self.data_starts.push(data_start);
2372                    self.offset_starts.push(offset_start);
2373                    self.visible_item_counts.push(visible_item_count);
2374                }
2375                if ctrl_desc.is_visible {
2376                    visible_item_count += 1;
2377                    if ctrl_desc.is_valid_item {
2378                        // Safety: Data should have at least bytes_per_length bytes remaining
2379                        debug_assert!(databuf.len() >= bytes_per_length);
2380                        let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2381                        match out_bits_per_offset {
2382                            32 => offsets_data
2383                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2384                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2385                            _ => unreachable!(),
2386                        };
2387                        // Advance past the zipped length (which is in_bits_per_length wide)
2387                        databuf = &databuf[bytes_per_length..];
2388                        unzipped_data.extend_from_slice(&databuf[..length as usize]);
2389                        databuf = &databuf[length as usize..];
2390                        current_offset += length;
2391                    } else {
2392                        // Null items still get an offset
2393                        match out_bits_per_offset {
2394                            32 => offsets_data
2395                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2396                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2397                            _ => unreachable!(),
2398                        }
2399                    }
2400                }
2401            }
2402        }
2403        self.repdef_starts.push(rep.len().max(def.len()));
2404        self.data_starts.push(unzipped_data.len());
2405        self.offset_starts.push(offsets_data.len());
2406        self.visible_item_counts.push(visible_item_count);
2407        match out_bits_per_offset {
2408            32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2409            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2410            _ => unreachable!(),
2411        };
2412        self.rep = ScalarBuffer::from(rep);
2413        self.def = ScalarBuffer::from(def);
2414        self.data = LanceBuffer::Owned(unzipped_data);
2415        self.offsets = LanceBuffer::Owned(offsets_data);
2416    }
2417}
2418
2419impl StructuralPageDecoder for VariableFullZipDecoder {
2420    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2421        let start = self.current_idx;
2422        let end = start + num_rows as usize;
2423
2424        // This might seem a little peculiar.  We are returning the entire data for every single
2425        // batch.  This is because the offsets are relative to the start of the data.  In other words
2426        // imagine we have a data buffer that is 100 bytes long and the offsets are [0, 10, 20, 30, 40]
2427        // and we return in batches of two.  The second set of offsets will be [20, 30, 40].
2428        //
2429        // So either we pay for a copy to normalize the offsets or we just return the entire data buffer
2430        // which is slightly cheaper.
2431        let data = self.data.borrow_and_clone();
2432
2433        let offset_start = self.offset_starts[start];
2434        let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2435        let offsets = self
2436            .offsets
2437            .slice_with_length(offset_start, offset_end - offset_start);
2438
2439        let repdef_start = self.repdef_starts[start];
2440        let repdef_end = self.repdef_starts[end];
2441        let rep = if self.rep.is_empty() {
2442            self.rep.clone()
2443        } else {
2444            self.rep.slice(repdef_start, repdef_end - repdef_start)
2445        };
2446        let def = if self.def.is_empty() {
2447            self.def.clone()
2448        } else {
2449            self.def.slice(repdef_start, repdef_end - repdef_start)
2450        };
2451
2452        let visible_item_counts_start = self.visible_item_counts[start];
2453        let visible_item_counts_end = self.visible_item_counts[end];
2454        let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2455
2456        self.current_idx += num_rows as usize;
2457
2458        Ok(Box::new(VariableFullZipDecodeTask {
2459            details: self.details.clone(),
2460            decompressor: self.decompressor.clone(),
2461            data,
2462            offsets,
2463            bits_per_offset: self.bits_per_offset,
2464            num_visible_items,
2465            rep,
2466            def,
2467        }))
2468    }
2469
2470    fn num_rows(&self) -> u64 {
2471        self.num_rows
2472    }
2473}
2474
2475#[derive(Debug)]
2476struct VariableFullZipDecodeTask {
2477    details: Arc<FullZipDecodeDetails>,
2478    decompressor: Arc<dyn VariablePerValueDecompressor>,
2479    data: LanceBuffer,
2480    offsets: LanceBuffer,
2481    bits_per_offset: u8,
2482    num_visible_items: u64,
2483    rep: ScalarBuffer<u16>,
2484    def: ScalarBuffer<u16>,
2485}
2486
2487impl DecodePageTask for VariableFullZipDecodeTask {
2488    fn decode(self: Box<Self>) -> Result<DecodedPage> {
2489        let block = VariableWidthBlock {
2490            data: self.data,
2491            offsets: self.offsets,
2492            bits_per_offset: self.bits_per_offset,
2493            num_values: self.num_visible_items,
2494            block_info: BlockInfo::new(),
2495        };
2496        let decompressed = self.decompressor.decompress(block)?;
2497        let rep = self.rep.to_vec();
2498        let def = self.def.to_vec();
2499        let unraveler =
2500            RepDefUnraveler::new(Some(rep), Some(def), self.details.def_meaning.clone());
2501        Ok(DecodedPage {
2502            data: decompressed,
2503            repdef: unraveler,
2504        })
2505    }
2506}
2507
2508#[derive(Debug)]
2509struct FullZipDecodeTaskItem {
2510    data: PerValueDataBlock,
2511    rows_in_buf: u64,
2512    items_in_buf: u64,
2513}
2514
2515/// A task to unzip and decompress full-zip encoded data when that data
2516/// has a fixed width.
2517#[derive(Debug)]
2518struct FixedFullZipDecodeTask {
2519    details: Arc<FullZipDecodeDetails>,
2520    data: Vec<FullZipDecodeTaskItem>,
2521    num_items: usize,
2522    bytes_per_value: usize,
2523}
2524
2525impl DecodePageTask for FixedFullZipDecodeTask {
2526    fn decode(self: Box<Self>) -> Result<DecodedPage> {
2527        // Multiply by 2 to take a rough stab at the size of the output buffer (which will be decompressed and thus bigger)
2528        let estimated_size_bytes = self
2529            .data
2530            .iter()
2531            .map(|task_item| task_item.data.data_size() as usize)
2532            .sum::<usize>()
2533            * 2;
2534        let mut data_builder =
2535            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
2536
2537        if self.details.ctrl_word_parser.bytes_per_word() == 0 {
2538            // Fast path, no need to unzip because there is no rep/def
2539            //
2540            // We decompress each buffer and add it to our output buffer
2541            for task_item in self.data.into_iter() {
2542                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2543                    unreachable!()
2544                };
2545                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2546                else {
2547                    unreachable!()
2548                };
2549                debug_assert_eq!(fixed_data.num_values, task_item.items_in_buf);
2550                let decompressed = decompressor.decompress(fixed_data)?;
2551                data_builder.append(&decompressed, 0..task_item.items_in_buf);
2552            }
2553
2554            let unraveler = RepDefUnraveler::new(None, None, self.details.def_meaning.clone());
2555
2556            Ok(DecodedPage {
2557                data: data_builder.finish(),
2558                repdef: unraveler,
2559            })
2560        } else {
2561            // Slow path, unzipping needed
2562            let mut rep = Vec::with_capacity(self.num_items);
2563            let mut def = Vec::with_capacity(self.num_items);
2564
2565            for task_item in self.data.into_iter() {
2566                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2567                    unreachable!()
2568                };
2569                let mut buf_slice = fixed_data.data.as_ref();
2570                // We will be unzipping repdef into `rep` and `def` and the
2571                // values into `values` (which contains the compressed values)
2572                let mut values = Vec::with_capacity(
2573                    fixed_data.data.len()
2574                        - (self.details.ctrl_word_parser.bytes_per_word()
2575                            * task_item.items_in_buf as usize),
2576                );
2577                let mut visible_items = 0;
2578                for _ in 0..task_item.items_in_buf {
2579                    // Extract rep/def
2580                    self.details
2581                        .ctrl_word_parser
2582                        .parse(buf_slice, &mut rep, &mut def);
2583                    buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
2584
2585                    let is_visible = def
2586                        .last()
2587                        .map(|d| *d <= self.details.max_visible_def)
2588                        .unwrap_or(true);
2589                    if is_visible {
2590                        // Extract value
2591                        values.extend_from_slice(&buf_slice[..self.bytes_per_value]);
2592                        buf_slice = &buf_slice[self.bytes_per_value..];
2593                        visible_items += 1;
2594                    }
2595                }
2596
2597                // Finally, we decompress the values and add them to our output buffer
2598                let values_buf = LanceBuffer::Owned(values);
2599                let fixed_data = FixedWidthDataBlock {
2600                    bits_per_value: self.bytes_per_value as u64 * 8,
2601                    block_info: BlockInfo::new(),
2602                    data: values_buf,
2603                    num_values: visible_items,
2604                };
2605                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2606                else {
2607                    unreachable!()
2608                };
2609                let decompressed = decompressor.decompress(fixed_data)?;
2610                data_builder.append(&decompressed, 0..visible_items);
2611            }
2612
2613            let repetition = if rep.is_empty() { None } else { Some(rep) };
2614            let definition = if def.is_empty() { None } else { Some(def) };
2615
2616            let unraveler =
2617                RepDefUnraveler::new(repetition, definition, self.details.def_meaning.clone());
2618            let data = data_builder.finish();
2619
2620            Ok(DecodedPage {
2621                data,
2622                repdef: unraveler,
2623            })
2624        }
2625    }
2626}
2627
2628#[derive(Debug)]
2629struct StructuralPrimitiveFieldSchedulingJob<'a> {
2630    scheduler: &'a StructuralPrimitiveFieldScheduler,
2631    ranges: Vec<Range<u64>>,
2632    page_idx: usize,
2633    range_idx: usize,
2634    range_offset: u64,
2635    global_row_offset: u64,
2636}
2637
2638impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
2639    pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
2640        Self {
2641            scheduler,
2642            ranges,
2643            page_idx: 0,
2644            range_idx: 0,
2645            range_offset: 0,
2646            global_row_offset: 0,
2647        }
2648    }
2649}
2650
2651impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
2652    fn schedule_next(
2653        &mut self,
2654        context: &mut SchedulerContext,
2655    ) -> Result<Option<ScheduledScanLine>> {
2656        if self.range_idx >= self.ranges.len() {
2657            return Ok(None);
2658        }
2659        // Get our current range
2660        let mut range = self.ranges[self.range_idx].clone();
2661        range.start += self.range_offset;
2662        let priority = range.start;
2663
2664        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
2665        trace!(
2666            "Current range is {:?} and current page has {} rows",
2667            range,
2668            cur_page.num_rows
2669        );
2670        // Skip entire pages until we have some overlap with our next range
2671        while cur_page.num_rows + self.global_row_offset <= range.start {
2672            self.global_row_offset += cur_page.num_rows;
2673            self.page_idx += 1;
2674            trace!("Skipping entire page of {} rows", cur_page.num_rows);
2675            cur_page = &self.scheduler.page_schedulers[self.page_idx];
2676        }
2677
2678        // Now cur_page overlaps with the range.  Continue looping through ranges
2679        // until we find a range that extends beyond the current page
2680
2681        let mut ranges_in_page = Vec::new();
2682        while cur_page.num_rows + self.global_row_offset > range.start {
2683            range.start = range.start.max(self.global_row_offset);
2684            let start_in_page = range.start - self.global_row_offset;
2685            let end_in_page = start_in_page + (range.end - range.start);
2686            let end_in_page = end_in_page.min(cur_page.num_rows);
2687            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
2688
2689            ranges_in_page.push(start_in_page..end_in_page);
2690            if last_in_range {
2691                self.range_idx += 1;
2692                if self.range_idx == self.ranges.len() {
2693                    break;
2694                }
2695                range = self.ranges[self.range_idx].clone();
2696            } else {
2697                break;
2698            }
2699        }
2700
2701        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
2702        trace!(
2703            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
2704            num_rows_in_next,
2705            ranges_in_page.len(),
2706            cur_page.num_rows,
2707            priority,
2708            self.scheduler.column_index,
2709            cur_page.page_index,
2710        );
2711
2712        self.global_row_offset += cur_page.num_rows;
2713        self.page_idx += 1;
2714
2715        let page_decoder = cur_page
2716            .scheduler
2717            .schedule_ranges(&ranges_in_page, context.io())?;
2718
2719        let cur_path = context.current_path();
2720        let page_index = cur_page.page_index;
2721        let unloaded_page = async move {
2722            let page_decoder = page_decoder.await?;
2723            Ok(LoadedPage {
2724                decoder: page_decoder,
2725                path: cur_path,
2726                page_index,
2727            })
2728        }
2729        .boxed();
2730
2731        Ok(Some(ScheduledScanLine {
2732            decoders: vec![MessageType::UnloadedPage(UnloadedPage(unloaded_page))],
2733            rows_scheduled: num_rows_in_next,
2734        }))
2735    }
2736}
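
// A minimal sketch (not part of the original file) of the arithmetic
// `schedule_next` performs above: given per-page row counts and one global row
// range, produce the page-relative ranges that would be scheduled.
#[allow(dead_code)]
fn example_split_range_across_pages(
    page_rows: &[u64],
    range: Range<u64>,
) -> Vec<(usize, Range<u64>)> {
    let mut result = Vec::new();
    let mut page_start = 0;
    for (page_idx, num_rows) in page_rows.iter().enumerate() {
        let page_end = page_start + num_rows;
        // Keep only the overlap with this page, translated to page-relative rows
        if range.start < page_end && range.end > page_start {
            let start_in_page = range.start.max(page_start) - page_start;
            let end_in_page = range.end.min(page_end) - page_start;
            result.push((page_idx, start_in_page..end_in_page));
        }
        page_start = page_end;
    }
    result
}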
2737
2738#[derive(Debug)]
2739struct PageInfoAndScheduler {
2740    page_index: usize,
2741    num_rows: u64,
2742    scheduler: Box<dyn StructuralPageScheduler>,
2743}
2744
2745/// A scheduler for a leaf node
2746///
2747/// Here we look at the layout of the various pages and delegate scheduling to a scheduler
2748/// appropriate for the layout of the page.
2749#[derive(Debug)]
2750pub struct StructuralPrimitiveFieldScheduler {
2751    page_schedulers: Vec<PageInfoAndScheduler>,
2752    column_index: u32,
2753}
2754
2755impl StructuralPrimitiveFieldScheduler {
2756    pub fn try_new(
2757        column_info: &ColumnInfo,
2758        items_per_row: u64,
2759        decompressors: &dyn DecompressorStrategy,
2760    ) -> Result<Self> {
2761        let page_schedulers = column_info
2762            .page_infos
2763            .iter()
2764            .enumerate()
2765            .map(|(page_index, page_info)| {
2766                Self::page_info_to_scheduler(
2767                    page_info,
2768                    page_index,
2769                    column_info.index as usize,
2770                    decompressors,
2771                    items_per_row,
2772                )
2773            })
2774            .collect::<Result<Vec<_>>>()?;
2775        Ok(Self {
2776            page_schedulers,
2777            column_index: column_info.index,
2778        })
2779    }
2780
2781    fn page_info_to_scheduler(
2782        page_info: &PageInfo,
2783        page_index: usize,
2784        _column_index: usize,
2785        decompressors: &dyn DecompressorStrategy,
2786        items_per_row: u64,
2787    ) -> Result<PageInfoAndScheduler> {
2788        let scheduler: Box<dyn StructuralPageScheduler> =
2789            match page_info.encoding.as_structural().layout.as_ref() {
2790                Some(pb::page_layout::Layout::MiniBlockLayout(mini_block)) => {
2791                    Box::new(MiniBlockScheduler::try_new(
2792                        &page_info.buffer_offsets_and_sizes,
2793                        page_info.priority,
2794                        mini_block.num_items,
2795                        items_per_row,
2796                        mini_block,
2797                        decompressors,
2798                    )?)
2799                }
2800                Some(pb::page_layout::Layout::FullZipLayout(full_zip)) => {
2801                    Box::new(FullZipScheduler::try_new(
2802                        &page_info.buffer_offsets_and_sizes,
2803                        page_info.priority,
2804                        page_info.num_rows,
2805                        items_per_row,
2806                        full_zip,
2807                        decompressors,
2808                        /*bits_per_offset=*/ 32,
2809                    )?)
2810                }
2811                Some(pb::page_layout::Layout::AllNullLayout(all_null)) => {
2812                    let def_meaning = all_null
2813                        .layers
2814                        .iter()
2815                        .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
2816                        .collect::<Vec<_>>();
2817                    if def_meaning.len() == 1
2818                        && def_meaning[0] == DefinitionInterpretation::NullableItem
2819                    {
2820                        Box::new(SimpleAllNullScheduler::default())
2821                            as Box<dyn StructuralPageScheduler>
2822                    } else {
2823                        Box::new(ComplexAllNullScheduler::new(
2824                            page_info.buffer_offsets_and_sizes.clone(),
2825                            def_meaning.into(),
2826                            items_per_row,
2827                        )) as Box<dyn StructuralPageScheduler>
2828                    }
2829                }
2830                _ => todo!(),
2831            };
2832        Ok(PageInfoAndScheduler {
2833            page_index,
2834            num_rows: page_info.num_rows,
2835            scheduler,
2836        })
2837    }
2838}
2839
2840pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
2841    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
2842}
2843
2844pub struct NoCachedPageData;
2845
2846impl DeepSizeOf for NoCachedPageData {
2847    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
2848        0
2849    }
2850}
2851impl CachedPageData for NoCachedPageData {
2852    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2853        self
2854    }
2855}
2856
2857pub struct CachedFieldData {
2858    pages: Vec<Arc<dyn CachedPageData>>,
2859}
2860
2861impl DeepSizeOf for CachedFieldData {
2862    fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
2863        self.pages.deep_size_of_children(ctx)
2864    }
2865}
2866
2867impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
2868    fn initialize<'a>(
2869        &'a mut self,
2870        _filter: &'a FilterExpression,
2871        context: &'a SchedulerContext,
2872    ) -> BoxFuture<'a, Result<()>> {
2873        let cache_key = self.column_index.to_string();
2874        if let Some(cached_data) = context.cache().get_by_str::<CachedFieldData>(&cache_key) {
2875            self.page_schedulers
2876                .iter_mut()
2877                .zip(cached_data.pages.iter())
2878                .for_each(|(page_scheduler, cached_data)| {
2879                    page_scheduler.scheduler.load(cached_data);
2880                });
2881            return std::future::ready(Ok(())).boxed();
2882        };
2883
2884        let cache = context.cache().clone();
2885        let page_data = self
2886            .page_schedulers
2887            .iter_mut()
2888            .map(|s| s.scheduler.initialize(context.io()))
2889            .collect::<FuturesUnordered<_>>();
2890
2891        async move {
2892            let page_data = page_data.try_collect::<Vec<_>>().await?;
2893            let cached_data = Arc::new(CachedFieldData { pages: page_data });
2894            cache.insert_by_str::<CachedFieldData>(&cache_key, cached_data);
2895            Ok(())
2896        }
2897        .boxed()
2898    }
2899
2900    fn schedule_ranges<'a>(
2901        &'a self,
2902        ranges: &[Range<u64>],
2903        _filter: &FilterExpression,
2904    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
2905        let ranges = ranges.to_vec();
2906        Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
2907            self, ranges,
2908        )))
2909    }
2910}
2911
2912pub struct PrimitiveFieldDecoder {
2913    data_type: DataType,
2914    unloaded_physical_decoder: Option<BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>>,
2915    physical_decoder: Option<Arc<dyn PrimitivePageDecoder>>,
2916    should_validate: bool,
2917    num_rows: u64,
2918    rows_drained: u64,
2919    column_index: u32,
2920    page_index: u32,
2921}
2922
2923impl PrimitiveFieldDecoder {
2924    pub fn new_from_data(
2925        physical_decoder: Arc<dyn PrimitivePageDecoder>,
2926        data_type: DataType,
2927        num_rows: u64,
2928        should_validate: bool,
2929    ) -> Self {
2930        Self {
2931            data_type,
2932            unloaded_physical_decoder: None,
2933            physical_decoder: Some(physical_decoder),
2934            should_validate,
2935            num_rows,
2936            rows_drained: 0,
2937            column_index: u32::MAX,
2938            page_index: u32::MAX,
2939        }
2940    }
2941}
2942
2943impl Debug for PrimitiveFieldDecoder {
2944    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2945        f.debug_struct("PrimitiveFieldDecoder")
2946            .field("data_type", &self.data_type)
2947            .field("num_rows", &self.num_rows)
2948            .field("rows_drained", &self.rows_drained)
2949            .finish()
2950    }
2951}
2952
2953struct PrimitiveFieldDecodeTask {
2954    rows_to_skip: u64,
2955    rows_to_take: u64,
2956    should_validate: bool,
2957    physical_decoder: Arc<dyn PrimitivePageDecoder>,
2958    data_type: DataType,
2959}
2960
2961impl DecodeArrayTask for PrimitiveFieldDecodeTask {
2962    fn decode(self: Box<Self>) -> Result<ArrayRef> {
2963        let block = self
2964            .physical_decoder
2965            .decode(self.rows_to_skip, self.rows_to_take)?;
2966
2967        let array = make_array(block.into_arrow(self.data_type.clone(), self.should_validate)?);
2968
2969        // This is a bit of a hack to work around https://github.com/apache/arrow-rs/issues/6302
2970        //
2971        // We change from nulls-in-dictionary (storage format) to nulls-in-indices (arrow-rs preferred
2972        // format)
2973        //
2974        // The calculation of logical_nulls is not free and it would be good to avoid it in the future
2975        if let DataType::Dictionary(_, _) = self.data_type {
2976            let dict = array.as_any_dictionary();
2977            if let Some(nulls) = array.logical_nulls() {
2978                let new_indices = dict.keys().to_data();
2979                let new_array = make_array(
2980                    new_indices
2981                        .into_builder()
2982                        .nulls(Some(nulls))
2983                        .add_child_data(dict.values().to_data())
2984                        .data_type(dict.data_type().clone())
2985                        .build()?,
2986                );
2987                return Ok(new_array);
2988            }
2989        }
2990        Ok(array)
2991    }
2992}
2993
2994impl LogicalPageDecoder for PrimitiveFieldDecoder {
2995    // TODO: At some point we may consider partially waiting for primitive pages by
2996    // breaking up large I/O into smaller I/O as a way to accelerate the "time-to-first-decode"
2997    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
2998        log::trace!(
2999            "primitive wait for more than {} rows on column {} and page {} (page has {} rows)",
3000            loaded_need,
3001            self.column_index,
3002            self.page_index,
3003            self.num_rows
3004        );
3005        async move {
3006            let physical_decoder = self.unloaded_physical_decoder.take().unwrap().await?;
3007            self.physical_decoder = Some(Arc::from(physical_decoder));
3008            Ok(())
3009        }
3010        .boxed()
3011    }
3012
3013    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
3014        if self.physical_decoder.is_none() {
3015            return Err(lance_core::Error::Internal {
3016                message: format!("drain was called on primitive field decoder for data type {} on column {} but the decoder was never awaited", self.data_type, self.column_index),
3017                location: location!(),
3018            });
3019        }
3020
3021        let rows_to_skip = self.rows_drained;
3022        let rows_to_take = num_rows;
3023
3024        self.rows_drained += rows_to_take;
3025
3026        let task = Box::new(PrimitiveFieldDecodeTask {
3027            rows_to_skip,
3028            rows_to_take,
3029            should_validate: self.should_validate,
3030            physical_decoder: self.physical_decoder.as_ref().unwrap().clone(),
3031            data_type: self.data_type.clone(),
3032        });
3033
3034        Ok(NextDecodeTask {
3035            task,
3036            num_rows: rows_to_take,
3037        })
3038    }
3039
3040    fn rows_loaded(&self) -> u64 {
3041        if self.unloaded_physical_decoder.is_some() {
3042            0
3043        } else {
3044            self.num_rows
3045        }
3046    }
3047
3048    fn rows_drained(&self) -> u64 {
3049        if self.unloaded_physical_decoder.is_some() {
3050            0
3051        } else {
3052            self.rows_drained
3053        }
3054    }
3055
3056    fn num_rows(&self) -> u64 {
3057        self.num_rows
3058    }
3059
3060    fn data_type(&self) -> &DataType {
3061        &self.data_type
3062    }
3063}
3064
3065/// Takes the output from several page decoders and
3066/// concatenates them.
3067#[derive(Debug)]
3068pub struct StructuralCompositeDecodeArrayTask {
3069    tasks: Vec<Box<dyn DecodePageTask>>,
3070    items_type: DataType,
3071    fsl_fields: Arc<[Arc<ArrowField>]>,
3072    should_validate: bool,
3073}
3074
3075impl StructuralCompositeDecodeArrayTask {
3076    fn restore_validity(
3077        array: Arc<dyn Array>,
3078        unraveler: &mut CompositeRepDefUnraveler,
3079    ) -> Arc<dyn Array> {
3080        let validity = unraveler.unravel_validity(array.len());
3081        let Some(validity) = validity else {
3082            return array;
3083        };
3084        if array.data_type() == &DataType::Null {
3085            // We unravel from a null array but we don't add the null buffer because arrow-rs doesn't like it
3086            return array;
3087        }
3088        assert_eq!(validity.len(), array.len());
3089        // SAFETY: We should have already asserted that the buffers are all valid; we are
3090        // just adding null buffers to the array here
3091        make_array(unsafe {
3092            array
3093                .to_data()
3094                .into_builder()
3095                .nulls(Some(validity))
3096                .build_unchecked()
3097        })
3098    }
3099
3100    fn restore_fsl(
3101        array: Arc<dyn Array>,
3102        unraveler: &mut CompositeRepDefUnraveler,
3103        fsl_fields: Arc<[Arc<ArrowField>]>,
3104    ) -> Arc<dyn Array> {
3105        let mut array = array;
3106        for fsl_field in fsl_fields.iter().rev() {
3107            let DataType::FixedSizeList(child_field, dimension) = fsl_field.data_type() else {
3108                unreachable!()
3109            };
3110            let fsl_num_values = array.len() / *dimension as usize;
3111            let fsl_validity = unraveler.unravel_fsl_validity(fsl_num_values, *dimension as usize);
3112            array = Arc::new(FixedSizeListArray::new(
3113                child_field.clone(),
3114                *dimension,
3115                array,
3116                fsl_validity,
3117            ));
3118        }
3119        array
3120    }
3121}
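
// A small sketch (not part of the original file) of what `restore_fsl` does for a
// single layer: a flat child array of `len * dimension` items is wrapped back into
// a FixedSizeListArray (validity, when present, comes from the unraveler).  The
// values here are made up for illustration.
#[allow(dead_code)]
fn example_restore_one_fsl_layer() {
    use arrow_array::Int32Array;
    // Six flat items become three rows of FixedSizeList<Int32, 2>
    let flat: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
    let child_field = Arc::new(ArrowField::new("item", DataType::Int32, true));
    let fsl = FixedSizeListArray::new(child_field, 2, flat, /*nulls=*/ None);
    assert_eq!(fsl.len(), 3);
}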
3122
3123impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
3124    fn decode(self: Box<Self>) -> Result<DecodedArray> {
3125        let mut arrays = Vec::with_capacity(self.tasks.len());
3126        let mut unravelers = Vec::with_capacity(self.tasks.len());
3127        for task in self.tasks {
3128            let decoded = task.decode()?;
3129            unravelers.push(decoded.repdef);
3130
3131            let array = make_array(
3132                decoded
3133                    .data
3134                    .into_arrow(self.items_type.clone(), self.should_validate)?,
3135            );
3136
3137            arrays.push(array);
3138        }
3139        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
3140        let array = arrow_select::concat::concat(&array_refs)?;
3141        let mut repdef = CompositeRepDefUnraveler::new(unravelers);
3142
3143        let array = Self::restore_validity(array, &mut repdef);
3144        let array = Self::restore_fsl(array, &mut repdef, self.fsl_fields);
3145
3146        Ok(DecodedArray { array, repdef })
3147    }
3148}
3149
3150#[derive(Debug)]
3151pub struct StructuralPrimitiveFieldDecoder {
3152    field: Arc<ArrowField>,
3153    items_type: DataType,
3154    fsl_fields: Arc<[Arc<ArrowField>]>,
3155    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
3156    should_validate: bool,
3157    rows_drained_in_current: u64,
3158}
3159
3160impl StructuralPrimitiveFieldDecoder {
3161    fn flatten_field_helper(
3162        field: &Arc<ArrowField>,
3163        mut fields: Vec<Arc<ArrowField>>,
3164    ) -> (Arc<[Arc<ArrowField>]>, &DataType) {
3165        match field.data_type() {
3166            DataType::FixedSizeList(inner, _) => {
3167                fields.push(field.clone());
3168                Self::flatten_field_helper(inner, fields)
3169            }
3170            _ => {
3171                let fields = fields.into();
3172                (fields, field.data_type())
3173            }
3174        }
3175    }
3176
3177    fn flatten_field(field: &Arc<ArrowField>) -> (Arc<[Arc<ArrowField>]>, &DataType) {
3178        Self::flatten_field_helper(field, Vec::default())
3179    }
3180
3181    pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3182        let (fsl_fields, items_type) = Self::flatten_field(field);
3183        Self {
3184            field: field.clone(),
3185            items_type: items_type.clone(),
3186            fsl_fields,
3187            page_decoders: VecDeque::new(),
3188            should_validate,
3189            rows_drained_in_current: 0,
3190        }
3191    }
3192}
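
// Illustrative only (not part of the original file): `flatten_field` peels FSL
// layers from the outside in.  For FixedSizeList<FixedSizeList<Int32, 2>, 3> it
// returns both FSL fields plus Int32 as the innermost item type.
#[allow(dead_code)]
fn example_flatten_fsl_field() {
    let items = Arc::new(ArrowField::new("item", DataType::Int32, true));
    let inner = Arc::new(ArrowField::new(
        "item",
        DataType::FixedSizeList(items, 2),
        true,
    ));
    let outer = Arc::new(ArrowField::new(
        "fsl",
        DataType::FixedSizeList(inner, 3),
        true,
    ));
    let (fsl_fields, items_type) = StructuralPrimitiveFieldDecoder::flatten_field(&outer);
    assert_eq!(fsl_fields.len(), 2);
    assert_eq!(items_type, &DataType::Int32);
}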
3193
3194impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3195    fn accept_page(&mut self, child: LoadedPage) -> Result<()> {
3196        assert!(child.path.is_empty());
3197        self.page_decoders.push_back(child.decoder);
3198        Ok(())
3199    }
3200
3201    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3202        let mut remaining = num_rows;
3203        let mut tasks = Vec::new();
3204        while remaining > 0 {
3205            let cur_page = self.page_decoders.front_mut().unwrap();
3206            let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3207            let to_take = num_in_page.min(remaining);
3208
3209            let task = cur_page.drain(to_take)?;
3210            tasks.push(task);
3211
3212            if to_take == num_in_page {
3213                self.page_decoders.pop_front();
3214                self.rows_drained_in_current = 0;
3215            } else {
3216                self.rows_drained_in_current += to_take;
3217            }
3218
3219            remaining -= to_take;
3220        }
3221        Ok(Box::new(StructuralCompositeDecodeArrayTask {
3222            tasks,
3223            items_type: self.items_type.clone(),
3224            should_validate: self.should_validate,
3225            fsl_fields: self.fsl_fields.clone(),
3226        }))
3227    }
3228
3229    fn data_type(&self) -> &DataType {
3230        self.field.data_type()
3231    }
3232}
3233
3234#[derive(Debug)]
3235pub struct AccumulationQueue {
3236    cache_bytes: u64,
3237    keep_original_array: bool,
3238    buffered_arrays: Vec<ArrayRef>,
3239    current_bytes: u64,
3240    // Row number of the first item in buffered_arrays, reset on flush
3241    row_number: u64,
3242    // Number of top level rows represented in buffered_arrays, reset on flush
3243    num_rows: u64,
3244    // This is only for logging / debugging purposes
3245    column_index: u32,
3246}
3247
3248impl AccumulationQueue {
3249    pub fn new(cache_bytes: u64, column_index: u32, keep_original_array: bool) -> Self {
3250        Self {
3251            cache_bytes,
3252            buffered_arrays: Vec::new(),
3253            current_bytes: 0,
3254            column_index,
3255            keep_original_array,
3256            row_number: u64::MAX,
3257            num_rows: 0,
3258        }
3259    }
3260
3261    /// Adds an array to the queue.  If there is enough data then the queue is flushed
3262    /// and returned
3263    pub fn insert(
3264        &mut self,
3265        array: ArrayRef,
3266        row_number: u64,
3267        num_rows: u64,
3268    ) -> Option<(Vec<ArrayRef>, u64, u64)> {
3269        if self.row_number == u64::MAX {
3270            self.row_number = row_number;
3271        }
3272        self.num_rows += num_rows;
3273        self.current_bytes += array.get_array_memory_size() as u64;
3274        if self.current_bytes > self.cache_bytes {
3275            debug!(
3276                "Flushing column {} page of size {} bytes (unencoded)",
3277                self.column_index, self.current_bytes
3278            );
3279            // Push into buffered_arrays without copy since we are about to flush anyway
3280            self.buffered_arrays.push(array);
3281            self.current_bytes = 0;
3282            let row_number = self.row_number;
3283            self.row_number = u64::MAX;
3284            let num_rows = self.num_rows;
3285            self.num_rows = 0;
3286            Some((
3287                std::mem::take(&mut self.buffered_arrays),
3288                row_number,
3289                num_rows,
3290            ))
3291        } else {
3292            trace!(
3293                "Accumulating data for column {}.  Now at {} bytes",
3294                self.column_index,
3295                self.current_bytes
3296            );
3297            if self.keep_original_array {
3298                self.buffered_arrays.push(array);
3299            } else {
3300                self.buffered_arrays.push(deep_copy_array(array.as_ref()))
3301            }
3302            None
3303        }
3304    }
3305
3306    pub fn flush(&mut self) -> Option<(Vec<ArrayRef>, u64, u64)> {
3307        if self.buffered_arrays.is_empty() {
3308            trace!(
3309                "No final flush since no data at column {}",
3310                self.column_index
3311            );
3312            None
3313        } else {
3314            trace!(
3315                "Final flush of column {} which has {} bytes",
3316                self.column_index,
3317                self.current_bytes
3318            );
3319            self.current_bytes = 0;
3320            let row_number = self.row_number;
3321            self.row_number = u64::MAX;
3322            let num_rows = self.num_rows;
3323            self.num_rows = 0;
3324            Some((
3325                std::mem::take(&mut self.buffered_arrays),
3326                row_number,
3327                num_rows,
3328            ))
3329        }
3330    }
3331}
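
// A small usage sketch (not part of the original file): the queue buffers arrays
// until their unencoded size crosses `cache_bytes`, at which point `insert` hands
// back the batch along with its starting row number and total row count.  The
// values are illustrative.
#[allow(dead_code)]
fn example_accumulation_queue() {
    use arrow_array::Int32Array;
    let mut queue = AccumulationQueue::new(/*cache_bytes=*/ 1, /*column_index=*/ 0, true);
    let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    // Any non-empty array exceeds a one byte budget, so this flushes immediately
    let (arrays, row_number, num_rows) = queue.insert(array, 0, 3).unwrap();
    assert_eq!((arrays.len(), row_number, num_rows), (1, 0, 3));
}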
3332
3333pub struct PrimitiveFieldEncoder {
3334    accumulation_queue: AccumulationQueue,
3335    array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
3336    column_index: u32,
3337    field: Field,
3338    max_page_bytes: u64,
3339}
3340
3341impl PrimitiveFieldEncoder {
3342    pub fn try_new(
3343        options: &EncodingOptions,
3344        array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
3345        column_index: u32,
3346        field: Field,
3347    ) -> Result<Self> {
3348        Ok(Self {
3349            accumulation_queue: AccumulationQueue::new(
3350                options.cache_bytes_per_column,
3351                column_index,
3352                options.keep_original_array,
3353            ),
3354            column_index,
3355            max_page_bytes: options.max_page_bytes,
3356            array_encoding_strategy,
3357            field,
3358        })
3359    }
3360
3361    fn create_encode_task(&mut self, arrays: Vec<ArrayRef>) -> Result<EncodeTask> {
3362        let encoder = self
3363            .array_encoding_strategy
3364            .create_array_encoder(&arrays, &self.field)?;
3365        let column_idx = self.column_index;
3366        let data_type = self.field.data_type();
3367
3368        Ok(tokio::task::spawn(async move {
3369            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
3370            let data = DataBlock::from_arrays(&arrays, num_values);
3371            let mut buffer_index = 0;
3372            let array = encoder.encode(data, &data_type, &mut buffer_index)?;
3373            let (data, description) = array.into_buffers();
3374            Ok(EncodedPage {
3375                data,
3376                description: PageEncoding::Legacy(description),
3377                num_rows: num_values,
3378                column_idx,
3379                row_number: 0, // not used by legacy encoders
3380            })
3381        })
3382        .map(|res_res| res_res.unwrap())
3383        .boxed())
3384    }
3385
3386    // Creates an encode task, consuming all buffered data
3387    fn do_flush(&mut self, arrays: Vec<ArrayRef>) -> Result<Vec<EncodeTask>> {
3388        if arrays.len() == 1 {
3389            let array = arrays.into_iter().next().unwrap();
3390            let size_bytes = array.get_buffer_memory_size();
3391            let num_parts = bit_util::ceil(size_bytes, self.max_page_bytes as usize);
3392            // Can't slice it finer than 1 page per row
3393            let num_parts = num_parts.min(array.len());
3394            if num_parts <= 1 {
3395                // One part and it fits in a page
3396                Ok(vec![self.create_encode_task(vec![array])?])
3397            } else {
3398                // One part and it needs to be sliced into multiple pages
3399
3400                // This isn't perfect (items in the array might not all have the same size)
3401                // but it's a reasonable stab for now
3402                let mut tasks = Vec::with_capacity(num_parts);
3403                let mut offset = 0;
3404                let part_size = bit_util::ceil(array.len(), num_parts);
3405                for _ in 0..num_parts {
3406                    let avail = array.len() - offset;
3407                    let chunk_size = avail.min(part_size);
3408                    let part = array.slice(offset, chunk_size);
3409                    let task = self.create_encode_task(vec![part])?;
3410                    tasks.push(task);
3411                    offset += chunk_size;
3412                }
3413                Ok(tasks)
3414            }
3415        } else {
3416            // Multiple parts that (presumably) all fit in a page
3417            //
3418            // TODO: Could check here if there are any jumbo parts in the mix that need splitting
3419            Ok(vec![self.create_encode_task(arrays)?])
3420        }
3421    }
3422}
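
// A minimal sketch (not part of the original file) of the slicing arithmetic in
// `do_flush` above: the part count comes from the unencoded byte size (capped at
// one row per page) and rows are split as evenly as possible, exactly as the loop
// above does with `array.slice`.  e.g. 100 rows totalling 1000 bytes with 256-byte
// pages yields four parts of 25 rows.  Assumes a non-empty array.
#[allow(dead_code)]
fn example_page_split(size_bytes: usize, max_page_bytes: usize, num_values: usize) -> Vec<usize> {
    let num_parts = bit_util::ceil(size_bytes, max_page_bytes).min(num_values);
    let part_size = bit_util::ceil(num_values, num_parts);
    let mut chunk_sizes = Vec::with_capacity(num_parts);
    let mut offset = 0;
    for _ in 0..num_parts {
        let avail = num_values - offset;
        let chunk_size = avail.min(part_size);
        chunk_sizes.push(chunk_size);
        offset += chunk_size;
    }
    chunk_sizes
}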
3423
3424impl FieldEncoder for PrimitiveFieldEncoder {
3425    // Buffers data; if there is enough to write a page then we create an encode task
3426    fn maybe_encode(
3427        &mut self,
3428        array: ArrayRef,
3429        _external_buffers: &mut OutOfLineBuffers,
3430        _repdef: RepDefBuilder,
3431        row_number: u64,
3432        num_rows: u64,
3433    ) -> Result<Vec<EncodeTask>> {
3434        if let Some(arrays) = self.accumulation_queue.insert(array, row_number, num_rows) {
3435            Ok(self.do_flush(arrays.0)?)
3436        } else {
3437            Ok(vec![])
3438        }
3439    }
3440
3441    // If there is any data left in the buffer then create an encode task from it
3442    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
3443        if let Some(arrays) = self.accumulation_queue.flush() {
3444            Ok(self.do_flush(arrays.0)?)
3445        } else {
3446            Ok(vec![])
3447        }
3448    }
3449
3450    fn num_columns(&self) -> u32 {
3451        1
3452    }
3453
3454    fn finish(
3455        &mut self,
3456        _external_buffers: &mut OutOfLineBuffers,
3457    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
3458        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
3459    }
3460}
3461
3462/// The serialized representation of full-zip data
3463struct SerializedFullZip {
3464    /// The zipped values buffer
3465    values: LanceBuffer,
3466    /// The repetition index (only present if there is repetition)
3467    repetition_index: Option<LanceBuffer>,
3468}
3469
3470// We align and pad mini-blocks to 8 byte boundaries for two reasons.  First,
3471// to allow us to store a chunk size in 12 bits.
3472//
3473// If we directly record the size in bytes with 12 bits we would be limited to
3474// 4KiB which is too small.  Since we know each mini-block consists of 8 byte
3475// words we can store the # of words instead which gives us 32KiB.  We want
3476// at least 24KiB so we can handle even the worst case of
3477// - 4Ki values compressed into an 8186 byte buffer
3478// - 4 bytes to describe rep & def lengths
3479// - 16KiB of rep & def buffer (this will almost never happen but life is easier if we
3480//   plan for it)
3481//
3482// Second, each chunk in a mini-block is aligned to 8 bytes.  This allows multi-byte
3483// values like offsets to be stored in a mini-block and safely read back out.  It also
3484// helps ensure zero-copy reads in cases where zero-copy is possible (e.g. no decoding
3485// needed).
3486//
3487// Note: by "aligned to 8 bytes" we mean BOTH "aligned to 8 bytes from the start of
3488// the page" and "aligned to 8 bytes from the start of the file."
3489const MINIBLOCK_ALIGNMENT: usize = 8;
3490const MINIBLOCK_MAX_PADDING: usize = MINIBLOCK_ALIGNMENT - 1;
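
// A quick sketch (not part of the original file) of the padding rule, assuming
// `pad_bytes` returns the byte count needed to reach the next multiple of the
// const parameter (as its uses in `serialize_miniblocks` below suggest):
#[allow(dead_code)]
fn example_miniblock_padding() {
    // A 13-byte position needs 3 bytes of padding; aligned positions need none
    assert_eq!(pad_bytes::<MINIBLOCK_ALIGNMENT>(13), 3);
    assert_eq!(pad_bytes::<MINIBLOCK_ALIGNMENT>(16), 0);
}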
3491
3492/// An encoder for primitive (leaf) arrays
3493///
3494/// This encoder is fairly complicated and follows a number of paths depending
3495/// on the data.
3496///
3497/// First, we convert the validity & offsets information into repetition and
3498/// definition levels.  Then we compress the data itself into a single buffer.
3499///
3500/// If the data is narrow then we encode the data in small chunks (each chunk
3501/// should be a few disk sectors and contains a buffer of repetition, a buffer
3502/// of definition, and a buffer of value data).  This approach is called
3503/// "mini-block".  These mini-blocks are stored into a single data buffer.
3504///
3505/// If the data is wide then we zip together the repetition and definition value
3506/// with the value data into a single buffer.  This approach is called "zipped".
3507///
3508/// If there is any repetition information then we create a repetition index (TODO)
3509///
3510/// In addition, the compression process may create zero or more metadata buffers.
3511/// For example, a dictionary compression will create dictionary metadata.  Any
3512/// mini-block approach has a metadata buffer of block sizes.  This metadata is
3513/// stored in a separate buffer on disk and read at initialization time.
3514///
3515/// TODO: We should concatenate metadata buffers from all pages into a single buffer
3516/// at (roughly) the end of the file so there is, at most, one read per column of
3517/// metadata per file.
3518pub struct PrimitiveStructuralEncoder {
3519    // Accumulates arrays until we have enough data to justify a disk page
3520    accumulation_queue: AccumulationQueue,
3521    accumulated_repdefs: Vec<RepDefBuilder>,
3522    // The compression strategy we will use to compress the data
3523    compression_strategy: Arc<dyn CompressionStrategy>,
3524    column_index: u32,
3525    field: Field,
3526    encoding_metadata: Arc<HashMap<String, String>>,
3527}
3528
3529impl PrimitiveStructuralEncoder {
3530    pub fn try_new(
3531        options: &EncodingOptions,
3532        compression_strategy: Arc<dyn CompressionStrategy>,
3533        column_index: u32,
3534        field: Field,
3535        encoding_metadata: Arc<HashMap<String, String>>,
3536    ) -> Result<Self> {
3537        Ok(Self {
3538            accumulation_queue: AccumulationQueue::new(
3539                options.cache_bytes_per_column,
3540                column_index,
3541                options.keep_original_array,
3542            ),
3543            accumulated_repdefs: Vec::new(),
3544            column_index,
3545            compression_strategy,
3546            field,
3547            encoding_metadata,
3548        })
3549    }
3550
3551    // TODO: This is a heuristic we may need to tune at some point
3552    //
3553    // As data gets narrower the "zipping" process gets too expensive
3554    //   and we prefer mini-block
3555    // As data gets wider the # of values per block shrinks (very wide
3556    //   data doesn't even fit in a mini-block) and the block overhead gets
3557    //   too large, so we prefer zipped.
3558    fn is_narrow(data_block: &DataBlock) -> bool {
3559        const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3560
3561        if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3562            let max_len_array = max_len_array
3563                .as_any()
3564                .downcast_ref::<PrimitiveArray<UInt64Type>>()
3565                .unwrap();
3566            if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3567                return true;
3568            }
3569        }
3570        false
3571    }
3572
3573    fn prefers_miniblock(
3574        data_block: &DataBlock,
3575        encoding_metadata: &HashMap<String, String>,
3576    ) -> bool {
3577        // If the user specifically requested miniblock then use it
3578        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3579            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3580        }
3581        // Otherwise only use miniblock if it is narrow
3582        Self::is_narrow(data_block)
3583    }
3584
3585    fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3586        // Fullzip is the backup option so the only reason we wouldn't use it is if the
3587        // user specifically requested not to use it (in which case we're probably going
3588        // to emit an error)
3589        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3590            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3591        }
3592        true
3593    }
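
    // A condensed sketch (not part of the original file) of how the two predicates
    // above combine, assuming the caller tries `prefers_miniblock` first and falls
    // back to `prefers_fullzip`: an explicit request via the structural encoding
    // metadata key wins; otherwise narrow data goes to mini-block and everything
    // else to full-zip.
    #[allow(dead_code)]
    fn example_encoding_choice(is_narrow: bool, user_request: Option<&str>) -> &'static str {
        match user_request {
            Some(requested) if requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK => {
                "miniblock"
            }
            Some(requested) if requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP => "fullzip",
            _ if is_narrow => "miniblock",
            _ => "fullzip",
        }
    }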
3594
3595    // Converts value data, repetition levels, and definition levels into a single
3596    // buffer of mini-blocks.  In addition, creates a buffer of mini-block metadata
3597    // which tells us the size of each block.  Finally, if repetition is present then
3598    // we also create a buffer for the repetition index.
3599    //
3600    // Each chunk is serialized as:
3601    // | rep_len (2 bytes) | def_len (2 bytes) | values_len (2 bytes) | rep | P1 | def | P2 | values | P3 |
3602    //
3603    // P1 - Up to 1 padding byte to ensure `def` is 2-byte aligned
3604    // P2 - Up to 7 padding bytes to ensure `values` is 8-byte aligned
3605    // P3 - Up to 7 padding bytes to ensure the chunk is a multiple of 8 bytes (this also ensures
3606    //      that the next `chunk` is 8-byte aligned)
3607    //
3608    // rep is guaranteed to be 2-byte aligned
3609    // def is guaranteed to be 2-byte aligned
3610    // values is guaranteed to be 8-byte aligned
3611    // rep_len, def_len, and values_len are guaranteed to be 2-byte aligned but this shouldn't matter.
3612    //
3613    // Each block has a u16 word of metadata.  The upper 12 bits contain 1/8 the
3614    // # of bytes in the block, minus one (if the block is not a multiple of 8 bytes
3615    // then up to 7 bytes of padding are added first).  The lower 4 bits describe the
3616    // log_2 number of values (e.g. if there are 1024 then the lower 4 bits will be
3617    // 0xA).  All blocks except the last must have a power-of-two number of values.
3618    // This not only makes metadata smaller but it makes decoding easier since
3619    // batch sizes are typically a power of 2.  4 bits would allow us to express
3620    // up to 32Ki values but we restrict this further to 4Ki values.
3621    //
3622    // This means blocks can have 1 to 4Ki values and 8 - 32Ki bytes.
3623    //
3624    // All metadata words are serialized (as little endian) into a single buffer
3625    // of metadata values.
3626    //
3627    // If there is repetition then we also create a repetition index.  This is a
3628    // single buffer of integer vectors (stored in row major order).  There is one
3629    // entry for each chunk.  The size of the vector is based on the depth of random
3630    // access we want to support.
3631    //
3632    // A vector of size 2 is the minimum and will support row-based random access (e.g.
3633    // "take the 57th row").  A vector of size 3 will support 1 level of nested access
3634    // (e.g. "take the 3rd item in the 57th row").  A vector of size 4 will support 2
3635    // levels of nested access and so on.
3636    //
3637    // The first number in the vector is the number of top-level rows that complete in
3638    // the chunk.  The second number is the number of second-level rows that complete
3639    // after the final top-level row completed (or beginning of the chunk if no top-level
3640    // row completes in the chunk).  And so on.  The final number in the vector is always
3641    // the number of leftover items not covered by earlier entries in the vector.
3642    //
3643    // Currently we are limited to 0 levels of nested access but that will change in the
3644    // future.
3645    //
3646    // The repetition index and the chunk metadata are read at initialization time and
3647    // cached in memory.
3648    fn serialize_miniblocks(
3649        miniblocks: MiniBlockCompressed,
3650        rep: Vec<LanceBuffer>,
3651        def: Vec<LanceBuffer>,
3652    ) -> (LanceBuffer, LanceBuffer) {
3653        let bytes_rep = rep.iter().map(|r| r.len()).sum::<usize>();
3654        let bytes_def = def.iter().map(|d| d.len()).sum::<usize>();
3655        let max_bytes_repdef_len = rep.len() * 4;
3656        let max_padding = miniblocks.chunks.len() * (1 + (2 * MINIBLOCK_MAX_PADDING));
3657        let mut data_buffer = Vec::with_capacity(
3658            miniblocks.data.len()      // `values`
3659                + bytes_rep            // `rep`
3660                + bytes_def            // `def`
3661                + max_bytes_repdef_len // `rep_len` and `def_len` (2 bytes each per chunk)
3662                + max_padding, // `P1`, `P2`, and `P3` for each block
3663        );
3664        let mut meta_buffer = Vec::with_capacity(miniblocks.data.len() * 2);
3665
3666        let mut value_offset = 0;
3667        for ((chunk, rep), def) in miniblocks.chunks.into_iter().zip(rep).zip(def) {
3668            let start_len = data_buffer.len();
3669            // Start of chunk should be aligned
3670            debug_assert_eq!(start_len % MINIBLOCK_ALIGNMENT, 0);
3671
3672            assert!(rep.len() < u16::MAX as usize);
3673            assert!(def.len() < u16::MAX as usize);
3674            let bytes_rep = rep.len() as u16;
3675            let bytes_def = def.len() as u16;
3676            let bytes_val = chunk.num_bytes;
3677
3678            // Each chunk starts with the size of the rep buffer (2 bytes), the size of
3679            // the def buffer (2 bytes), and the size of the values buffer (2 bytes)
3680            data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
3681            data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
3682            data_buffer.extend_from_slice(&bytes_val.to_le_bytes());
3683
3684            data_buffer.extend_from_slice(&rep);
3685            // In theory we should insert P1 here.  However, since we do not have bit-packing of rep
3686            // and def levels yet we can skip this step.
3687            debug_assert_eq!(data_buffer.len() % 2, 0);
3688            data_buffer.extend_from_slice(&def);
3689
3690            let p2 = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
3691            // We ensured the data buffer would be large enough when we allocated it
3692            data_buffer.extend(iter::repeat(0).take(p2));
3693
3694            let num_value_bytes = chunk.num_bytes as usize;
3695            let values =
3696                &miniblocks.data[value_offset as usize..value_offset as usize + num_value_bytes];
3697            debug_assert_eq!(data_buffer.len() % MINIBLOCK_ALIGNMENT, 0);
3698            data_buffer.extend_from_slice(values);
3699
3700            let p3 = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
3701            data_buffer.extend(iter::repeat(0).take(p3));
3702            value_offset += num_value_bytes as u64;
3703
3704            let chunk_bytes = data_buffer.len() - start_len;
3705            assert!(chunk_bytes <= 16 * 1024);
3706            assert!(chunk_bytes > 0);
3707            assert_eq!(chunk_bytes % 8, 0);
3708            // We subtract 1 from the divided byte count because we want to be able to
3709            // express a size of 32KiB and not (32Ki - 8)B which is what we'd get otherwise
3710            // with 0xFFF
3711            let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
3712            let divided_bytes_minus_one = (divided_bytes - 1) as u64;
3713
3714            let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u16;
3715            meta_buffer.extend_from_slice(&metadata.to_le_bytes());
3716        }
3717
3718        (
3719            LanceBuffer::Owned(data_buffer),
3720            LanceBuffer::Owned(meta_buffer),
3721        )
3722    }
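
    // A round-trip sketch (not part of the original file) of the metadata word
    // assembled above: the upper 12 bits store (chunk_bytes / 8) - 1 and the lower
    // 4 bits store log2 of the number of values in the chunk.
    #[allow(dead_code)]
    fn example_chunk_metadata_word() {
        let chunk_bytes: u64 = 4096; // always a multiple of 8 after padding
        let log_num_values: u64 = 10; // 1024 values
        let word = (((chunk_bytes / 8 - 1) << 4) | log_num_values) as u16;
        // Decoding reverses the packing
        let decoded_bytes = (((word >> 4) as u64) + 1) * 8;
        let decoded_log = (word & 0xF) as u64;
        assert_eq!((decoded_bytes, decoded_log), (chunk_bytes, log_num_values));
    }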
3723
3724    /// Compresses a buffer of levels into chunks
3725    ///
3726    /// TODO: Use bit-packing here
3727    ///
3728    /// If these are repetition levels then we also calculate the repetition index here (that
3729    /// is the third return value)
3730    fn compress_levels(
3731        levels: Option<RepDefSlicer<'_>>,
3732        num_values: u64,
3733        compression_strategy: &dyn CompressionStrategy,
3734        chunks: &[MiniBlockChunk],
3735        // This will be 0 if we are compressing def levels
3736        max_rep: u16,
3737    ) -> Result<(Vec<LanceBuffer>, pb::ArrayEncoding, LanceBuffer)> {
3738        if let Some(mut levels) = levels {
3739            let mut rep_index = if max_rep > 0 {
3740                Vec::with_capacity(chunks.len())
3741            } else {
3742                vec![]
3743            };
3744            // Make the levels into a FixedWidth data block
3745            let num_levels = levels.num_levels() as u64;
3746            let mut levels_buf = levels.all_levels().try_clone().unwrap();
3747            let levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3748                data: levels_buf.borrow_and_clone(),
3749                bits_per_value: 16,
3750                num_values: num_levels,
3751                block_info: BlockInfo::new(),
3752            });
3753            let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
3754            // Pick a block compressor
3755            let (compressor, compressor_desc) =
3756                compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
3757            // Compress blocks of levels (sized according to the chunks)
3758            let mut buffers = Vec::with_capacity(chunks.len());
3759            let mut values_counter = 0;
3760            for (chunk_idx, chunk) in chunks.iter().enumerate() {
3761                let chunk_num_values = chunk.num_values(values_counter, num_values);
3762                values_counter += chunk_num_values;
3763                let mut chunk_levels = if chunk_idx < chunks.len() - 1 {
3764                    levels.slice_next(chunk_num_values as usize)
3765                } else {
3766                    levels.slice_rest()
3767                };
3768                let num_chunk_levels = (chunk_levels.len() / 2) as u64;
3769                if max_rep > 0 {
3770                    // If max_rep > 0 then we are working with rep levels and we need
3771                    // to calculate the repetition index.  The repetition index for a
3772                    // chunk is currently 2 values (in the future it may be more).
3773                    //
3774                    // The first value is the number of rows that _finish_ in the
3775                    // chunk.
3776                    //
3777                    // The second value is the number of "leftovers" after the last
3778                    // finished row in the chunk.
3779                    let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
3780                    let rep_values = rep_values.as_ref();
3781
3782                    // We skip 1 here because a max_rep at spot 0 doesn't count as a finished list (we
3783                    // will count it in the previous chunk)
3784                    let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
3785                    let num_leftovers = if chunk_idx < chunks.len() - 1 {
3786                        rep_values
3787                            .iter()
3788                            .rev()
3789                            .position(|v| *v == max_rep)
3790                            // # of leftovers includes the max_rep spot
3791                            .map(|pos| pos + 1)
3792                            .unwrap_or(rep_values.len())
3793                    } else {
3794                        // Last chunk can't have leftovers
3795                        0
3796                    };
3797
3798                    if chunk_idx != 0 && rep_values[0] == max_rep {
3799                        // This chunk starts with a new row and so, if we thought we had leftovers
3800                        // in the previous chunk, we were mistaken
3801                        // TODO: Can use unchecked here
3802                        let rep_len = rep_index.len();
3803                        if rep_index[rep_len - 1] != 0 {
3804                            // We thought we had leftovers but that was actually a full row
3805                            rep_index[rep_len - 2] += 1;
3806                            rep_index[rep_len - 1] = 0;
3807                        }
3808                    }
3809
3810                    if chunk_idx == chunks.len() - 1 {
3811                        // The final row has no following max_rep to close it, so count it here
3812                        num_rows += 1;
3813                    }
3814                    rep_index.push(num_rows as u64);
3815                    rep_index.push(num_leftovers as u64);
3816                }
3817                let chunk_levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3818                    data: chunk_levels,
3819                    bits_per_value: 16,
3820                    num_values: num_chunk_levels,
3821                    block_info: BlockInfo::new(),
3822                });
3823                let compressed_levels = compressor.compress(chunk_levels_block)?;
3824                buffers.push(compressed_levels);
3825            }
3826            debug_assert_eq!(levels.num_levels_remaining(), 0);
3827            let rep_index = LanceBuffer::reinterpret_vec(rep_index);
3828            Ok((buffers, compressor_desc, rep_index))
3829        } else {
3830            // Everything is valid or we have no repetition so we encode as a constant
3831            // array of 0
3832            let data = chunks.iter().map(|_| LanceBuffer::empty()).collect();
3833            let scalar = 0_u16.to_le_bytes().to_vec();
3834            let encoding = ProtobufUtils::constant(scalar, num_values);
3835            Ok((data, encoding, LanceBuffer::empty()))
3836        }
3837    }
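
    // A standalone sketch (not part of the original file) of the repetition index
    // entry computed above for an *interior* chunk: the first value counts rows that
    // finish in the chunk (a max_rep level after position 0 closes the previous row)
    // and the second counts trailing levels that belong to a still-open row.  For
    // rep levels [1, 0, 0, 1, 0] with max_rep = 1 this returns (1, 2).
    #[allow(dead_code)]
    fn example_rep_index_entry(rep_levels: &[u16], max_rep: u16) -> (u64, u64) {
        let rows_finished = rep_levels.iter().skip(1).filter(|v| **v == max_rep).count();
        let leftovers = rep_levels
            .iter()
            .rev()
            .position(|v| *v == max_rep)
            // The leftovers include the max_rep spot itself
            .map(|pos| pos + 1)
            .unwrap_or(rep_levels.len());
        (rows_finished as u64, leftovers as u64)
    }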
3838
3839    fn encode_simple_all_null(
3840        column_idx: u32,
3841        num_rows: u64,
3842        row_number: u64,
3843    ) -> Result<EncodedPage> {
3844        let description = ProtobufUtils::simple_all_null_layout();
3845        Ok(EncodedPage {
3846            column_idx,
3847            data: vec![],
3848            description: PageEncoding::Structural(description),
3849            num_rows,
3850            row_number,
3851        })
3852    }
3853
3854    // Encodes a page where all values are null but we have rep/def
3855    // information that we need to store (e.g. to distinguish between
3856    // different kinds of null)
3857    fn encode_complex_all_null(
3858        column_idx: u32,
3859        repdefs: Vec<RepDefBuilder>,
3860        row_number: u64,
3861        num_rows: u64,
3862    ) -> Result<EncodedPage> {
3863        let repdef = RepDefBuilder::serialize(repdefs);
3864
3865        // TODO: Actually compress repdef
3866        let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
3867            LanceBuffer::reinterpret_slice(rep.clone())
3868        } else {
3869            LanceBuffer::empty()
3870        };
3871
3872        let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
3873            LanceBuffer::reinterpret_slice(def.clone())
3874        } else {
3875            LanceBuffer::empty()
3876        };
3877
3878        let description = ProtobufUtils::all_null_layout(&repdef.def_meaning);
3879        Ok(EncodedPage {
3880            column_idx,
3881            data: vec![rep_bytes, def_bytes],
3882            description: PageEncoding::Structural(description),
3883            num_rows,
3884            row_number,
3885        })
3886    }
3887
3888    #[allow(clippy::too_many_arguments)]
3889    fn encode_miniblock(
3890        column_idx: u32,
3891        field: &Field,
3892        compression_strategy: &dyn CompressionStrategy,
3893        data: DataBlock,
3894        repdefs: Vec<RepDefBuilder>,
3895        row_number: u64,
3896        dictionary_data: Option<DataBlock>,
3897        num_rows: u64,
3898    ) -> Result<EncodedPage> {
3899        let repdef = RepDefBuilder::serialize(repdefs);
3900
3901        if let DataBlock::AllNull(_null_block) = data {
3902            // If we got here then all the data is null but we have rep/def information that
3903            // we need to store.
3904            todo!()
3905        }
3906
3907        // The validity is encoded in repdef so we can remove it
3908        let data = data.remove_validity();
3909
3910        // We encode FSL by flattening the data and then compressing it.  This means the mini-block will have
3911        // more items than rows if any FSL layers are present.
3912        let data = data.flatten();
3913
3914        let num_items = data.num_values();
3915
3916        let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
3917        let (compressed_data, value_encoding) = compressor.compress(data)?;
3918
3919        let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
3920
3921        let (compressed_rep, rep_encoding, rep_index) = Self::compress_levels(
3922            repdef.rep_slicer(),
3923            num_items,
3924            compression_strategy,
3925            &compressed_data.chunks,
3926            max_rep,
3927        )?;
3928
3929        let (rep_index, rep_index_depth) = if rep_index.is_empty() {
3930            (None, 0)
3931        } else {
3932            // TODO: Support repetition index depth > 1
3933            (Some(rep_index), 1)
3934        };
3935
3936        let (compressed_def, def_encoding, _) = Self::compress_levels(
3937            repdef.def_slicer(),
3938            num_items,
3939            compression_strategy,
3940            &compressed_data.chunks,
3941            /*max_rep=*/ 0,
3942        )?;
3943
3944        // TODO: Parquet sparsely encodes values here.  We could do the same but
3945        // then we wouldn't have a power-of-two number of values per chunk.  This
3946        // means more metadata and potentially more decoder asymmetry.  However,
3947        // it may be worth investigating at some point
3948
3949        let (block_value_buffer, block_meta_buffer) =
3950            Self::serialize_miniblocks(compressed_data, compressed_rep, compressed_def);
3951
3952        // Metadata, Data, Dictionary, (maybe) Repetition Index
3953        let mut data = Vec::with_capacity(4);
3954        data.push(block_meta_buffer);
3955        data.push(block_value_buffer);
3956
3957        if let Some(dictionary_data) = dictionary_data {
3958            // The field passed to `create_block_compressor` is not currently used.
3959            let dummy_dictionary_field = Field::new_arrow("", DataType::UInt16, false)?;
3960
3961            let (compressor, dictionary_encoding) = compression_strategy
3962                .create_block_compressor(&dummy_dictionary_field, &dictionary_data)?;
3963            let dictionary_buffer = compressor.compress(dictionary_data)?;
3964
3965            data.push(dictionary_buffer);
3966            if let Some(rep_index) = rep_index {
3967                data.push(rep_index);
3968            }
3969
3970            let description = ProtobufUtils::miniblock_layout(
3971                rep_encoding,
3972                def_encoding,
3973                value_encoding,
3974                rep_index_depth,
3975                Some(dictionary_encoding),
3976                &repdef.def_meaning,
3977                num_items,
3978            );
3979            Ok(EncodedPage {
3980                num_rows,
3981                column_idx,
3982                data,
3983                description: PageEncoding::Structural(description),
3984                row_number,
3985            })
3986        } else {
3987            let description = ProtobufUtils::miniblock_layout(
3988                rep_encoding,
3989                def_encoding,
3990                value_encoding,
3991                rep_index_depth,
3992                None,
3993                &repdef.def_meaning,
3994                num_items,
3995            );
3996
3997            if let Some(mut rep_index) = rep_index {
3998                let view = rep_index.borrow_to_typed_slice::<u64>();
3999                let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
4000                debug_assert_eq!(total, num_rows);
4001
4002                data.push(rep_index);
4003            }
4004
4005            Ok(EncodedPage {
4006                num_rows,
4007                column_idx,
4008                data,
4009                description: PageEncoding::Structural(description),
4010                row_number,
4011            })
4012        }
4013    }
4014
4015    // For fixed-size data we encode < control word | data > for each value
4016    fn serialize_full_zip_fixed(
4017        fixed: FixedWidthDataBlock,
4018        mut repdef: ControlWordIterator,
4019        num_items: u64,
4020    ) -> SerializedFullZip {
4021        let len = fixed.data.len() + repdef.bytes_per_word() * num_items as usize;
4022        let mut zipped_data = Vec::with_capacity(len);
4023
4024        let max_rep_index_val = if repdef.has_repetition() {
4025            len as u64
4026        } else {
4027            // Setting this to 0 means we won't write a repetition index
4028            0
4029        };
4030        let mut rep_index_builder =
4031            BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
4032
4033        // We could pad to the nearest byte, but that is unlikely to matter anytime soon:
4034        // compression of large values rarely yields a result that is not byte aligned.
4035        assert_eq!(
4036            fixed.bits_per_value % 8,
4037            0,
4038            "Non-byte aligned full-zip compression not yet supported"
4039        );
4040
4041        let bytes_per_value = fixed.bits_per_value as usize / 8;
4042
4043        let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
4044        let mut offset = 0;
4045        while let Some(control) = repdef.append_next(&mut zipped_data) {
4046            if control.is_new_row {
4047                // We have finished a row
4048                debug_assert!(offset <= len);
4049                // SAFETY: We know that `offset <= len`
4050                unsafe { rep_index_builder.append(offset as u64) };
4051            }
4052            if control.is_visible {
4053                let value = data_iter.next().unwrap();
4054                zipped_data.extend_from_slice(value);
4055            }
4056            offset = zipped_data.len();
4057        }
4058
4059        debug_assert_eq!(zipped_data.len(), len);
4060        // Put the final value in the rep index
4061        // SAFETY: `zipped_data.len() == len`
4062        unsafe {
4063            rep_index_builder.append(zipped_data.len() as u64);
4064        }
4065
4066        let zipped_data = LanceBuffer::Owned(zipped_data);
4067        let rep_index = rep_index_builder.into_data();
4068        let rep_index = if rep_index.is_empty() {
4069            None
4070        } else {
4071            Some(LanceBuffer::Owned(rep_index))
4072        };
4073        SerializedFullZip {
4074            values: zipped_data,
4075            repetition_index: rep_index,
4076        }
4077    }
4078
4079    // For variable-size data we encode < control word | length | data > for each value
4080    //
4081    // In addition, we create a second buffer, the repetition index
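    //
    // An illustrative sketch (values hypothetical): zipping ["hi", null, "abc"] with
    // one-byte control words c0, c1, c2 and 4-byte little-endian lengths yields
    //
    //   [ c0 | 02 00 00 00 | 'h' 'i' | c1 | c2 | 03 00 00 00 | 'a' 'b' 'c' ]
    //
    // The null item contributes only its control word; its length and data are
    // omitted.  The repetition index records the byte offset at which each row
    // starts, plus a final end-of-buffer offset.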
4082    fn serialize_full_zip_variable(
4083        mut variable: VariableWidthBlock,
4084        mut repdef: ControlWordIterator,
4085        num_items: u64,
4086    ) -> SerializedFullZip {
4087        let bytes_per_offset = variable.bits_per_offset as usize / 8;
4088        assert_eq!(
4089            variable.bits_per_offset % 8,
4090            0,
4091            "Only byte-aligned offsets supported"
4092        );
4093        let len = variable.data.len()
4094            + repdef.bytes_per_word() * num_items as usize
4095            + bytes_per_offset * variable.num_values as usize;
4096        let mut buf = Vec::with_capacity(len);
4097
4098        let max_rep_index_val = len as u64;
4099        let mut rep_index_builder =
4100            BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
4101
4102        // TODO: byte pack the item lengths with varint encoding
4103        match bytes_per_offset {
4104            4 => {
4105                let offs = variable.offsets.borrow_to_typed_slice::<u32>();
4106                let mut rep_offset = 0;
4107                let mut windows_iter = offs.as_ref().windows(2);
4108                while let Some(control) = repdef.append_next(&mut buf) {
4109                    if control.is_new_row {
4110                        // We have finished a row
4111                        debug_assert!(rep_offset <= len);
4112                        // SAFETY: We know that `rep_offset <= len`
4113                        unsafe { rep_index_builder.append(rep_offset as u64) };
4114                    }
4115                    if control.is_visible {
4116                        let window = windows_iter.next().unwrap();
4117                        if control.is_valid_item {
4118                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4119                            buf.extend_from_slice(
4120                                &variable.data[window[0] as usize..window[1] as usize],
4121                            );
4122                        }
4123                    }
4124                    rep_offset = buf.len();
4125                }
4126            }
4127            8 => {
4128                let offs = variable.offsets.borrow_to_typed_slice::<u64>();
4129                let mut rep_offset = 0;
4130                let mut windows_iter = offs.as_ref().windows(2);
4131                while let Some(control) = repdef.append_next(&mut buf) {
4132                    if control.is_new_row {
4133                        // We have finished a row
4134                        debug_assert!(rep_offset <= len);
4135                        // SAFETY: We know that `rep_offset <= len`
4136                        unsafe { rep_index_builder.append(rep_offset as u64) };
4137                    }
4138                    if control.is_visible {
4139                        let window = windows_iter.next().unwrap();
4140                        if control.is_valid_item {
4141                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4142                            buf.extend_from_slice(
4143                                &variable.data[window[0] as usize..window[1] as usize],
4144                            );
4145                        }
4146                    }
4147                    rep_offset = buf.len();
4148                }
4149            }
4150            _ => panic!("Unsupported offset size"),
4151        }
4152
4153        // We may have saved a few bytes by not writing lengths for invalid (null) items.  However,
4154        // if we are over `len` then we have a bug.
4155        debug_assert!(buf.len() <= len);
4156        // Put the final value in the rep index
4157        // SAFETY: `buf.len() <= len`
4158        unsafe {
4159            rep_index_builder.append(buf.len() as u64);
4160        }
4161
4162        let zipped_data = LanceBuffer::Owned(buf);
4163        let rep_index = rep_index_builder.into_data();
4164        debug_assert!(!rep_index.is_empty());
4165        let rep_index = Some(LanceBuffer::Owned(rep_index));
4166        SerializedFullZip {
4167            values: zipped_data,
4168            repetition_index: rep_index,
4169        }
4170    }
4171
4172    /// Serializes data into a single buffer according to the full-zip format which zips
4173    /// together the repetition, definition, and value data into a single buffer.
4174    fn serialize_full_zip(
4175        compressed_data: PerValueDataBlock,
4176        repdef: ControlWordIterator,
4177        num_items: u64,
4178    ) -> SerializedFullZip {
4179        match compressed_data {
4180            PerValueDataBlock::Fixed(fixed) => {
4181                Self::serialize_full_zip_fixed(fixed, repdef, num_items)
4182            }
4183            PerValueDataBlock::Variable(var) => {
4184                Self::serialize_full_zip_variable(var, repdef, num_items)
4185            }
4186        }
4187    }
4188
4189    fn encode_full_zip(
4190        column_idx: u32,
4191        field: &Field,
4192        compression_strategy: &dyn CompressionStrategy,
4193        data: DataBlock,
4194        repdefs: Vec<RepDefBuilder>,
4195        row_number: u64,
4196        num_lists: u64,
4197    ) -> Result<EncodedPage> {
4198        let repdef = RepDefBuilder::serialize(repdefs);
4199        let max_rep = repdef
4200            .repetition_levels
4201            .as_ref()
4202            .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
4203        let max_def = repdef
4204            .definition_levels
4205            .as_ref()
4206            .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
4207
4208        // The validity is encoded in repdef so we can remove it
4209        let data = data.remove_validity();
4210
4211        // To handle FSL we just flatten
4212        let data = data.flatten();
4213        let (num_items, num_visible_items) =
4214            if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
4215                // If there are rep levels there may be "invisible" items and we need to encode
4216                // rep_levels.len() entries, which may be more than data.num_values()
4217                (rep_levels.len() as u64, data.num_values())
4218            } else {
4219                // If there are no rep levels then we encode data.num_values() things
4220                (data.num_values(), data.num_values())
4221            };
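        // For example (illustrative): the list array [[a, b], NULL] has three
        // rep/def positions but only two stored values, so num_items == 3 while
        // num_visible_items == 2.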
4222
4223        let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
4224
4225        let repdef_iter = build_control_word_iterator(
4226            repdef.repetition_levels.as_deref(),
4227            max_rep,
4228            repdef.definition_levels.as_deref(),
4229            max_def,
4230            max_visible_def,
4231            num_items as usize,
4232        );
4233        let bits_rep = repdef_iter.bits_rep();
4234        let bits_def = repdef_iter.bits_def();
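        // Each control word packs the repetition and definition levels for a single
        // position; `bits_rep` / `bits_def` record how many bits each occupies so
        // the decoder can unpack them later.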
4235
4236        let compressor = compression_strategy.create_per_value(field, &data)?;
4237        let (compressed_data, value_encoding) = compressor.compress(data)?;
4238
4239        let description = match &compressed_data {
4240            PerValueDataBlock::Fixed(fixed) => ProtobufUtils::fixed_full_zip_layout(
4241                bits_rep,
4242                bits_def,
4243                fixed.bits_per_value as u32,
4244                value_encoding,
4245                &repdef.def_meaning,
4246                num_items as u32,
4247                num_visible_items as u32,
4248            ),
4249            PerValueDataBlock::Variable(variable) => ProtobufUtils::variable_full_zip_layout(
4250                bits_rep,
4251                bits_def,
4252                variable.bits_per_offset as u32,
4253                value_encoding,
4254                &repdef.def_meaning,
4255                num_items as u32,
4256                num_visible_items as u32,
4257            ),
4258        };
4259
4260        let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
4261
4262        let data = if let Some(repindex) = zipped.repetition_index {
4263            vec![zipped.values, repindex]
4264        } else {
4265            vec![zipped.values]
4266        };
4267
4268        Ok(EncodedPage {
4269            num_rows: num_lists,
4270            column_idx,
4271            data,
4272            description: PageEncoding::Structural(description),
4273            row_number,
4274        })
4275    }
4276
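    // A small illustrative example: dictionary encoding the values
    // ["a", "b", "a", "c", "a"] produces the dictionary ["a", "b", "c"] and the
    // indices [0, 1, 0, 2, 0].  Indices are stored as u8, which is sufficient
    // because dictionary encoding is only attempted when the cardinality is at
    // most DICTIONARY_ENCODING_THRESHOLD (currently 100).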
4277    fn dictionary_encode(mut data_block: DataBlock, cardinality: u64) -> (DataBlock, DataBlock) {
4278        match data_block {
4279            DataBlock::FixedWidth(ref mut fixed_width_data_block) => {
4280                // Currently, only FixedWidth DataBlocks with bits_per_value == 128 carry cardinality statistics.
4281                // TODO: a follow-up PR to support FixedWidth DataBlocks with bits_per_value == 256.
4282                let mut map = HashMap::new();
4283                let u128_slice = fixed_width_data_block.data.borrow_to_typed_slice::<u128>();
4284                let u128_slice = u128_slice.as_ref();
4285                let mut dictionary_buffer = Vec::with_capacity(cardinality as usize);
4286                let mut indices_buffer =
4287                    Vec::with_capacity(fixed_width_data_block.num_values as usize);
4288                let mut curr_idx: u8 = 0;
4289                u128_slice.iter().for_each(|&value| {
4290                    let idx = *map.entry(value).or_insert_with(|| {
4291                        dictionary_buffer.push(value);
4292                        curr_idx += 1;
4293                        curr_idx - 1
4294                    });
4295                    indices_buffer.push(idx);
4296                });
4297                let dictionary_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
4298                    data: LanceBuffer::reinterpret_vec(dictionary_buffer),
4299                    bits_per_value: 128,
4300                    num_values: curr_idx as u64,
4301                    block_info: BlockInfo::default(),
4302                });
4303                let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
4304                    data: LanceBuffer::reinterpret_vec(indices_buffer),
4305                    bits_per_value: 8,
4306                    num_values: fixed_width_data_block.num_values,
4307                    block_info: BlockInfo::default(),
4308                });
4309                // TODO: if we decide to compute statistics eagerly, move this call into the
4310                // DataBlock constructor.
4311                indices_data_block.compute_stat();
4312
4313                (indices_data_block, dictionary_data_block)
4314            }
4315            DataBlock::VariableWidth(ref mut variable_width_data_block) => {
4316                match variable_width_data_block.bits_per_offset {
4317                    32 => {
4318                        let mut map: HashMap<U8SliceKey, u8> = HashMap::new();
4319                        let offsets = variable_width_data_block
4320                            .offsets
4321                            .borrow_to_typed_slice::<u32>();
4322                        let offsets = offsets.as_ref();
4323
4324                        let max_len = variable_width_data_block.get_stat(Stat::MaxLength).expect(
4325                            "VariableWidth DataBlock should have valid `Stat::MaxLength` statistics",
4326                        );
4327                        let max_len = max_len.as_primitive::<UInt64Type>().value(0);
4328
4329                        let mut dictionary_buffer: Vec<u8> =
4330                            Vec::with_capacity((max_len * cardinality) as usize);
4331                        let mut dictionary_offsets_buffer = vec![0];
4332                        let mut curr_idx = 0;
4333                        let mut indices_buffer =
4334                            Vec::with_capacity(variable_width_data_block.num_values as usize);
4335
4336                        offsets
4337                            .iter()
4338                            .zip(offsets.iter().skip(1))
4339                            .for_each(|(&start, &end)| {
4340                                let key =
4341                                    &variable_width_data_block.data[start as usize..end as usize];
4342                                let idx = *map.entry(U8SliceKey(key)).or_insert_with(|| {
4343                                    dictionary_buffer.extend_from_slice(key);
4344                                    dictionary_offsets_buffer.push(dictionary_buffer.len() as u32);
4345                                    curr_idx += 1;
4346                                    curr_idx - 1
4347                                });
4348                                indices_buffer.push(idx);
4349                            });
4350
4351                        let dictionary_data_block = DataBlock::VariableWidth(VariableWidthBlock {
4352                            data: LanceBuffer::reinterpret_vec(dictionary_buffer),
4353                            offsets: LanceBuffer::reinterpret_vec(dictionary_offsets_buffer),
4354                            bits_per_offset: 32,
4355                            num_values: curr_idx as u64,
4356                            block_info: BlockInfo::default(),
4357                        });
4358
4359                        let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
4360                            data: LanceBuffer::Owned(indices_buffer),
4361                            bits_per_value: 8,
4362                            num_values: variable_width_data_block.num_values,
4363                            block_info: BlockInfo::default(),
4364                        });
4365                        // TODO: if we decide to compute statistics eagerly, move this call into
4366                        // the DataBlock constructor.
4367                        indices_data_block.compute_stat();
4368
4369                        (indices_data_block, dictionary_data_block)
4370                    }
4371                    64 => {
4372                        todo!("A follow up PR to support dictionary encoding with dictionary type `VariableWidth DataBlock` with bits_per_offset 64");
4373                    }
4374                    _ => {
4375                        unreachable!()
4376                    }
4377                }
4378            }
4379            _ => {
4380                unreachable!("dictionary encode called with data block {:?}", data_block)
4381            }
4382        }
4383    }
4384
4385    // Creates an encode task, consuming all buffered data
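    //
    // The layout selection below is roughly (subject to the field metadata override
    // via STRUCTURAL_ENCODING_META_KEY):
    //   1. no values, or all values null      -> simple-null / complex-all-null layout
    //   2. cardinality <= 100 and at least
    //      10 values per distinct value       -> dictionary-encoded mini-block
    //   3. prefers mini-block                 -> mini-block layout
    //   4. prefers full-zip                   -> full-zip layout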
4386    fn do_flush(
4387        &mut self,
4388        arrays: Vec<ArrayRef>,
4389        repdefs: Vec<RepDefBuilder>,
4390        row_number: u64,
4391        num_rows: u64,
4392    ) -> Result<Vec<EncodeTask>> {
4393        let column_idx = self.column_index;
4394        let compression_strategy = self.compression_strategy.clone();
4395        let field = self.field.clone();
4396        let encoding_metadata = self.encoding_metadata.clone();
4397        let task = spawn_cpu(move || {
4398            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
4399            if num_values == 0 {
4400                // We should not encode empty arrays.  So if we get here that should mean that we
4401                // either have all empty lists or all null lists (or a mix).  We still need to encode
4402                // the rep/def information but we can skip the data encoding.
4403                return Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows);
4404            }
4405            let num_nulls = arrays
4406                .iter()
4407                .map(|arr| arr.logical_nulls().map(|n| n.null_count()).unwrap_or(0) as u64)
4408                .sum::<u64>();
4409
4410            if num_values == num_nulls {
4411                if repdefs.iter().all(|rd| rd.is_simple_validity()) {
4412                    log::debug!(
4413                        "Encoding column {} with {} items using simple-null layout",
4414                        column_idx,
4415                        num_values
4416                    );
4417                    // Simple case, no rep/def and all nulls, we don't need to encode any data
4418                    Self::encode_simple_all_null(column_idx, num_values, row_number)
4419                } else {
4420                    // If we get here then we have definition levels (presumably due to FSL) and
4421                    // we need to store those
4422                    Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows)
4423                }
4424            } else {
4425                let data_block = DataBlock::from_arrays(&arrays, num_values);
4426
4427                // If the `data_block` is a `StructDataBlock`, then this is a struct with packed-struct encoding.
4428                if let DataBlock::Struct(ref struct_data_block) = data_block {
4429                    if struct_data_block
4430                        .children
4431                        .iter()
4432                        .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
4433                    {
4434                        panic!("packed struct encoding currently only supports fixed-width fields.")
4435                    }
4436                }
4437
4438                const DICTIONARY_ENCODING_THRESHOLD: u64 = 100;
4439                let cardinality =
4440                    if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
4441                        cardinality_array.as_primitive::<UInt64Type>().value(0)
4442                    } else {
4443                        u64::MAX
4444                    };
4445
4446                // The triggering threshold for dictionary encoding can be further tuned.
4447                if cardinality <= DICTIONARY_ENCODING_THRESHOLD
4448                    && data_block.num_values() >= 10 * cardinality
4449                {
4450                    let (indices_data_block, dictionary_data_block) =
4451                        Self::dictionary_encode(data_block, cardinality);
4452                    Self::encode_miniblock(
4453                        column_idx,
4454                        &field,
4455                        compression_strategy.as_ref(),
4456                        indices_data_block,
4457                        repdefs,
4458                        row_number,
4459                        Some(dictionary_data_block),
4460                        num_rows,
4461                    )
4462                } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
4463                    log::debug!(
4464                        "Encoding column {} with {} items using mini-block layout",
4465                        column_idx,
4466                        num_values
4467                    );
4468                    Self::encode_miniblock(
4469                        column_idx,
4470                        &field,
4471                        compression_strategy.as_ref(),
4472                        data_block,
4473                        repdefs,
4474                        row_number,
4475                        None,
4476                        num_rows,
4477                    )
4478                } else if Self::prefers_fullzip(encoding_metadata.as_ref()) {
4479                    log::debug!(
4480                        "Encoding column {} with {} items using full-zip layout",
4481                        column_idx,
4482                        num_values
4483                    );
4484                    Self::encode_full_zip(
4485                        column_idx,
4486                        &field,
4487                        compression_strategy.as_ref(),
4488                        data_block,
4489                        repdefs,
4490                        row_number,
4491                        num_rows,
4492                    )
4493                } else {
4494                    Err(Error::InvalidInput { source: format!("Cannot determine structural encoding for field {}.  This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into(), location: location!() })
4495                }
4496            }
4497        })
4498        .boxed();
4499        Ok(vec![task])
4500    }
4501
4502    fn extract_validity_buf(array: &dyn Array, repdef: &mut RepDefBuilder) {
4503        if let Some(validity) = array.nulls() {
4504            repdef.add_validity_bitmap(validity.clone());
4505        } else {
4506            repdef.add_no_null(array.len());
4507        }
4508    }
4509
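    // Recursively extracts validity.  For example (illustrative), a
    // FixedSizeList<Int32, 2> array with an outer null registers the list-level
    // validity via `add_fsl` and then recurses into the child values, so nulls at
    // both levels end up in the RepDefBuilder.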
4510    fn extract_validity(array: &dyn Array, repdef: &mut RepDefBuilder) {
4511        match array.data_type() {
4512            DataType::Null => {
4513                repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
4514            }
4515            DataType::Dictionary(_, _) => {
4516                unreachable!()
4517            }
4518            DataType::FixedSizeList(_, dimension) => {
4519                // Extract our validity buf and then any child validity bufs
4520                repdef.add_fsl(array.nulls().cloned(), *dimension as usize, array.len());
4521                let array = array.as_fixed_size_list();
4522                Self::extract_validity(array.values(), repdef);
4523            }
4524            _ => Self::extract_validity_buf(array, repdef),
4525        }
4526    }
4527}
4528
4529impl FieldEncoder for PrimitiveStructuralEncoder {
4530    // Buffers data, if there is enough to write a page then we create an encode task
4531    fn maybe_encode(
4532        &mut self,
4533        array: ArrayRef,
4534        _external_buffers: &mut OutOfLineBuffers,
4535        mut repdef: RepDefBuilder,
4536        row_number: u64,
4537        num_rows: u64,
4538    ) -> Result<Vec<EncodeTask>> {
4539        Self::extract_validity(array.as_ref(), &mut repdef);
4540        self.accumulated_repdefs.push(repdef);
4541
4542        if let Some((arrays, row_number, num_rows)) =
4543            self.accumulation_queue.insert(array, row_number, num_rows)
4544        {
4545            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4546            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4547        } else {
4548            Ok(vec![])
4549        }
4550    }
4551
4552    // If there is any data left in the buffer then create an encode task from it
4553    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
4554        if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
4555            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4556            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4557        } else {
4558            Ok(vec![])
4559        }
4560    }
4561
4562    fn num_columns(&self) -> u32 {
4563        1
4564    }
4565
4566    fn finish(
4567        &mut self,
4568        _external_buffers: &mut OutOfLineBuffers,
4569    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
4570        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
4571    }
4572}
4573
4574#[cfg(test)]
4575#[allow(clippy::single_range_in_vec_init)]
4576mod tests {
4577    use std::{collections::VecDeque, sync::Arc};
4578
4579    use arrow_array::{ArrayRef, Int8Array, StringArray};
4580
4581    use crate::encodings::logical::primitive::{
4582        ChunkDrainInstructions, PrimitiveStructuralEncoder,
4583    };
4584
4585    use super::{
4586        ChunkInstructions, DataBlock, DecodeMiniBlockTask, PreambleAction, RepetitionIndex,
4587    };
4588
4589    #[test]
4590    fn test_is_narrow() {
4591        let int8_array = Int8Array::from(vec![1, 2, 3]);
4592        let array_ref: ArrayRef = Arc::new(int8_array);
4593        let block = DataBlock::from_array(array_ref);
4594
4595        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
4596
4597        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
4598        let block = DataBlock::from_array(string_array);
4599        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
4600
4601        let string_array = StringArray::from(vec![
4602            Some("hello world".repeat(100)),
4603            Some("world".to_string()),
4604        ]);
4605        let block = DataBlock::from_array(string_array);
4606        assert!(!PrimitiveStructuralEncoder::is_narrow(&block));
4607    }
4608
4609    #[test]
4610    fn test_map_range() {
4611        // Null in the middle
4612        // [[A, B, C], [D, E], NULL, [F, G, H]]
4613        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
4614        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
4615        let max_visible_def = 0;
4616        let total_items = 8;
4617        let max_rep = 1;
4618
4619        let check = |range, expected_item_range, expected_level_range| {
4620            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4621                range,
4622                rep.as_ref(),
4623                def.as_ref(),
4624                max_rep,
4625                max_visible_def,
4626                total_items,
4627                PreambleAction::Absent,
4628            );
4629            assert_eq!(item_range, expected_item_range);
4630            assert_eq!(level_range, expected_level_range);
4631        };
4632
4633        check(0..1, 0..3, 0..3);
4634        check(1..2, 3..5, 3..5);
4635        check(2..3, 5..5, 5..6);
4636        check(3..4, 5..8, 6..9);
4637        check(0..2, 0..5, 0..5);
4638        check(1..3, 3..5, 3..6);
4639        check(2..4, 5..8, 5..9);
4640        check(0..3, 0..5, 0..6);
4641        check(1..4, 3..8, 3..9);
4642        check(0..4, 0..8, 0..9);
4643
4644        // Null at start
4645        // [NULL, [A, B], [C]]
4646        let rep = Some(vec![1, 1, 0, 1]);
4647        let def = Some(vec![1, 0, 0, 0]);
4648        let max_visible_def = 0;
4649        let total_items = 3;
4650
4651        let check = |range, expected_item_range, expected_level_range| {
4652            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4653                range,
4654                rep.as_ref(),
4655                def.as_ref(),
4656                max_rep,
4657                max_visible_def,
4658                total_items,
4659                PreambleAction::Absent,
4660            );
4661            assert_eq!(item_range, expected_item_range);
4662            assert_eq!(level_range, expected_level_range);
4663        };
4664
4665        check(0..1, 0..0, 0..1);
4666        check(1..2, 0..2, 1..3);
4667        check(2..3, 2..3, 3..4);
4668        check(0..2, 0..2, 0..3);
4669        check(1..3, 0..3, 1..4);
4670        check(0..3, 0..3, 0..4);
4671
4672        // Null at end
4673        // [[A], [B, C], NULL]
4674        let rep = Some(vec![1, 1, 0, 1]);
4675        let def = Some(vec![0, 0, 0, 1]);
4676        let max_visible_def = 0;
4677        let total_items = 3;
4678
4679        let check = |range, expected_item_range, expected_level_range| {
4680            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4681                range,
4682                rep.as_ref(),
4683                def.as_ref(),
4684                max_rep,
4685                max_visible_def,
4686                total_items,
4687                PreambleAction::Absent,
4688            );
4689            assert_eq!(item_range, expected_item_range);
4690            assert_eq!(level_range, expected_level_range);
4691        };
4692
4693        check(0..1, 0..1, 0..1);
4694        check(1..2, 1..3, 1..3);
4695        check(2..3, 3..3, 3..4);
4696        check(0..2, 0..3, 0..3);
4697        check(1..3, 1..3, 1..4);
4698        check(0..3, 0..3, 0..4);
4699
4700        // No nulls, with repetition
4701        // [[A, B], [C, D], [E, F]]
4702        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
4703        let def: Option<&[u16]> = None;
4704        let max_visible_def = 0;
4705        let total_items = 6;
4706
4707        let check = |range, expected_item_range, expected_level_range| {
4708            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4709                range,
4710                rep.as_ref(),
4711                def.as_ref(),
4712                max_rep,
4713                max_visible_def,
4714                total_items,
4715                PreambleAction::Absent,
4716            );
4717            assert_eq!(item_range, expected_item_range);
4718            assert_eq!(level_range, expected_level_range);
4719        };
4720
4721        check(0..1, 0..2, 0..2);
4722        check(1..2, 2..4, 2..4);
4723        check(2..3, 4..6, 4..6);
4724        check(0..2, 0..4, 0..4);
4725        check(1..3, 2..6, 2..6);
4726        check(0..3, 0..6, 0..6);
4727
4728        // No repetition, with nulls (this case is trivial)
4729        // [A, B, NULL, C]
4730        let rep: Option<&[u16]> = None;
4731        let def = Some(vec![0, 0, 1, 0]);
4732        let max_visible_def = 1;
4733        let total_items = 4;
4734
4735        let check = |range, expected_item_range, expected_level_range| {
4736            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4737                range,
4738                rep.as_ref(),
4739                def.as_ref(),
4740                max_rep,
4741                max_visible_def,
4742                total_items,
4743                PreambleAction::Absent,
4744            );
4745            assert_eq!(item_range, expected_item_range);
4746            assert_eq!(level_range, expected_level_range);
4747        };
4748
4749        check(0..1, 0..1, 0..1);
4750        check(1..2, 1..2, 1..2);
4751        check(2..3, 2..3, 2..3);
4752        check(0..2, 0..2, 0..2);
4753        check(1..3, 1..3, 1..3);
4754        check(0..3, 0..3, 0..3);
4755
4756        // Tricky case, this chunk is a continuation and starts with a repetition level of 0
4757        // [[..., A] [B, C], NULL]
4758        //
4759        // What we do will depend on the preamble action
4760        let rep = Some(vec![0, 1, 0, 1]);
4761        let def = Some(vec![0, 0, 0, 1]);
4762        let max_visible_def = 0;
4763        let total_items = 3;
4764
4765        let check = |range, expected_item_range, expected_level_range| {
4766            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4767                range,
4768                rep.as_ref(),
4769                def.as_ref(),
4770                max_rep,
4771                max_visible_def,
4772                total_items,
4773                PreambleAction::Take,
4774            );
4775            assert_eq!(item_range, expected_item_range);
4776            assert_eq!(level_range, expected_level_range);
4777        };
4778
4779        // If we are taking the preamble then the range must start at 0
4780        check(0..1, 0..3, 0..3);
4781        check(0..2, 0..3, 0..4);
4782
4783        let check = |range, expected_item_range, expected_level_range| {
4784            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4785                range,
4786                rep.as_ref(),
4787                def.as_ref(),
4788                max_rep,
4789                max_visible_def,
4790                total_items,
4791                PreambleAction::Skip,
4792            );
4793            assert_eq!(item_range, expected_item_range);
4794            assert_eq!(level_range, expected_level_range);
4795        };
4796
4797        check(0..1, 1..3, 1..3);
4798        check(1..2, 3..3, 3..4);
4799        check(0..2, 1..3, 1..4);
4800
4801        // Another preamble case but now it doesn't end with a new list
4802        // [[..., A], NULL, [D, E]]
4803        //
4804        // What we do will depend on the preamble action
4805        let rep = Some(vec![0, 1, 1, 0]);
4806        let def = Some(vec![0, 1, 0, 0]);
4807        let max_visible_def = 0;
4808        let total_items = 4;
4809
4810        let check = |range, expected_item_range, expected_level_range| {
4811            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4812                range,
4813                rep.as_ref(),
4814                def.as_ref(),
4815                max_rep,
4816                max_visible_def,
4817                total_items,
4818                PreambleAction::Take,
4819            );
4820            assert_eq!(item_range, expected_item_range);
4821            assert_eq!(level_range, expected_level_range);
4822        };
4823
4824        // If we are taking the preamble then the range must start at 0
4825        check(0..1, 0..1, 0..2);
4826        check(0..2, 0..3, 0..4);
4827
4828        let check = |range, expected_item_range, expected_level_range| {
4829            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4830                range,
4831                rep.as_ref(),
4832                def.as_ref(),
4833                max_rep,
4834                max_visible_def,
4835                total_items,
4836                PreambleAction::Skip,
4837            );
4838            assert_eq!(item_range, expected_item_range);
4839            assert_eq!(level_range, expected_level_range);
4840        };
4841
4842        // If we are skipping the preamble then it is not included in the mapped ranges
4843        check(0..1, 1..1, 1..2);
4844        check(1..2, 1..3, 2..4);
4845        check(0..2, 1..3, 1..4);
4846
4847        // Now a preamble case without any definition levels
4848        // [[..., A] [B, C], [D]]
4849        let rep = Some(vec![0, 1, 0, 1]);
4850        let def: Option<Vec<u16>> = None;
4851        let max_visible_def = 0;
4852        let total_items = 4;
4853
4854        let check = |range, expected_item_range, expected_level_range| {
4855            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4856                range,
4857                rep.as_ref(),
4858                def.as_ref(),
4859                max_rep,
4860                max_visible_def,
4861                total_items,
4862                PreambleAction::Take,
4863            );
4864            assert_eq!(item_range, expected_item_range);
4865            assert_eq!(level_range, expected_level_range);
4866        };
4867
4868        // If we are taking the preamble then the range must start at 0
4869        check(0..1, 0..3, 0..3);
4870        check(0..2, 0..4, 0..4);
4871
4872        let check = |range, expected_item_range, expected_level_range| {
4873            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4874                range,
4875                rep.as_ref(),
4876                def.as_ref(),
4877                max_rep,
4878                max_visible_def,
4879                total_items,
4880                PreambleAction::Skip,
4881            );
4882            assert_eq!(item_range, expected_item_range);
4883            assert_eq!(level_range, expected_level_range);
4884        };
4885
4886        check(0..1, 1..3, 1..3);
4887        check(1..2, 3..4, 3..4);
4888        check(0..2, 1..4, 1..4);
4889    }
4890
4891    #[test]
4892    fn test_schedule_instructions() {
4893        let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
4894        let repetition_index = RepetitionIndex::decode(&repetition_index);
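        // Each inner pair describes one chunk.  The first element is the number of
        // rows beginning in that chunk (5 + 3 + 4 + 2 = 14, matching the 0..14 user
        // range below); the second element relates to trailing partial values that
        // continue into the following chunk.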
4895
4896        let check = |user_ranges, expected_instructions| {
4897            let instructions =
4898                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
4899            assert_eq!(instructions, expected_instructions);
4900        };
4901
4902        // The instructions we expect if we're grabbing the whole range
4903        let expected_take_all = vec![
4904            ChunkInstructions {
4905                chunk_idx: 0,
4906                preamble: PreambleAction::Absent,
4907                rows_to_skip: 0,
4908                rows_to_take: 5,
4909                take_trailer: true,
4910            },
4911            ChunkInstructions {
4912                chunk_idx: 1,
4913                preamble: PreambleAction::Take,
4914                rows_to_skip: 0,
4915                rows_to_take: 2,
4916                take_trailer: false,
4917            },
4918            ChunkInstructions {
4919                chunk_idx: 2,
4920                preamble: PreambleAction::Absent,
4921                rows_to_skip: 0,
4922                rows_to_take: 4,
4923                take_trailer: true,
4924            },
4925            ChunkInstructions {
4926                chunk_idx: 3,
4927                preamble: PreambleAction::Take,
4928                rows_to_skip: 0,
4929                rows_to_take: 1,
4930                take_trailer: false,
4931            },
4932        ];
4933
4934        // Take all as 1 range
4935        check(&[0..14], expected_take_all.clone());
4936
4937        // Take all as individual rows
4938        check(
4939            &[
4940                0..1,
4941                1..2,
4942                2..3,
4943                3..4,
4944                4..5,
4945                5..6,
4946                6..7,
4947                7..8,
4948                8..9,
4949                9..10,
4950                10..11,
4951                11..12,
4952                12..13,
4953                13..14,
4954            ],
4955            expected_take_all,
4956        );
4957
4958        // Test some partial takes
4959
4960        // 2 rows in the same chunk but not contiguous
4961        check(
4962            &[0..1, 3..4],
4963            vec![
4964                ChunkInstructions {
4965                    chunk_idx: 0,
4966                    preamble: PreambleAction::Absent,
4967                    rows_to_skip: 0,
4968                    rows_to_take: 1,
4969                    take_trailer: false,
4970                },
4971                ChunkInstructions {
4972                    chunk_idx: 0,
4973                    preamble: PreambleAction::Absent,
4974                    rows_to_skip: 3,
4975                    rows_to_take: 1,
4976                    take_trailer: false,
4977                },
4978            ],
4979        );
4980
4981        // Taking just a trailer/preamble
4982        check(
4983            &[5..6],
4984            vec![
4985                ChunkInstructions {
4986                    chunk_idx: 0,
4987                    preamble: PreambleAction::Absent,
4988                    rows_to_skip: 5,
4989                    rows_to_take: 0,
4990                    take_trailer: true,
4991                },
4992                ChunkInstructions {
4993                    chunk_idx: 1,
4994                    preamble: PreambleAction::Take,
4995                    rows_to_skip: 0,
4996                    rows_to_take: 0,
4997                    take_trailer: false,
4998                },
4999            ],
5000        );
5001
5002        // Skipping an entire chunk
5003        check(
5004            &[7..10],
5005            vec![
5006                ChunkInstructions {
5007                    chunk_idx: 1,
5008                    preamble: PreambleAction::Skip,
5009                    rows_to_skip: 1,
5010                    rows_to_take: 1,
5011                    take_trailer: false,
5012                },
5013                ChunkInstructions {
5014                    chunk_idx: 2,
5015                    preamble: PreambleAction::Absent,
5016                    rows_to_skip: 0,
5017                    rows_to_take: 2,
5018                    take_trailer: false,
5019                },
5020            ],
5021        );
5022    }
5023
5024    #[test]
5025    fn test_drain_instructions() {
5026        fn drain_from_instructions(
5027            instructions: &mut VecDeque<ChunkInstructions>,
5028            mut rows_desired: u64,
5029            need_preamble: &mut bool,
5030            skip_in_chunk: &mut u64,
5031        ) -> Vec<ChunkDrainInstructions> {
5032            // Note: instructions.len() is an upper bound; we typically take far fewer
5033            let mut drain_instructions = Vec::with_capacity(instructions.len());
5034            while rows_desired > 0 || *need_preamble {
5035                let (next_instructions, consumed_chunk) = instructions
5036                    .front()
5037                    .unwrap()
5038                    .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
5039                if consumed_chunk {
5040                    instructions.pop_front();
5041                }
5042                drain_instructions.push(next_instructions);
5043            }
5044            drain_instructions
5045        }
5046
5047        let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
5048        let repetition_index = RepetitionIndex::decode(&repetition_index);
5049        let user_ranges = vec![1..7, 10..14];
5050
5051        // First, schedule the ranges
5052        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
5053
5054        let mut to_drain = VecDeque::from(scheduled.clone());
5055
5056        // Now we drain in batches of 4
5057
5058        let mut need_preamble = false;
5059        let mut skip_in_chunk = 0;
5060
5061        let next_batch =
5062            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
5063
5064        assert!(!need_preamble);
5065        assert_eq!(skip_in_chunk, 4);
5066        assert_eq!(
5067            next_batch,
5068            vec![ChunkDrainInstructions {
5069                chunk_instructions: scheduled[0].clone(),
5070                rows_to_take: 4,
5071                rows_to_skip: 0,
5072                preamble_action: PreambleAction::Absent,
5073            }]
5074        );
5075
5076        let next_batch =
5077            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
5078
5079        assert!(!need_preamble);
5080        assert_eq!(skip_in_chunk, 2);
5081
5082        assert_eq!(
5083            next_batch,
5084            vec![
5085                ChunkDrainInstructions {
5086                    chunk_instructions: scheduled[0].clone(),
5087                    rows_to_take: 1,
5088                    rows_to_skip: 4,
5089                    preamble_action: PreambleAction::Absent,
5090                },
5091                ChunkDrainInstructions {
5092                    chunk_instructions: scheduled[1].clone(),
5093                    rows_to_take: 1,
5094                    rows_to_skip: 0,
5095                    preamble_action: PreambleAction::Take,
5096                },
5097                ChunkDrainInstructions {
5098                    chunk_instructions: scheduled[2].clone(),
5099                    rows_to_take: 2,
5100                    rows_to_skip: 0,
5101                    preamble_action: PreambleAction::Absent,
5102                }
5103            ]
5104        );
5105
5106        let next_batch =
5107            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
5108
5109        assert!(!need_preamble);
5110        assert_eq!(skip_in_chunk, 0);
5111
5112        assert_eq!(
5113            next_batch,
5114            vec![
5115                ChunkDrainInstructions {
5116                    chunk_instructions: scheduled[2].clone(),
5117                    rows_to_take: 1,
5118                    rows_to_skip: 2,
5119                    preamble_action: PreambleAction::Absent,
5120                },
5121                ChunkDrainInstructions {
5122                    chunk_instructions: scheduled[3].clone(),
5123                    rows_to_take: 1,
5124                    rows_to_skip: 0,
5125                    preamble_action: PreambleAction::Take,
5126                },
5127            ]
5128        );
5129
5130        // Regression case.  Need a chunk with preamble, rows, and trailer (the middle chunk here)
5131        let repetition_index = vec![vec![5, 2], vec![3, 3], vec![20, 0]];
5132        let repetition_index = RepetitionIndex::decode(&repetition_index);
5133        let user_ranges = vec![0..28];
5134
5135        // First, schedule the ranges
5136        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
5137
5138        let mut to_drain = VecDeque::from(scheduled.clone());
5139
5140        // Drain first chunk and some of second chunk
5141
5142        let mut need_preamble = false;
5143        let mut skip_in_chunk = 0;
5144
5145        let next_batch =
5146            drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);
5147
5148        assert_eq!(
5149            next_batch,
5150            vec![
5151                ChunkDrainInstructions {
5152                    chunk_instructions: scheduled[0].clone(),
5153                    rows_to_take: 6,
5154                    rows_to_skip: 0,
5155                    preamble_action: PreambleAction::Absent,
5156                },
5157                ChunkDrainInstructions {
5158                    chunk_instructions: scheduled[1].clone(),
5159                    rows_to_take: 1,
5160                    rows_to_skip: 0,
5161                    preamble_action: PreambleAction::Take,
5162                },
5163            ]
5164        );
5165
5166        assert!(!need_preamble);
5167        assert_eq!(skip_in_chunk, 1);
5168
5169        // Now, the tricky part.  We drain the second chunk, including the trailer, and need to make sure
5170        // we get a drain task to take the preamble of the third chunk (and nothing else)
5171        let next_batch =
5172            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
5173
5174        assert_eq!(
5175            next_batch,
5176            vec![
5177                ChunkDrainInstructions {
5178                    chunk_instructions: scheduled[1].clone(),
5179                    rows_to_take: 2,
5180                    rows_to_skip: 1,
5181                    preamble_action: PreambleAction::Skip,
5182                },
5183                ChunkDrainInstructions {
5184                    chunk_instructions: scheduled[2].clone(),
5185                    rows_to_take: 0,
5186                    rows_to_skip: 0,
5187                    preamble_action: PreambleAction::Take,
5188                },
5189            ]
5190        );
5191
5192        assert!(!need_preamble);
5193        assert_eq!(skip_in_chunk, 0);
5194    }
5195}