lance_encoding/repdef.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities for rep-def levels
//!
//! Repetition and definition levels are a way to encode multiple validity / offsets arrays
//! into a single buffer.  They are a form of "zipping" buffers together that takes advantage
//! of the fact that, if the outermost array is invalid, then the validity of the inner items
//! is irrelevant.
//!
//! Note: the concept of repetition & definition levels comes from the Dremel paper and has
//! been implemented in Apache Parquet.  However, the implementation here is not necessarily
//! compatible with Parquet.  For example, we use 0 to represent the "inner-most" item and
//! Parquet uses 0 to represent the "outer-most" item.
//!
//! # Repetition Levels
//!
//! With repetition levels we convert a sparse array of offsets into a dense array of levels.
//! These levels are marked non-zero whenever a new list begins.  In other words, given the
//! list array with 3 rows [{<0,1>, <>, <2>}, {<3>}, {}], [], [{<4>}] we would have three
//! offsets arrays:
//!
//! Outer-most ([]): [0, 3, 3, 4]
//! Middle     ({}): [0, 3, 4, 4, 5]
//! Inner      (<>): [0, 2, 2, 3, 4, 5]
//! Values         : [0, 1, 2, 3, 4]
//!
//! We can convert these into repetition levels as follows:
//!
//! | Values | Repetition |
//! | ------ | ---------- |
//! |      0 |          3 | // Start of outer-most list
//! |      1 |          0 | // Continues inner-most list (no new lists)
//! |      - |          1 | // Start of new inner-most list (empty list)
//! |      2 |          1 | // Start of new inner-most list
//! |      3 |          2 | // Start of new middle list
//! |      - |          2 | // Start of new middle list (empty list)
//! |      - |          3 | // Start of new outer-most list (empty list)
//! |      4 |          3 | // Start of new outer-most list
//!
//! Note: We actually have MORE repetition levels than values.  This is because the repetition
//! levels need to be able to represent empty lists.
//!
//! # Definition Levels
//!
//! Definition levels are simpler.  We can think of them as zipping together various validity (from
//! different levels of nesting) into a single buffer.  For example, we could zip the arrays
//! [1, 1, 0, 0] and [1, 0, 1, 0] into [11, 10, 01, 00].  However, 00 and 01 are redundant.  If the
//! outer level is null then the validity of the inner levels is irrelevant.  To save space we instead
//! encode a "level" which is the "depth" of the null.  Let's look at a more complete example:
//!
//! Array: [{"middle": {"inner": 1}}, NULL, {"middle": NULL}, {"middle": {"inner": NULL}}]
//!
//! In Arrow we would have the following validity arrays:
//! Outer validity : 1, 0, 1, 1
//! Middle validity: 1, ?, 0, 1
//! Inner validity : 1, ?, ?, 0
//! Values         : 1, ?, ?, ?
//!
//! The ? values are undefined in the Arrow format.  We can convert these into definition levels as follows:
//!
//! | Values | Definition |
//! | ------ | ---------- |
//! |      1 |          0 | // Valid at all levels
//! |      - |          3 | // Null at outer level
//! |      - |          2 | // Null at middle level
//! |      - |          1 | // Null at inner level
//!
//! # Compression
//!
//! Note that we only need 2 bits of definition levels to represent 3 levels of nesting.  Definition
//! levels are always more compact than the input validity arrays.
//!
//! Repetition levels are more complex.  If there are very large lists then a sparse array of offsets
//! (which has one element per list) might be more compact than a dense array of repetition levels
//! (which has one element per list value, possibly even more if there are empty lists).
//!
//! However, both repetition levels and definition levels are typically very compressible with RLE.
//!
//! That said, in Lance we don't always take advantage of that compression because we want to be able
//! to zip rep-def levels together with our values.  This gives us fewer IOPS when accessing row values.
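//!
//! # Example
//!
//! A rough sketch of how the structural encoders in this crate drive [`RepDefBuilder`].  The
//! exact call sites live in the encoders themselves, so treat this as illustrative only
//! (`list_offsets`, `list_validity`, and `item_validity` are hypothetical inputs):
//!
//! ```ignore
//! // One builder per incoming batch; layers are registered outer-most first.
//! let mut builder = RepDefBuilder::default();
//! // A list layer contributes offsets plus optional validity...
//! builder.add_offsets(list_offsets, list_validity);
//! // ...and the leaf layer contributes item validity (or `add_no_null` if fully valid).
//! builder.add_validity_bitmap(item_validity);
//!
//! // Once enough batches have been collected for a page, the builders are serialized
//! // together into repetition / definition level buffers.
//! let repdefs = RepDefBuilder::serialize(vec![builder]);
//! ```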

use std::{
    iter::{Copied, Zip},
    sync::Arc,
};

use arrow_array::OffsetSizeTrait;
use arrow_buffer::{
    ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
};
use lance_core::{utils::bit::log_2_ceil, Error, Result};
use snafu::location;

use crate::buffer::LanceBuffer;

// We assume 16 bits is good enough for rep-def levels.  This gives us
// 65536 levels of struct nesting and list nesting.
pub type LevelBuffer = Vec<u16>;

/// Represents information that we extract from a list array as we are
/// encoding
#[derive(Clone, Debug)]
struct OffsetDesc {
    offsets: Arc<[i64]>,
    specials: Arc<[SpecialOffset]>,
    validity: Option<BooleanBuffer>,
    has_empty_lists: bool,
    num_values: usize,
}

/// Represents validity information that we extract from non-list arrays (that
/// have nulls) as we are encoding
#[derive(Clone, Debug)]
struct ValidityDesc {
    validity: Option<BooleanBuffer>,
    num_values: usize,
}

/// Represents validity information that we extract from FSL arrays.  This is
/// just validity (no offsets) but we also record the dimension of the FSL array
/// as that will impact the next layer
#[derive(Clone, Debug)]
struct FslDesc {
    validity: Option<BooleanBuffer>,
    dimension: usize,
    num_values: usize,
}

// As we build up rep/def from arrow arrays we record a
// series of RawRepDef objects.  Each one corresponds to a layer
// in the array structure
#[derive(Clone, Debug)]
enum RawRepDef {
    Offsets(OffsetDesc),
    Validity(ValidityDesc),
    Fsl(FslDesc),
}

impl RawRepDef {
    // Are there any nulls in this layer
    fn has_nulls(&self) -> bool {
        match self {
            Self::Offsets(OffsetDesc { validity, .. }) => validity.is_some(),
            Self::Validity(ValidityDesc { validity, .. }) => validity.is_some(),
            Self::Fsl(FslDesc { validity, .. }) => validity.is_some(),
        }
    }

    // How many values are in this layer
    fn num_values(&self) -> usize {
        match self {
            Self::Offsets(OffsetDesc { num_values, .. }) => *num_values,
            Self::Validity(ValidityDesc { num_values, .. }) => *num_values,
            Self::Fsl(FslDesc { num_values, .. }) => *num_values,
        }
    }
}

/// Represents repetition and definition levels that have been
/// serialized into a pair of (optional) level buffers
#[derive(Debug)]
pub struct SerializedRepDefs {
    /// The repetition levels, one per item
    ///
    /// If None, there are no lists
    pub repetition_levels: Option<Arc<[u16]>>,
    /// The definition levels, one per item
    ///
    /// If None, there are no nulls
    pub definition_levels: Option<Arc<[u16]>>,
    /// Special records indicate empty / null lists
    ///
    /// These do not have any mapping to items.  There may be none, there may be
    /// more special records than items, or anywhere in between.
    pub special_records: Vec<SpecialRecord>,
    /// The meaning of each definition level
    pub def_meaning: Vec<DefinitionInterpretation>,
    /// The maximum level that is "visible" from the lowest level
    ///
    /// This is the last level before we encounter a list level of some kind.  Once we've
    /// hit a list level then nulls in any level beyond do not map to actual items.
    ///
    /// This is None if there are no lists
    pub max_visible_level: Option<u16>,
}

impl SerializedRepDefs {
    pub fn new(
        repetition_levels: Option<LevelBuffer>,
        definition_levels: Option<LevelBuffer>,
        special_records: Vec<SpecialRecord>,
        def_meaning: Vec<DefinitionInterpretation>,
    ) -> Self {
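        // For example (hypothetical), if `def_meaning` is [NullableItem, NullableAndEmptyableList]
        // then the first list layer is at index 1 and the max visible level is the sum of the def
        // levels before it: 1.  Def level 1 ("null item") still maps to an item, while the list
        // levels 2 and 3 do not.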
        let first_list = def_meaning.iter().position(|level| level.is_list());
        let max_visible_level = first_list.map(|first_list| {
            def_meaning
                .iter()
                .map(|level| level.num_def_levels())
                .take(first_list)
                .sum::<u16>()
        });
        Self {
            repetition_levels: repetition_levels.map(Arc::from),
            definition_levels: definition_levels.map(Arc::from),
            special_records,
            def_meaning,
            max_visible_level,
        }
    }

    /// Creates an empty SerializedRepDefs (no repetition, all valid)
    pub fn empty(def_meaning: Vec<DefinitionInterpretation>) -> Self {
        Self {
            repetition_levels: None,
            definition_levels: None,
            special_records: Vec::new(),
            def_meaning,
            max_visible_level: None,
        }
    }

    pub fn rep_slicer(&self) -> Option<RepDefSlicer> {
        self.repetition_levels
            .as_ref()
            .map(|rep| RepDefSlicer::new(self, rep.clone()))
    }

    pub fn def_slicer(&self) -> Option<RepDefSlicer> {
        self.definition_levels
            .as_ref()
            .map(|def| RepDefSlicer::new(self, def.clone()))
    }

    /// Creates a version of the SerializedRepDefs with the specials collapsed into
    /// the repetition and definition levels
    pub fn collapse_specials(self) -> Self {
        if self.special_records.is_empty() {
            return self;
        }

        // If we have specials then we must have repetition
        let rep = self.repetition_levels.unwrap();

        let new_len = rep.len() + self.special_records.len();

        let mut new_rep = Vec::with_capacity(new_len);
        let mut new_def = Vec::with_capacity(new_len);

        // Now we just merge the rep/def levels and the specials into one list.  There is just
        // one tricky part.  If a non-special is added after a special item then it swaps its
        // repetition level with the special item.
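        // For example (reusing the example from `SpecialRecord::pos`): items
        // [I0(rep=2), I1(rep=1), I2(rep=2), I3(rep=0)] and special S0(pos=2, rep=1) merge into
        // [I0(rep=2), I1(rep=1), S0(rep=2), I2(rep=1), I3(rep=0)].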
        if let Some(def) = self.definition_levels {
            let mut def_itr = def.iter();
            let mut rep_itr = rep.iter();
            let mut special_itr = self.special_records.into_iter().peekable();
            let mut last_special = None;

            for idx in 0..new_len {
                if let Some(special) = special_itr.peek() {
                    if special.pos == idx {
                        new_rep.push(special.rep_level);
                        new_def.push(special.def_level);
                        special_itr.next();
                        last_special = Some(new_rep.last_mut().unwrap());
                    } else {
                        let rep = if let Some(last_special) = last_special {
                            let rep = *last_special;
                            *last_special = *rep_itr.next().unwrap();
                            rep
                        } else {
                            *rep_itr.next().unwrap()
                        };
                        new_rep.push(rep);
                        new_def.push(*def_itr.next().unwrap());
                        last_special = None;
                    }
                } else {
                    let rep = if let Some(last_special) = last_special {
                        let rep = *last_special;
                        *last_special = *rep_itr.next().unwrap();
                        rep
                    } else {
                        *rep_itr.next().unwrap()
                    };
                    new_rep.push(rep);
                    new_def.push(*def_itr.next().unwrap());
                    last_special = None;
                }
            }
        } else {
            let mut rep_itr = rep.iter();
            let mut special_itr = self.special_records.into_iter().peekable();
            let mut last_special = None;

            for idx in 0..new_len {
                if let Some(special) = special_itr.peek() {
                    if special.pos == idx {
                        new_rep.push(special.rep_level);
                        new_def.push(special.def_level);
                        special_itr.next();
                        last_special = Some(new_rep.last_mut().unwrap());
                    } else {
                        let rep = if let Some(last_special) = last_special {
                            let rep = *last_special;
                            *last_special = *rep_itr.next().unwrap();
                            rep
                        } else {
                            *rep_itr.next().unwrap()
                        };
                        new_rep.push(rep);
                        new_def.push(0);
                        last_special = None;
                    }
                } else {
                    let rep = if let Some(last_special) = last_special {
                        let rep = *last_special;
                        *last_special = *rep_itr.next().unwrap();
                        rep
                    } else {
                        *rep_itr.next().unwrap()
                    };
                    new_rep.push(rep);
                    new_def.push(0);
                    last_special = None;
                }
            }
        }

        Self {
            repetition_levels: Some(new_rep.into()),
            definition_levels: Some(new_def.into()),
            special_records: Vec::new(),
            def_meaning: self.def_meaning,
            max_visible_level: self.max_visible_level,
        }
    }
}

/// Slices a level buffer into pieces
///
/// This is needed to handle the fact that a level buffer may have more
/// levels than values due to special (empty/null) lists.
///
/// As a result, a call to `slice_next(10)` may return 10 levels or it may
/// return more than 10 levels if any special values are encountered.
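///
/// A rough sketch of the intended call pattern (illustrative only; `batch_size` is a
/// hypothetical chunk size chosen by the caller):
///
/// ```ignore
/// let mut slicer = repdefs.rep_slicer().unwrap();
/// let first_chunk = slicer.slice_next(batch_size);
/// // ...slice the middle chunks...
/// let last_chunk = slicer.slice_rest();
/// ```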
#[derive(Debug)]
pub struct RepDefSlicer<'a> {
    repdef: &'a SerializedRepDefs,
    to_slice: LanceBuffer,
    current: usize,
}

// TODO: All of this logic will need some changing when we compress rep/def levels.
impl<'a> RepDefSlicer<'a> {
    fn new(repdef: &'a SerializedRepDefs, levels: Arc<[u16]>) -> Self {
        Self {
            repdef,
            to_slice: LanceBuffer::reinterpret_slice(levels),
            current: 0,
        }
    }

    pub fn num_levels(&self) -> usize {
        self.to_slice.len() / 2
    }

    pub fn num_levels_remaining(&self) -> usize {
        self.num_levels() - self.current
    }

    pub fn all_levels(&self) -> &LanceBuffer {
        &self.to_slice
    }

    /// Returns the rest of the levels not yet sliced
    ///
    /// This must be called instead of `slice_next` on the final iteration.
    /// This is because anytime we slice there may be empty/null lists on the
    /// boundary that are "free" and the current behavior in `slice_next` is to
    /// leave them for the next call.
    ///
    /// `slice_rest` will slice all remaining levels and return them.
    pub fn slice_rest(&mut self) -> LanceBuffer {
        let start = self.current;
        let remaining = self.num_levels_remaining();
        self.current = self.num_levels();
        self.to_slice.slice_with_length(start * 2, remaining * 2)
    }

    /// Returns enough levels to satisfy the next `num_values` values
    pub fn slice_next(&mut self, num_values: usize) -> LanceBuffer {
        let start = self.current;
        let Some(max_visible_level) = self.repdef.max_visible_level else {
            // No lists, should be 1:1 mapping from levels to values
            self.current = start + num_values;
            return self.to_slice.slice_with_length(start * 2, num_values * 2);
        };
        if let Some(def) = self.repdef.definition_levels.as_ref() {
            // There are lists and there are def levels.  That means there may be
            // more rep/def levels than values.  We need to scan the def levels to figure
            // out which items are "invisible" and skip over them
            let mut def_itr = def[start..].iter();
            let mut num_taken = 0;
            let mut num_passed = 0;
            while num_taken < num_values {
                let def_level = *def_itr.next().unwrap();
                if def_level <= max_visible_level {
                    num_taken += 1;
                }
                num_passed += 1;
            }
            self.current = start + num_passed;
            self.to_slice.slice_with_length(start * 2, num_passed * 2)
        } else {
            // No def levels, should be 1:1 mapping from levels to values
            self.current = start + num_values;
            self.to_slice.slice_with_length(start * 2, num_values * 2)
        }
    }
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SpecialRecord {
    /// The position of the special record in the items array
    ///
    /// Note that this is the position in the "expanded" items array (including the specials)
    ///
    /// For example, if we have five items [I0, I1, ..., I4] and two specials [S0(pos=3), S1(pos=6)] then
    /// the combined array is [I0, I1, I2, S0, I3, I4, S1].
    ///
    /// Another tricky fact is that a special "swaps" the repetition level of the matching item when it is
    /// being inserted into the combined list.  So, if items are [I0(rep=2), I1(rep=1), I2(rep=2), I3(rep=0)]
    /// and a special is S0(pos=2, rep=1) then the combined list is
    /// [I0(rep=2), I1(rep=1), S0(rep=2), I2(rep=1), I3(rep=0)].
    ///
    /// Or, to put it into practice, we start with [[I0], [I1]], [[I2, I3]] and after inserting our special
    /// we have [[I0], [I1]], [S0, [I2, I3]]
    pos: usize,
    /// The definition level of the special record.  This is never 0 and is used to distinguish between an
    /// empty list and a null list.
    def_level: u16,
    /// The repetition level of the special record.  This is never 0 and is used to indicate which level of
    /// nesting the special record is at.
    rep_level: u16,
}

/// This tells us how an array handles definition.  Given a stack of
/// these and a nested array and a set of definition levels we can calculate
/// how we should interpret the definition levels.
///
/// For example, if the interpretation is [AllValidItem, NullableItem] then
/// a 0 means "valid item" and a 1 means "null struct".  If the interpretation
/// is [NullableItem, NullableItem] then a 0 means "valid item" and a 1 means
/// "null item" and a 2 means "null struct".
///
/// Lists are tricky because we might use up to two definition levels for a
/// single layer of list nesting because we need one value to indicate "empty list"
/// and another value to indicate "null list".
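///
/// For example (illustrative): the stack [NullableItem, NullableAndEmptyableList] (inner-most
/// layer first) uses three definition levels: 1 = null item, 2 = null list, 3 = empty list,
/// while 0 always means "valid at all levels".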
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DefinitionInterpretation {
    AllValidItem,
    AllValidList,
    NullableItem,
    NullableList,
    EmptyableList,
    NullableAndEmptyableList,
}

impl DefinitionInterpretation {
    /// How many definition levels do we need for this layer
    pub fn num_def_levels(&self) -> u16 {
        match self {
            Self::AllValidItem => 0,
            Self::AllValidList => 0,
            Self::NullableItem => 1,
            Self::NullableList => 1,
            Self::EmptyableList => 1,
            Self::NullableAndEmptyableList => 2,
        }
    }

    /// Returns true if this layer cannot contain nulls
    pub fn is_all_valid(&self) -> bool {
        matches!(
            self,
            Self::AllValidItem | Self::AllValidList | Self::EmptyableList
        )
    }

    /// Does this layer represent a list?
    pub fn is_list(&self) -> bool {
        matches!(
            self,
            Self::AllValidList
                | Self::NullableList
                | Self::EmptyableList
                | Self::NullableAndEmptyableList
        )
    }
}

/// The RepDefBuilder is used to collect offsets & validity buffers
/// from arrow structures.  Once we have those we use the SerializerContext
/// to build the actual repetition and definition levels by walking through
/// the arrow constructs in reverse order.
///
/// The algorithm for definition levels is as follows:
///
/// Given:
///  - a validity buffer of [T, F, F, T, T]
///  - a current def level of 5
///  - a current definitions of [0, 1, 3, 3, 0]
///
/// We walk through the definitions and replace them with
///   the current level whenever a value is invalid.  Thus
///   our output is: [0, 5, 5, 3, 0]
///
/// The algorithm for repetition levels is more complex.
///
/// The first time we see an offsets buffer we initialize the
/// rep levels to have a value of 1 whenever a list starts and 0
/// otherwise.
///
/// So, given offsets of [0, 3, 5] and no repetition we create
/// rep levels [1 0 0 1 0]
///
/// However, we also record the offsets into our current rep and
/// def levels and all operations happen in context of those offsets.
///
/// For example, continuing the above scenario we might then see validity
/// of [T, F].  This is strange since our validity bitmap has 2 items but
/// we would have 5 definition levels.  We can use our current offsets
/// ([0, 3, 5]) to expand [T, F] into [T, T, T, F, F].
struct SerializerContext {
    last_offsets: Option<Vec<usize>>,
    last_offsets_full: Option<Vec<usize>>,
    specials: Vec<SpecialRecord>,
    def_meaning: Vec<DefinitionInterpretation>,
    rep_levels: LevelBuffer,
    def_levels: LevelBuffer,
    current_rep: u16,
    current_def: u16,
    // FSL layers multiply the preceding def / rep levels by the dimension
    current_multiplier: usize,
    has_nulls: bool,
}

impl SerializerContext {
    fn new(len: usize, has_nulls: bool, has_offsets: bool, num_layers: usize) -> Self {
        let def_meaning = Vec::with_capacity(num_layers);
        Self {
            last_offsets: None,
            last_offsets_full: None,
            rep_levels: if has_offsets {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            def_levels: if has_nulls {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            def_meaning,
            current_rep: 1,
            current_def: 1,
            current_multiplier: 1,
            has_nulls: false,
            specials: Vec::default(),
        }
    }

    fn checkout_def(&mut self, meaning: DefinitionInterpretation) -> u16 {
        let def = self.current_def;
        self.current_def += meaning.num_def_levels();
        self.def_meaning.push(meaning);
        def
    }

    fn record_offsets(&mut self, offset_desc: &OffsetDesc) {
        if self.current_multiplier != 1 {
            // If we need this it isn't too terrible.  We just need to multiply all of the offsets in offset_desc by
            // the current multiplier before we do anything with them.  Not adding at the moment simply to avoid the
            // burden of testing
            todo!("List<...FSL<...>> not yet supported");
        }
        let rep_level = self.current_rep;
        let (null_list_level, empty_list_level) =
            match (offset_desc.validity.is_some(), offset_desc.has_empty_lists) {
                (true, true) => {
                    let level =
                        self.checkout_def(DefinitionInterpretation::NullableAndEmptyableList);
                    (level, level + 1)
                }
                (true, false) => (self.checkout_def(DefinitionInterpretation::NullableList), 0),
                (false, true) => (
                    0,
                    self.checkout_def(DefinitionInterpretation::EmptyableList),
                ),
                (false, false) => {
                    self.checkout_def(DefinitionInterpretation::AllValidList);
                    (0, 0)
                }
            };
        self.current_rep += 1;
        if let Some(last_offsets) = &self.last_offsets {
            let last_offsets_full = self.last_offsets_full.as_ref().unwrap();
            let mut new_last_off = Vec::with_capacity(offset_desc.offsets.len());
            let mut new_last_off_full = Vec::with_capacity(offset_desc.offsets.len());
            let mut empties_seen = 0;
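            // Each window is one list in this layer.  `last_offsets[off[0]]` maps the list's
            // starting position (an index into the previous layer's lists) to an item index,
            // which is where this layer's rep level gets stamped.  Empty lists have no item to
            // stamp; `empties_seen` keeps `last_offsets_full` aligned with the expanded stream
            // (items plus specials) used by the specials merge below.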
            for off in offset_desc.offsets.windows(2) {
                let offset_ctx = last_offsets[off[0] as usize];
                new_last_off.push(offset_ctx);
                new_last_off_full.push(last_offsets_full[off[0] as usize] + empties_seen);
                if off[0] == off[1] {
                    empties_seen += 1;
                } else {
                    self.rep_levels[offset_ctx] = rep_level;
                }
            }
            self.last_offsets = Some(new_last_off);
            self.last_offsets_full = Some(new_last_off_full);
        } else {
            let mut new_last_off = Vec::with_capacity(offset_desc.offsets.len());
            let mut new_last_off_full = Vec::with_capacity(offset_desc.offsets.len());
            let mut empties_seen = 0;
            for off in offset_desc.offsets.windows(2) {
                new_last_off.push(off[0] as usize);
                new_last_off_full.push(off[0] as usize + empties_seen);
                if off[0] == off[1] {
                    empties_seen += 1;
                } else {
                    self.rep_levels[off[0] as usize] = rep_level;
                }
            }
            self.last_offsets = Some(new_last_off);
            self.last_offsets_full = Some(new_last_off_full);
        }

        // Must update specials _after_ setting last_offsets_full
        let last_offsets_full = self.last_offsets_full.as_ref().unwrap();
        let num_combined_specials = self.specials.len() + offset_desc.specials.len();
        let mut new_specials = Vec::with_capacity(num_combined_specials);
        let mut new_inserted = 0;
        let mut old_specials_itr = self.specials.iter().peekable();
        let mut specials_itr = offset_desc.specials.iter().peekable();
        for _ in 0..num_combined_specials {
            if let Some(old_special) = old_specials_itr.peek() {
                let old_special_pos = old_special.pos + new_inserted;
                if let Some(new_special) = specials_itr.peek() {
                    let new_special_pos = last_offsets_full[new_special.pos()];
                    if old_special_pos < new_special_pos {
                        let mut old_special = *old_specials_itr.next().unwrap();
                        old_special.pos = old_special_pos;
                        new_specials.push(old_special);
                    } else {
                        let new_special = specials_itr.next().unwrap();
                        new_specials.push(SpecialRecord {
                            pos: new_special_pos,
                            def_level: if matches!(new_special, SpecialOffset::EmptyList(_)) {
                                empty_list_level
                            } else {
                                null_list_level
                            },
                            rep_level,
                        });
                        new_inserted += 1;
                    }
                } else {
                    let mut old_special = *old_specials_itr.next().unwrap();
                    old_special.pos = old_special_pos;
                    new_specials.push(old_special);
                }
            } else {
                let new_special = specials_itr.next().unwrap();
                new_specials.push(SpecialRecord {
                    pos: last_offsets_full[new_special.pos()],
                    def_level: if matches!(new_special, SpecialOffset::EmptyList(_)) {
                        empty_list_level
                    } else {
                        null_list_level
                    },
                    rep_level,
                });
                new_inserted += 1;
            }
        }
        self.specials = new_specials;
    }

    fn do_record_validity(&mut self, validity: &BooleanBuffer, null_level: u16) {
        self.has_nulls = true;
        assert!(!self.def_levels.is_empty());
        if let Some(last_offsets) = &self.last_offsets {
            last_offsets
                .windows(2)
                .zip(validity.iter())
                .for_each(|(w, valid)| {
                    let start = w[0] * self.current_multiplier;
                    let end = w[1] * self.current_multiplier;
                    if !valid {
                        self.def_levels[start..end].fill(null_level);
                    }
                });
        } else if self.current_multiplier == 1 {
            self.def_levels
                .iter_mut()
                .zip(validity.iter())
                .for_each(|(def, valid)| {
                    if !valid {
                        *def = null_level;
                    }
                });
        } else {
            self.def_levels
                .iter_mut()
                .zip(
                    validity
                        .iter()
                        .flat_map(|v| std::iter::repeat(v).take(self.current_multiplier)),
                )
                .for_each(|(def, valid)| {
                    if !valid {
                        *def = null_level;
                    }
                });
        }
    }

    fn record_validity_buf(&mut self, validity: &Option<BooleanBuffer>) {
        if let Some(validity) = validity {
            let def_level = self.checkout_def(DefinitionInterpretation::NullableItem);
            self.do_record_validity(validity, def_level);
        } else {
            self.checkout_def(DefinitionInterpretation::AllValidItem);
        }
    }

    fn record_validity(&mut self, validity_desc: &ValidityDesc) {
        self.record_validity_buf(&validity_desc.validity)
    }

    fn record_fsl(&mut self, fsl_desc: &FslDesc) {
        self.current_multiplier *= fsl_desc.dimension;
        self.record_validity_buf(&fsl_desc.validity);
    }

    fn build(self) -> SerializedRepDefs {
        let definition_levels = if self.has_nulls {
            Some(self.def_levels)
        } else {
            None
        };
        let repetition_levels = if self.current_rep > 1 {
            Some(self.rep_levels)
        } else {
            None
        };
        SerializedRepDefs::new(
            repetition_levels,
            definition_levels,
            self.specials,
            self.def_meaning,
        )
    }
}

/// As we are encoding we record information about "specials" which are
/// empty lists or null lists.
#[derive(Debug, Copy, Clone)]
enum SpecialOffset {
    NullList(usize),
    EmptyList(usize),
}

impl SpecialOffset {
    fn pos(&self) -> usize {
        match self {
            Self::NullList(pos) => *pos,
            Self::EmptyList(pos) => *pos,
        }
    }
}

/// A structure used to collect validity buffers and offsets from arrow
/// arrays and eventually create repetition and definition levels
///
/// As we are encoding the structural encoders are given this struct and
/// will record the arrow information into it.  Once we hit a leaf node we
/// serialize the data into rep/def levels and write these into the page.
#[derive(Clone, Default, Debug)]
pub struct RepDefBuilder {
    // The rep/def info we have collected so far
    repdefs: Vec<RawRepDef>,
    // The current length, can get larger as we traverse lists (e.g. an
    // array might have 5 lists which results in 50 items)
    //
    // Starts uninitialized until we see the first rep/def item
    len: Option<usize>,
}

impl RepDefBuilder {
    fn check_validity_len(&mut self, incoming_len: usize) {
        if let Some(len) = self.len {
            assert_eq!(incoming_len, len);
        }
        self.len = Some(incoming_len);
    }

    fn num_layers(&self) -> usize {
        self.repdefs.len()
    }

    /// The builder is "empty" if there is no repetition and no nulls.  In this case we don't need
    /// to store anything to disk (except the description)
    fn is_empty(&self) -> bool {
        self.repdefs
            .iter()
            .all(|r| matches!(r, RawRepDef::Validity(ValidityDesc { validity: None, .. })))
    }

    /// Returns true if there is only a single layer of definition
    pub fn is_simple_validity(&self) -> bool {
        self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
    }

    /// Return True if any layer has a validity bitmap
    ///
    /// Return False if all layers are non-null (the def levels can
    /// be skipped in this case)
    pub fn has_nulls(&self) -> bool {
        self.repdefs.iter().any(|rd| {
            matches!(
                rd,
                RawRepDef::Validity(ValidityDesc {
                    validity: Some(_),
                    ..
                }) | RawRepDef::Fsl(FslDesc {
                    validity: Some(_),
                    ..
                })
            )
        })
    }

    pub fn has_offsets(&self) -> bool {
        self.repdefs
            .iter()
            .any(|rd| matches!(rd, RawRepDef::Offsets(OffsetDesc { .. })))
    }

    /// Registers a nullable validity bitmap
    pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
        self.check_validity_len(validity.len());
        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
            num_values: validity.len(),
            validity: Some(validity.into_inner()),
        }));
    }

    /// Registers an all-valid validity layer
    pub fn add_no_null(&mut self, len: usize) {
        self.check_validity_len(len);
        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
            validity: None,
            num_values: len,
        }));
    }

    pub fn add_fsl(&mut self, validity: Option<NullBuffer>, dimension: usize, num_values: usize) {
        if let Some(len) = self.len {
            assert_eq!(num_values, len);
        }
        self.len = Some(num_values * dimension);
        debug_assert!(validity.is_none() || validity.as_ref().unwrap().len() == num_values);
        self.repdefs.push(RawRepDef::Fsl(FslDesc {
            num_values,
            validity: validity.map(|v| v.into_inner()),
            dimension,
        }))
    }

    fn check_offset_len(&mut self, offsets: &[i64]) {
        if let Some(len) = self.len {
            assert!(offsets.len() == len + 1);
        }
        self.len = Some(offsets[offsets.len() - 1] as usize);
    }

    /// Adds a layer of offsets
    ///
    /// Offsets are cast to a common type (i64) and also normalized.  Null lists are
    /// always represented by a zero-length (identical) pair of offsets and so the caller
    /// should filter out any garbage items before encoding them.  To assist with this the
    /// method will return true if any non-empty null lists were found.
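    ///
    /// For example (an illustrative trace, not from a test): offsets [0, 3, 7, 7] with
    /// validity [true, false, true] normalize to [0, 3, 3, 3].  The second (null) list had
    /// garbage values, so a `NullList` special is recorded and the method returns true,
    /// while the third (empty) list records an `EmptyList` special.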
    pub fn add_offsets<O: OffsetSizeTrait>(
        &mut self,
        offsets: OffsetBuffer<O>,
        validity: Option<NullBuffer>,
    ) -> bool {
        let mut has_garbage_values = false;
        if O::IS_LARGE {
            let inner = offsets.into_inner();
            let len = inner.len();
            let i64_buff = ScalarBuffer::<i64>::new(inner.into_inner(), 0, len);
            let mut normalized = Vec::with_capacity(len);
            normalized.push(0_i64);
            let mut specials = Vec::new();
            let mut has_empty_lists = false;
            let mut last_off = 0;
            if let Some(validity) = validity.as_ref() {
                for (idx, (off, valid)) in i64_buff.windows(2).zip(validity.iter()).enumerate() {
                    let len: i64 = off[1] - off[0];
                    match (valid, len == 0) {
                        (false, is_empty) => {
                            specials.push(SpecialOffset::NullList(idx));
                            has_garbage_values |= !is_empty;
                        }
                        (true, true) => {
                            has_empty_lists = true;
                            specials.push(SpecialOffset::EmptyList(idx));
                        }
                        _ => {
                            last_off += len;
                        }
                    }
                    normalized.push(last_off);
                }
            } else {
                for (idx, off) in i64_buff.windows(2).enumerate() {
                    let len: i64 = off[1] - off[0];
                    if len == 0 {
                        has_empty_lists = true;
                        specials.push(SpecialOffset::EmptyList(idx));
                    }
                    last_off += len;
                    normalized.push(last_off);
                }
            };
            self.check_offset_len(&normalized);
            self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
                num_values: normalized.len() - 1,
                offsets: normalized.into(),
                validity: validity.map(|v| v.into_inner()),
                has_empty_lists,
                specials: specials.into(),
            }));
            has_garbage_values
        } else {
            let inner = offsets.into_inner();
            let len = inner.len();
            let scalar_off = ScalarBuffer::<i32>::new(inner.into_inner(), 0, len);
            let mut casted = Vec::with_capacity(len);
            casted.push(0);
            let mut has_empty_lists = false;
            let mut specials = Vec::new();
            let mut last_off: i64 = 0;
            if let Some(validity) = validity.as_ref() {
                for (idx, (off, valid)) in scalar_off.windows(2).zip(validity.iter()).enumerate() {
                    let len = (off[1] - off[0]) as i64;
                    match (valid, len == 0) {
                        (false, is_empty) => {
                            specials.push(SpecialOffset::NullList(idx));
                            has_garbage_values |= !is_empty;
                        }
                        (true, true) => {
                            has_empty_lists = true;
                            specials.push(SpecialOffset::EmptyList(idx));
                        }
                        _ => {
                            last_off += len;
                        }
                    }
                    casted.push(last_off);
                }
            } else {
                for (idx, off) in scalar_off.windows(2).enumerate() {
                    let len = (off[1] - off[0]) as i64;
                    if len == 0 {
                        has_empty_lists = true;
                        specials.push(SpecialOffset::EmptyList(idx));
                    }
                    last_off += len;
                    casted.push(last_off);
                }
            };
            self.check_offset_len(&casted);
            self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
                num_values: casted.len() - 1,
                offsets: casted.into(),
                validity: validity.map(|v| v.into_inner()),
                has_empty_lists,
                specials: specials.into(),
            }));
            has_garbage_values
        }
    }

    // When we are encoding data it arrives in batches.  For each batch we create a RepDefBuilder and collect the
    // various validity buffers and offset buffers from that batch.  Once we have enough batches to write a page we
    // need to take this collection of RepDefBuilders and concatenate them and then serialize them into rep/def levels.
    //
    // TODO: In the future, we may concatenate and serialize at the same time?
    //
    // This method takes care of the concatenation part.  First we collect all of layer 0 from each builder, then we
    // call this method.  Then we collect all of layer 1 from each builder and call this method.  And so on.
    //
    // That means this method should get a collection of `RawRepDef` where each item is the same kind (all validity or
    // all offsets) though the nullability / lengths may be different in each layer.
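    //
    // For example (hypothetical): concatenating two offset layers [0, 2, 5] and [0, 1] produces
    // [0, 2, 5, 6] (the second layer's offsets are shifted by the previous total of 5) and any
    // specials in the second layer have their list positions shifted by the number of lists
    // already collected (2).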
    fn concat_layers<'a>(
        layers: impl Iterator<Item = &'a RawRepDef>,
        num_layers: usize,
    ) -> RawRepDef {
        enum LayerKind {
            Validity,
            Fsl,
            Offsets,
        }

        // We make two passes through the layers.  The first determines if we need to pay the cost of allocating
        // buffers.  The second pass actually adds the values.
        let mut collected = Vec::with_capacity(num_layers);
        let mut has_nulls = false;
        let mut layer_kind = LayerKind::Validity;
        let mut num_specials = 0;
        let mut all_dimension = 0;
        let mut all_has_empty_lists = false;
        let mut all_num_values = 0;
        for layer in layers {
            has_nulls |= layer.has_nulls();
            match layer {
                RawRepDef::Validity(_) => {
                    layer_kind = LayerKind::Validity;
                }
                RawRepDef::Offsets(OffsetDesc {
                    specials,
                    has_empty_lists,
                    ..
                }) => {
                    all_has_empty_lists |= *has_empty_lists;
                    layer_kind = LayerKind::Offsets;
                    num_specials += specials.len();
                }
                RawRepDef::Fsl(FslDesc { dimension, .. }) => {
                    layer_kind = LayerKind::Fsl;
                    all_dimension = *dimension;
                }
            }
            collected.push(layer);
            all_num_values += layer.num_values();
        }

        // Shortcut if there are no nulls
        if !has_nulls {
            match layer_kind {
                LayerKind::Validity => {
                    return RawRepDef::Validity(ValidityDesc {
                        validity: None,
                        num_values: all_num_values,
                    });
                }
                LayerKind::Fsl => {
                    return RawRepDef::Fsl(FslDesc {
                        validity: None,
                        num_values: all_num_values,
                        dimension: all_dimension,
                    })
                }
                LayerKind::Offsets => {}
            }
        }

        // Only allocate if needed
        let mut validity_builder = if has_nulls {
            BooleanBufferBuilder::new(all_num_values)
        } else {
            BooleanBufferBuilder::new(0)
        };
        let mut all_offsets = if matches!(layer_kind, LayerKind::Offsets) {
            let mut all_offsets = Vec::with_capacity(all_num_values);
            all_offsets.push(0);
            all_offsets
        } else {
            Vec::new()
        };
        let mut all_specials = Vec::with_capacity(num_specials);

        for layer in collected {
            match layer {
                RawRepDef::Validity(ValidityDesc {
                    validity: Some(validity),
                    ..
                }) => {
                    validity_builder.append_buffer(validity);
                }
                RawRepDef::Validity(ValidityDesc {
                    validity: None,
                    num_values,
                }) => {
                    validity_builder.append_n(*num_values, true);
                }
                RawRepDef::Fsl(FslDesc {
                    validity,
                    num_values,
                    ..
                }) => {
                    if let Some(validity) = validity {
                        validity_builder.append_buffer(validity);
                    } else {
                        validity_builder.append_n(*num_values, true);
                    }
                }
                RawRepDef::Offsets(OffsetDesc {
                    offsets,
                    validity: Some(validity),
                    has_empty_lists,
                    specials,
                    ..
                }) => {
                    all_has_empty_lists |= has_empty_lists;
                    validity_builder.append_buffer(validity);
                    let existing_lists = all_offsets.len() - 1;
                    let last = *all_offsets.last().unwrap();
                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
                    all_specials.extend(specials.iter().map(|s| match s {
                        SpecialOffset::NullList(pos) => {
                            SpecialOffset::NullList(*pos + existing_lists)
                        }
                        SpecialOffset::EmptyList(pos) => {
                            SpecialOffset::EmptyList(*pos + existing_lists)
                        }
                    }));
                }
                RawRepDef::Offsets(OffsetDesc {
                    offsets,
                    validity: None,
                    has_empty_lists,
                    num_values,
                    specials,
                }) => {
                    all_has_empty_lists |= has_empty_lists;
                    if has_nulls {
                        validity_builder.append_n(*num_values, true);
                    }
                    let last = *all_offsets.last().unwrap();
                    let existing_lists = all_offsets.len() - 1;
                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
                    all_specials.extend(specials.iter().map(|s| match s {
                        SpecialOffset::NullList(pos) => {
                            SpecialOffset::NullList(*pos + existing_lists)
                        }
                        SpecialOffset::EmptyList(pos) => {
                            SpecialOffset::EmptyList(*pos + existing_lists)
                        }
                    }));
                }
            }
        }
        let validity = if has_nulls {
            Some(validity_builder.finish())
        } else {
            None
        };
        match layer_kind {
            LayerKind::Fsl => RawRepDef::Fsl(FslDesc {
                validity,
                num_values: all_num_values,
                dimension: all_dimension,
            }),
            LayerKind::Validity => RawRepDef::Validity(ValidityDesc {
                validity,
                num_values: all_num_values,
            }),
            LayerKind::Offsets => RawRepDef::Offsets(OffsetDesc {
                offsets: all_offsets.into(),
                validity,
                has_empty_lists: all_has_empty_lists,
                num_values: all_num_values,
                specials: all_specials.into(),
            }),
        }
    }

    /// Converts the validity / offsets buffers that have been gathered so far
    /// into repetition and definition levels
    pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
        assert!(!builders.is_empty());
        if builders.iter().all(|b| b.is_empty()) {
            // No repetition, all-valid
            return SerializedRepDefs::empty(
                builders
                    .first()
                    .unwrap()
                    .repdefs
                    .iter()
                    .map(|_| DefinitionInterpretation::AllValidItem)
                    .collect::<Vec<_>>(),
            );
        }
        let has_nulls = builders.iter().any(|b| b.has_nulls());
        let has_offsets = builders.iter().any(|b| b.has_offsets());
        let total_len = builders.iter().map(|b| b.len.unwrap()).sum();
        let num_layers = builders[0].num_layers();
        let mut context = SerializerContext::new(total_len, has_nulls, has_offsets, num_layers);
        let combined_layers = (0..num_layers)
            .map(|layer_index| {
                Self::concat_layers(
                    builders.iter().map(|b| &b.repdefs[layer_index]),
                    builders.len(),
                )
            })
            .collect::<Vec<_>>();
        debug_assert!(builders
            .iter()
            .all(|b| b.num_layers() == builders[0].num_layers()));
        for layer in combined_layers.into_iter().rev() {
            match layer {
                RawRepDef::Validity(def) => {
                    context.record_validity(&def);
                }
                RawRepDef::Offsets(rep) => {
                    context.record_offsets(&rep);
                }
                RawRepDef::Fsl(fsl) => {
                    context.record_fsl(&fsl);
                }
            }
        }
        context.build().collapse_specials()
    }
}

/// Starts with serialized repetition and definition levels and unravels
/// them into validity buffers and offsets buffers
///
/// This is used during decoding to create the necessary arrow structures
#[derive(Debug)]
pub struct RepDefUnraveler {
    rep_levels: Option<LevelBuffer>,
    def_levels: Option<LevelBuffer>,
    // Maps from definition level to the rep level at which that definition level is visible
    levels_to_rep: Vec<u16>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    // Current definition level to compare to.
    current_def_cmp: u16,
    // Current rep level, determines which specials we can see
    current_rep_cmp: u16,
    // Current layer index, 0 means inner-most layer and it counts up from there.  Used to index
    // into def_meaning
    current_layer: usize,
}

impl RepDefUnraveler {
    /// Creates a new unraveler from serialized repetition and definition information
    pub fn new(
        rep_levels: Option<LevelBuffer>,
        def_levels: Option<LevelBuffer>,
        def_meaning: Arc<[DefinitionInterpretation]>,
    ) -> Self {
        let mut levels_to_rep = Vec::with_capacity(def_meaning.len());
        let mut rep_counter = 0;
        // Level=0 is always visible and means valid item
        levels_to_rep.push(0);
        for meaning in def_meaning.as_ref() {
            match meaning {
                DefinitionInterpretation::AllValidItem | DefinitionInterpretation::AllValidList => {
                    // There is no corresponding level, so nothing to put in levels_to_rep
                }
                DefinitionInterpretation::NullableItem => {
                    // Some null structs are not visible at inner rep levels in cases like LIST<STRUCT<LIST<...>>>
                    levels_to_rep.push(rep_counter);
                }
                DefinitionInterpretation::NullableList => {
                    rep_counter += 1;
                    levels_to_rep.push(rep_counter);
                }
                DefinitionInterpretation::EmptyableList => {
                    rep_counter += 1;
                    levels_to_rep.push(rep_counter);
                }
                DefinitionInterpretation::NullableAndEmptyableList => {
                    rep_counter += 1;
                    levels_to_rep.push(rep_counter);
                    levels_to_rep.push(rep_counter);
                }
            }
        }
        Self {
            rep_levels,
            def_levels,
            current_def_cmp: 0,
            current_rep_cmp: 0,
            levels_to_rep,
            current_layer: 0,
            def_meaning,
        }
    }

    pub fn is_all_valid(&self) -> bool {
        self.def_meaning[self.current_layer].is_all_valid()
    }

    /// If the current level is a repetition layer then this returns the maximum possible
    /// number of lists at this level.
    ///
    /// This is not valid to call when the current level is a struct/primitive layer because
    /// in some cases there may be no rep or def information to know this.
    pub fn max_lists(&self) -> usize {
        debug_assert!(
            self.def_meaning[self.current_layer] != DefinitionInterpretation::NullableItem
        );
        self.rep_levels
            .as_ref()
            // Worst case every rep item is max_rep and a new list
            .map(|levels| levels.len())
            .unwrap_or(0)
    }

    /// Unravels a layer of offsets from the unraveler into the given offset width
    ///
    /// When decoding a list the caller should first unravel the offsets and then
    /// unravel the validity (this is the opposite order used during encoding)
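    ///
    /// A rough sketch of the intended call pattern (illustrative only; `unraveler` is assumed
    /// to have been built by the decoder):
    ///
    /// ```ignore
    /// let mut offsets: Vec<i32> = Vec::new();
    /// let mut validity = BooleanBufferBuilder::new(0);
    /// unraveler.unravel_offsets(&mut offsets, Some(&mut validity))?;
    /// ```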
1324    pub fn unravel_offsets<T: ArrowNativeType>(
1325        &mut self,
1326        offsets: &mut Vec<T>,
1327        validity: Option<&mut BooleanBufferBuilder>,
1328    ) -> Result<()> {
1329        let rep_levels = self
1330            .rep_levels
1331            .as_mut()
1332            .expect("Expected repetition level but data didn't contain repetition");
1333        let valid_level = self.current_def_cmp;
1334        let (null_level, empty_level) = match self.def_meaning[self.current_layer] {
1335            DefinitionInterpretation::NullableList => {
1336                self.current_def_cmp += 1;
1337                (valid_level + 1, 0)
1338            }
1339            DefinitionInterpretation::EmptyableList => {
1340                self.current_def_cmp += 1;
1341                (0, valid_level + 1)
1342            }
1343            DefinitionInterpretation::NullableAndEmptyableList => {
1344                self.current_def_cmp += 2;
1345                (valid_level + 1, valid_level + 2)
1346            }
1347            DefinitionInterpretation::AllValidList => (0, 0),
1348            _ => unreachable!(),
1349        };
1350        let max_level = null_level.max(empty_level);
1351        self.current_layer += 1;
1352
1353        let mut curlen: usize = offsets.last().map(|o| o.as_usize()).unwrap_or(0);
1354
1355        // If offsets is empty this is a no-op.  If offsets is not empty that means we already
1356        // added a set of offsets.  For example, we might have added [0, 3, 5] (2 lists).  Now
1357        // say we want to add [0, 1, 4] (2 lists).  We should get [0, 3, 5, 6, 9] (4 lists).  If
1358        // we don't pop here we get [0, 3, 5, 5, 6, 9] which is wrong.
1359        //
1360        // Or, to think about it another way, if every unraveler adds the starting 0 and the trailing
1361        // length then we have N + unravelers.len() values instead of N + 1.
1362        offsets.pop();
1363
1364        let to_offset = |val: usize| {
1365            T::from_usize(val)
1366            .ok_or_else(|| Error::invalid_input("A single batch had more than i32::MAX values and so a large container type is required", location!()))
1367        };
1368        self.current_rep_cmp += 1;
1369        if let Some(def_levels) = &mut self.def_levels {
1370            assert!(rep_levels.len() == def_levels.len());
1371            // It's possible validity is None even if we have def levels.  For example, we might have
1372            // empty lists (which require def levels) but no nulls.
1373            let mut push_validity: Box<dyn FnMut(bool)> = if let Some(validity) = validity {
1374                Box::new(|is_valid| validity.append(is_valid))
1375            } else {
1376                Box::new(|_| {})
1377            };
1378            // This is a strange access pattern.  We are iterating over the rep/def levels and,
1379            // at the same time, writing to the rep/def levels.  Doing that with iterators would
1380            // require simultaneous mutable and immutable references, so we use indices and unchecked access instead.
1381            let mut read_idx = 0;
1382            let mut write_idx = 0;
1383            while read_idx < rep_levels.len() {
1384                // SAFETY: We assert that rep_levels and def_levels have the same
1385                // len and read_idx and write_idx can never go past the end.
1386                unsafe {
1387                    let rep_val = *rep_levels.get_unchecked(read_idx);
1388                    if rep_val != 0 {
1389                        let def_val = *def_levels.get_unchecked(read_idx);
1390                        // Copy over
1391                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1392                        *def_levels.get_unchecked_mut(write_idx) = def_val;
1393                        write_idx += 1;
1394
1395                        if def_val == 0 {
1396                            // This is a valid list
1397                            offsets.push(to_offset(curlen)?);
1398                            curlen += 1;
1399                            push_validity(true);
1400                        } else if def_val > max_level {
1401                            // This is not visible at this rep level, do not add to offsets, but keep in repdef
1402                        } else if def_val == null_level {
1403                            // This is a null list
1404                            offsets.push(to_offset(curlen)?);
1405                            push_validity(false);
1406                        } else if def_val == empty_level {
1407                            // This is an empty list
1408                            offsets.push(to_offset(curlen)?);
1409                            push_validity(true);
1410                        } else {
1411                            // New valid list starting with null item
1412                            offsets.push(to_offset(curlen)?);
1413                            curlen += 1;
1414                            push_validity(true);
1415                        }
1416                    } else {
1417                        curlen += 1;
1418                    }
1419                    read_idx += 1;
1420                }
1421            }
1422            offsets.push(to_offset(curlen)?);
1423            rep_levels.truncate(write_idx);
1424            def_levels.truncate(write_idx);
1425            Ok(())
1426        } else {
1427            // SAFETY: See above loop
1428            let mut read_idx = 0;
1429            let mut write_idx = 0;
1430            let old_offsets_len = offsets.len();
1431            while read_idx < rep_levels.len() {
1432                // SAFETY: read_idx / write_idx cannot go past rep_levels.len()
1433                unsafe {
1434                    let rep_val = *rep_levels.get_unchecked(read_idx);
1435                    if rep_val != 0 {
1436                        // Finish the current list
1437                        offsets.push(to_offset(curlen)?);
1438                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1439                        write_idx += 1;
1440                    }
1441                    curlen += 1;
1442                    read_idx += 1;
1443                }
1444            }
1445            let num_new_lists = offsets.len() - old_offsets_len;
1446            offsets.push(to_offset(curlen)?);
1447            rep_levels.truncate(offsets.len() - 1);
1448            if let Some(validity) = validity {
1449                // Even though this unraveler has no validity information, another unraveler might,
1450                // so we need to push all-valid entries for the lists we added
1451                validity.append_n(num_new_lists, true);
1452            }
1453            Ok(())
1454        }
1455    }
1456
1457    pub fn skip_validity(&mut self) {
1458        debug_assert!(
1459            self.def_meaning[self.current_layer] == DefinitionInterpretation::AllValidItem
1460        );
1461        self.current_layer += 1;
1462    }
1463
1464    /// Unravels a layer of validity from the definition levels
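    ///
    /// A worked example (borrowed from `test_repdef_no_rep` below): with def levels
    /// [2, 2, 0, 0, 1], the first call appends [false, false, true, true, false] (only
    /// level 0 is valid at the innermost layer) and the next call appends
    /// [false, false, true, true, true] (levels 0 and 1 are valid one layer further out).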
1465    pub fn unravel_validity(&mut self, validity: &mut BooleanBufferBuilder) {
1466        debug_assert!(
1467            self.def_meaning[self.current_layer] != DefinitionInterpretation::AllValidItem
1468        );
1469        self.current_layer += 1;
1470
1471        let def_levels = self.def_levels.as_ref().unwrap();
1472
1473        let current_def_cmp = self.current_def_cmp;
1474        self.current_def_cmp += 1;
1475
1476        for is_valid in def_levels.iter().filter_map(|&level| {
1477            if self.levels_to_rep[level as usize] <= self.current_rep_cmp {
1478                Some(level <= current_def_cmp)
1479            } else {
1480                None
1481            }
1482        }) {
1483            validity.append(is_valid);
1484        }
1485    }
1486
1487    pub fn decimate(&mut self, dimension: usize) {
1488        if self.rep_levels.is_some() {
1489            // If we need to support this then I think we need to walk through the rep def levels to find
1490            // the spots at which we keep.  E.g. if we have:
1491            //  rep: 1 0 0 1 0 1 0 0 0 1 0 0
1492            //  def: 1 1 1 0 1 0 1 1 0 1 1 0
1493            //  dimension: 2
1494            //
1495            // The output should be:
1496            //  rep: 1 0 0 1 0 0 0
1497            //  def: 1 1 1 0 1 1 0
1498            //
1499            // Maybe there's some special logic for empty/null lists?  I'll save the headache for future me.
1500            todo!("Not yet supported FSL<...List<...>>");
1501        }
1502        let Some(def_levels) = self.def_levels.as_mut() else {
1503            return;
1504        };
1505        let mut read_idx = 0;
1506        let mut write_idx = 0;
1507        while read_idx < def_levels.len() {
1508            unsafe {
1509                *def_levels.get_unchecked_mut(write_idx) = *def_levels.get_unchecked(read_idx);
1510            }
1511            write_idx += 1;
1512            read_idx += dimension;
1513        }
1514        def_levels.truncate(write_idx);
1515    }
1516}
1517
1518/// As we decode we may extract rep/def information from multiple pages (or multiple
1519/// chunks within a page).
1520///
1521/// For each chunk we create an unraveler.  Each unraveler can have a completely different
1522/// interpretation (e.g. one page might contain null items but no null structs and the next
1523/// page might have null structs but no null items).
1524///
1525/// Concatenating these unravelers would be tricky and expensive so instead we have a
1526/// composite unraveler which unravels across multiple unravelers.
1527///
1528/// Note: this struct should be used even if there is only one page / unraveler.  This is
1529/// because the `RepDefUnraveler`'s API is more complex (it's meant to be called by this
1530/// struct).
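///
/// A rough usage sketch (mirroring `test_repdef_basic` below; the level buffers and
/// `def_meaning` are whatever was decoded for the page):
///
/// ```text
/// let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
///     Some(rep_levels),
///     Some(def_levels),
///     def_meaning.into(),
/// )]);
/// // Decode from the inside out: item validity first, then each list layer
/// let item_validity = unraveler.unravel_validity(num_items);
/// let (inner_offsets, inner_validity) = unraveler.unravel_offsets::<i32>()?;
/// let (outer_offsets, outer_validity) = unraveler.unravel_offsets::<i32>()?;
/// ```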
1531#[derive(Debug)]
1532pub struct CompositeRepDefUnraveler {
1533    unravelers: Vec<RepDefUnraveler>,
1534}
1535
1536impl CompositeRepDefUnraveler {
1537    pub fn new(unravelers: Vec<RepDefUnraveler>) -> Self {
1538        Self { unravelers }
1539    }
1540
1541    /// Unravels a layer of validity
1542    ///
1543    /// Returns None if there are no null items in this layer
1544    pub fn unravel_validity(&mut self, num_values: usize) -> Option<NullBuffer> {
1545        let is_all_valid = self
1546            .unravelers
1547            .iter()
1548            .all(|unraveler| unraveler.is_all_valid());
1549
1550        if is_all_valid {
1551            for unraveler in self.unravelers.iter_mut() {
1552                unraveler.skip_validity();
1553            }
1554            None
1555        } else {
1556            let mut validity = BooleanBufferBuilder::new(num_values);
1557            for unraveler in self.unravelers.iter_mut() {
1558                unraveler.unravel_validity(&mut validity);
1559            }
1560            Some(NullBuffer::new(validity.finish()))
1561        }
1562    }
1563
1564    pub fn unravel_fsl_validity(
1565        &mut self,
1566        num_values: usize,
1567        dimension: usize,
1568    ) -> Option<NullBuffer> {
1569        for unraveler in self.unravelers.iter_mut() {
1570            unraveler.decimate(dimension);
1571        }
1572        self.unravel_validity(num_values)
1573    }
1574
1575    /// Unravels a layer of offsets (and the validity for that layer)
1576    pub fn unravel_offsets<T: ArrowNativeType>(
1577        &mut self,
1578    ) -> Result<(OffsetBuffer<T>, Option<NullBuffer>)> {
1579        let mut is_all_valid = true;
1580        let mut max_num_lists = 0;
1581        for unraveler in self.unravelers.iter() {
1582            is_all_valid &= unraveler.is_all_valid();
1583            max_num_lists += unraveler.max_lists();
1584        }
1585
1586        let mut validity = if is_all_valid {
1587            None
1588        } else {
1589            // Note: This is only an estimate and may be too large or too small.  We only know
1590            // right now how many items we have and not how many rows.  (TODO: Shouldn't we know the # of rows?)
1591            Some(BooleanBufferBuilder::new(max_num_lists))
1592        };
1593
1594        let mut offsets = Vec::with_capacity(max_num_lists + 1);
1595
1596        for unraveler in self.unravelers.iter_mut() {
1597            unraveler.unravel_offsets(&mut offsets, validity.as_mut())?;
1598        }
1599
1600        Ok((
1601            OffsetBuffer::new(ScalarBuffer::from(offsets)),
1602            validity.map(|mut v| NullBuffer::new(v.finish())),
1603        ))
1604    }
1605}
1606
1607/// A [`ControlWordIterator`] when there are both repetition and definition levels
1608///
1609/// The iterator will put the repetition level in the upper bits and the definition
1610/// level in the lower bits.  The number of bits used for each level is determined
1611/// by the width of the repetition and definition levels.
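///
/// For example, with 2 bits of repetition and 2 bits of definition, the pair
/// (rep = 3, def = 1) packs into the single byte `0b1101` (rep in the upper bits,
/// def in the lower bits).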
1612#[derive(Debug)]
1613pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
1614    repdef: I,
1615    def_width: usize,
1616    max_rep: u16,
1617    max_visible_def: u16,
1618    rep_mask: u16,
1619    def_mask: u16,
1620    bits_rep: u8,
1621    bits_def: u8,
1622    phantom: std::marker::PhantomData<W>,
1623}
1624
1625impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
1626    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1627        let next = self.repdef.next()?;
1628        let control_word: u8 =
1629            (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8);
1630        buf.push(control_word);
1631        let is_new_row = next.0 == self.max_rep;
1632        let is_visible = next.1 <= self.max_visible_def;
1633        let is_valid_item = next.1 == 0;
1634        Some(ControlWordDesc {
1635            is_new_row,
1636            is_visible,
1637            is_valid_item,
1638        })
1639    }
1640}
1641
1642impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
1643    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1644        let next = self.repdef.next()?;
1645        let control_word: u16 =
1646            ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask);
1647        let control_word = control_word.to_le_bytes();
1648        buf.push(control_word[0]);
1649        buf.push(control_word[1]);
1650        let is_new_row = next.0 == self.max_rep;
1651        let is_visible = next.1 <= self.max_visible_def;
1652        let is_valid_item = next.1 == 0;
1653        Some(ControlWordDesc {
1654            is_new_row,
1655            is_visible,
1656            is_valid_item,
1657        })
1658    }
1659}
1660
1661impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
1662    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1663        let next = self.repdef.next()?;
1664        let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width)
1665            + ((next.1 & self.def_mask) as u32);
1666        let control_word = control_word.to_le_bytes();
1667        buf.push(control_word[0]);
1668        buf.push(control_word[1]);
1669        buf.push(control_word[2]);
1670        buf.push(control_word[3]);
1671        let is_new_row = next.0 == self.max_rep;
1672        let is_visible = next.1 <= self.max_visible_def;
1673        let is_valid_item = next.1 == 0;
1674        Some(ControlWordDesc {
1675            is_new_row,
1676            is_visible,
1677            is_valid_item,
1678        })
1679    }
1680}
1681
1682/// A [`ControlWordIterator`] when there are only definition levels or only repetition levels
1683#[derive(Debug)]
1684pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
1685    repdef: I,
1686    level_mask: u16,
1687    bits_rep: u8,
1688    bits_def: u8,
1689    max_rep: u16,
1690    phantom: std::marker::PhantomData<W>,
1691}
1692
1693impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
1694    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1695        let next = self.repdef.next()?;
1696        buf.push((next & self.level_mask) as u8);
1697        let is_new_row = self.max_rep == 0 || next == self.max_rep;
1698        let is_valid_item = next == 0 || self.bits_def == 0;
1699        Some(ControlWordDesc {
1700            is_new_row,
1701            // Either there is no rep, in which case there are no invisible items
1702            // or there is no def, in which case there are no invisible items
1703            is_visible: true,
1704            is_valid_item,
1705        })
1706    }
1707}
1708
1709impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
1710    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1711        let next = self.repdef.next()? & self.level_mask;
1712        let control_word = next.to_le_bytes();
1713        buf.push(control_word[0]);
1714        buf.push(control_word[1]);
1715        let is_new_row = self.max_rep == 0 || next == self.max_rep;
1716        let is_valid_item = next == 0 || self.bits_def == 0;
1717        Some(ControlWordDesc {
1718            is_new_row,
1719            is_visible: true,
1720            is_valid_item,
1721        })
1722    }
1723}
1724
1725impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
1726    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1727        let next = self.repdef.next()?;
1728        let next = (next & self.level_mask) as u32;
1729        let control_word = next.to_le_bytes();
1730        buf.push(control_word[0]);
1731        buf.push(control_word[1]);
1732        buf.push(control_word[2]);
1733        buf.push(control_word[3]);
1734        let is_new_row = self.max_rep == 0 || next as u16 == self.max_rep;
1735        let is_valid_item = next == 0 || self.bits_def == 0;
1736        Some(ControlWordDesc {
1737            is_new_row,
1738            is_visible: true,
1739            is_valid_item,
1740        })
1741    }
1742}
1743
1744/// A [`ControlWordIterator`] when there are no repetition or definition levels
1745#[derive(Debug)]
1746pub struct NilaryControlWordIterator {
1747    len: usize,
1748    idx: usize,
1749}
1750
1751impl NilaryControlWordIterator {
1752    fn append_next(&mut self) -> Option<ControlWordDesc> {
1753        if self.idx == self.len {
1754            None
1755        } else {
1756            self.idx += 1;
1757            Some(ControlWordDesc {
1758                is_new_row: true,
1759                is_visible: true,
1760                is_valid_item: true,
1761            })
1762        }
1763    }
1764}
1765
1766/// Helper function to get a bit mask of the given width
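/// (e.g. `get_mask(3) == 0b0000_0111`)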
1767fn get_mask(width: u16) -> u16 {
1768    (1 << width) - 1
1769}
1770
1771// We're really going out of our way to avoid boxing here but this will be called on a per-value basis
1772// so it is in the critical path.
1773type SpecificBinaryControlWordIterator<'a, T> = BinaryControlWordIterator<
1774    Zip<Copied<std::slice::Iter<'a, u16>>, Copied<std::slice::Iter<'a, u16>>>,
1775    T,
1776>;
1777
1778/// An iterator that generates control words from repetition and definition levels
1779///
1780/// "Control word" is just a fancy term for a single u8/u16/u32 that contains both
1781/// the repetition and definition in it.
1782///
1783/// In the large majority of cases we only need a single byte to represent both the
1784/// repetition and definition levels.  However, if there is deep nesting then we may
1785/// need two bytes.  In the worst case we need 4 bytes, though this implies hundreds of
1786/// levels of nesting, which is unlikely to be encountered in practice.
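///
/// For example, with max_rep = 2 and max_def = 3 both levels fit comfortably in a
/// single byte, so `Binary8` control words are used; the 16- and 32-bit variants only
/// come into play with much deeper nesting.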
1787#[derive(Debug)]
1788pub enum ControlWordIterator<'a> {
1789    Binary8(SpecificBinaryControlWordIterator<'a, u8>),
1790    Binary16(SpecificBinaryControlWordIterator<'a, u16>),
1791    Binary32(SpecificBinaryControlWordIterator<'a, u32>),
1792    Unary8(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u8>),
1793    Unary16(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u16>),
1794    Unary32(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u32>),
1795    Nilary(NilaryControlWordIterator),
1796}
1797
1798/// Describes the properties of a control word
1799#[derive(Debug)]
1800pub struct ControlWordDesc {
1801    pub is_new_row: bool,
1802    pub is_visible: bool,
1803    pub is_valid_item: bool,
1804}
1805
1806impl ControlWordIterator<'_> {
1807    /// Appends the next control word to the buffer
1808    ///
1809    /// Returns a [`ControlWordDesc`] describing the appended word, or `None` if the iterator is exhausted
1810    pub fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1811        match self {
1812            Self::Binary8(iter) => iter.append_next(buf),
1813            Self::Binary16(iter) => iter.append_next(buf),
1814            Self::Binary32(iter) => iter.append_next(buf),
1815            Self::Unary8(iter) => iter.append_next(buf),
1816            Self::Unary16(iter) => iter.append_next(buf),
1817            Self::Unary32(iter) => iter.append_next(buf),
1818            Self::Nilary(iter) => iter.append_next(),
1819        }
1820    }
1821
1822    /// Return true if the control word iterator has repetition levels
1823    pub fn has_repetition(&self) -> bool {
1824        match self {
1825            Self::Binary8(_) | Self::Binary16(_) | Self::Binary32(_) => true,
1826            Self::Unary8(iter) => iter.bits_rep > 0,
1827            Self::Unary16(iter) => iter.bits_rep > 0,
1828            Self::Unary32(iter) => iter.bits_rep > 0,
1829            Self::Nilary(_) => false,
1830        }
1831    }
1832
1833    /// Returns the number of bytes per control word
1834    pub fn bytes_per_word(&self) -> usize {
1835        match self {
1836            Self::Binary8(_) => 1,
1837            Self::Binary16(_) => 2,
1838            Self::Binary32(_) => 4,
1839            Self::Unary8(_) => 1,
1840            Self::Unary16(_) => 2,
1841            Self::Unary32(_) => 4,
1842            Self::Nilary(_) => 0,
1843        }
1844    }
1845
1846    /// Returns the number of bits used for the repetition level
1847    pub fn bits_rep(&self) -> u8 {
1848        match self {
1849            Self::Binary8(iter) => iter.bits_rep,
1850            Self::Binary16(iter) => iter.bits_rep,
1851            Self::Binary32(iter) => iter.bits_rep,
1852            Self::Unary8(iter) => iter.bits_rep,
1853            Self::Unary16(iter) => iter.bits_rep,
1854            Self::Unary32(iter) => iter.bits_rep,
1855            Self::Nilary(_) => 0,
1856        }
1857    }
1858
1859    /// Returns the number of bits used for the definition level
1860    pub fn bits_def(&self) -> u8 {
1861        match self {
1862            Self::Binary8(iter) => iter.bits_def,
1863            Self::Binary16(iter) => iter.bits_def,
1864            Self::Binary32(iter) => iter.bits_def,
1865            Self::Unary8(iter) => iter.bits_def,
1866            Self::Unary16(iter) => iter.bits_def,
1867            Self::Unary32(iter) => iter.bits_def,
1868            Self::Nilary(_) => 0,
1869        }
1870    }
1871}
1872
1873/// Builds a [`ControlWordIterator`] from repetition and definition levels
1874/// by first calculating the width needed and then creating the iterator
1875/// with the appropriate width
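///
/// A rough usage sketch (mirroring `test_control_words` below; `rep`, `def`, and the
/// various maximums are whatever the caller computed for the page):
///
/// ```text
/// let mut iter = build_control_word_iterator(Some(rep), max_rep, Some(def), max_def, max_visible_def, len);
/// let mut buf = Vec::with_capacity(len * iter.bytes_per_word());
/// while let Some(desc) = iter.append_next(&mut buf) {
///     // desc.is_new_row / desc.is_visible / desc.is_valid_item describe the word just written
/// }
/// ```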
1876pub fn build_control_word_iterator<'a>(
1877    rep: Option<&'a [u16]>,
1878    max_rep: u16,
1879    def: Option<&'a [u16]>,
1880    max_def: u16,
1881    max_visible_def: u16,
1882    len: usize,
1883) -> ControlWordIterator<'a> {
1884    let rep_width = if max_rep == 0 {
1885        0
1886    } else {
1887        log_2_ceil(max_rep as u32) as u16
1888    };
1889    let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
1890    let def_width = if max_def == 0 {
1891        0
1892    } else {
1893        log_2_ceil(max_def as u32) as u16
1894    };
1895    let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
1896    let total_width = rep_width + def_width;
1897    match (rep, def) {
1898        (Some(rep), Some(def)) => {
1899            let iter = rep.iter().copied().zip(def.iter().copied());
1900            let def_width = def_width as usize;
1901            if total_width <= 8 {
1902                ControlWordIterator::Binary8(BinaryControlWordIterator {
1903                    repdef: iter,
1904                    rep_mask,
1905                    def_mask,
1906                    def_width,
1907                    max_rep,
1908                    max_visible_def,
1909                    bits_rep: rep_width as u8,
1910                    bits_def: def_width as u8,
1911                    phantom: std::marker::PhantomData,
1912                })
1913            } else if total_width <= 16 {
1914                ControlWordIterator::Binary16(BinaryControlWordIterator {
1915                    repdef: iter,
1916                    rep_mask,
1917                    def_mask,
1918                    def_width,
1919                    max_rep,
1920                    max_visible_def,
1921                    bits_rep: rep_width as u8,
1922                    bits_def: def_width as u8,
1923                    phantom: std::marker::PhantomData,
1924                })
1925            } else {
1926                ControlWordIterator::Binary32(BinaryControlWordIterator {
1927                    repdef: iter,
1928                    rep_mask,
1929                    def_mask,
1930                    def_width,
1931                    max_rep,
1932                    max_visible_def,
1933                    bits_rep: rep_width as u8,
1934                    bits_def: def_width as u8,
1935                    phantom: std::marker::PhantomData,
1936                })
1937            }
1938        }
1939        (Some(lev), None) => {
1940            let iter = lev.iter().copied();
1941            if total_width <= 8 {
1942                ControlWordIterator::Unary8(UnaryControlWordIterator {
1943                    repdef: iter,
1944                    level_mask: rep_mask,
1945                    bits_rep: total_width as u8,
1946                    bits_def: 0,
1947                    max_rep,
1948                    phantom: std::marker::PhantomData,
1949                })
1950            } else if total_width <= 16 {
1951                ControlWordIterator::Unary16(UnaryControlWordIterator {
1952                    repdef: iter,
1953                    level_mask: rep_mask,
1954                    bits_rep: total_width as u8,
1955                    bits_def: 0,
1956                    max_rep,
1957                    phantom: std::marker::PhantomData,
1958                })
1959            } else {
1960                ControlWordIterator::Unary32(UnaryControlWordIterator {
1961                    repdef: iter,
1962                    level_mask: rep_mask,
1963                    bits_rep: total_width as u8,
1964                    bits_def: 0,
1965                    max_rep,
1966                    phantom: std::marker::PhantomData,
1967                })
1968            }
1969        }
1970        (None, Some(lev)) => {
1971            let iter = lev.iter().copied();
1972            if total_width <= 8 {
1973                ControlWordIterator::Unary8(UnaryControlWordIterator {
1974                    repdef: iter,
1975                    level_mask: def_mask,
1976                    bits_rep: 0,
1977                    bits_def: total_width as u8,
1978                    max_rep: 0,
1979                    phantom: std::marker::PhantomData,
1980                })
1981            } else if total_width <= 16 {
1982                ControlWordIterator::Unary16(UnaryControlWordIterator {
1983                    repdef: iter,
1984                    level_mask: def_mask,
1985                    bits_rep: 0,
1986                    bits_def: total_width as u8,
1987                    max_rep: 0,
1988                    phantom: std::marker::PhantomData,
1989                })
1990            } else {
1991                ControlWordIterator::Unary32(UnaryControlWordIterator {
1992                    repdef: iter,
1993                    level_mask: def_mask,
1994                    bits_rep: 0,
1995                    bits_def: total_width as u8,
1996                    max_rep: 0,
1997                    phantom: std::marker::PhantomData,
1998                })
1999            }
2000        }
2001        (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator { len, idx: 0 }),
2002    }
2003}
2004
2005/// A parser to unwrap control words into repetition and definition levels
2006///
2007/// This is the inverse of the [`ControlWordIterator`].
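///
/// A rough sketch of the decode side (mirroring `test_control_words` below):
///
/// ```text
/// let parser = ControlWordParser::new(bits_rep, bits_def);
/// let (mut rep, mut def) = (Vec::new(), Vec::new());
/// // (assuming bytes_per_word() > 0, i.e. there actually are rep or def levels)
/// for word in buf.chunks_exact(parser.bytes_per_word()) {
///     parser.parse(word, &mut rep, &mut def);
/// }
/// ```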
2008#[derive(Copy, Clone, Debug)]
2009pub enum ControlWordParser {
2010    // First item is the bits to shift, second is the mask to apply (the mask can be
2011    // calculated from the bits to shift but we don't want to calculate it each time)
2012    BOTH8(u8, u32),
2013    BOTH16(u8, u32),
2014    BOTH32(u8, u32),
2015    REP8,
2016    REP16,
2017    REP32,
2018    DEF8,
2019    DEF16,
2020    DEF32,
2021    NIL,
2022}
2023
2024impl ControlWordParser {
2025    fn parse_both<const WORD_SIZE: u8>(
2026        src: &[u8],
2027        dst_rep: &mut Vec<u16>,
2028        dst_def: &mut Vec<u16>,
2029        bits_to_shift: u8,
2030        mask_to_apply: u32,
2031    ) {
2032        match WORD_SIZE {
2033            1 => {
2034                let word = src[0];
2035                let rep = word >> bits_to_shift;
2036                let def = word & (mask_to_apply as u8);
2037                dst_rep.push(rep as u16);
2038                dst_def.push(def as u16);
2039            }
2040            2 => {
2041                let word = u16::from_le_bytes([src[0], src[1]]);
2042                let rep = word >> bits_to_shift;
2043                let def = word & mask_to_apply as u16;
2044                dst_rep.push(rep);
2045                dst_def.push(def);
2046            }
2047            4 => {
2048                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2049                let rep = word >> bits_to_shift;
2050                let def = word & mask_to_apply;
2051                dst_rep.push(rep as u16);
2052                dst_def.push(def as u16);
2053            }
2054            _ => unreachable!(),
2055        }
2056    }
2057
2058    fn parse_desc_both<const WORD_SIZE: u8>(
2059        src: &[u8],
2060        bits_to_shift: u8,
2061        mask_to_apply: u32,
2062        max_rep: u16,
2063        max_visible_def: u16,
2064    ) -> ControlWordDesc {
2065        match WORD_SIZE {
2066            1 => {
2067                let word = src[0];
2068                let rep = word >> bits_to_shift;
2069                let def = word & (mask_to_apply as u8);
2070                let is_visible = def as u16 <= max_visible_def;
2071                let is_new_row = rep as u16 == max_rep;
2072                let is_valid_item = def == 0;
2073                ControlWordDesc {
2074                    is_visible,
2075                    is_new_row,
2076                    is_valid_item,
2077                }
2078            }
2079            2 => {
2080                let word = u16::from_le_bytes([src[0], src[1]]);
2081                let rep = word >> bits_to_shift;
2082                let def = word & mask_to_apply as u16;
2083                let is_visible = def <= max_visible_def;
2084                let is_new_row = rep == max_rep;
2085                let is_valid_item = def == 0;
2086                ControlWordDesc {
2087                    is_visible,
2088                    is_new_row,
2089                    is_valid_item,
2090                }
2091            }
2092            4 => {
2093                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2094                let rep = word >> bits_to_shift;
2095                let def = word & mask_to_apply;
2096                let is_visible = def as u16 <= max_visible_def;
2097                let is_new_row = rep as u16 == max_rep;
2098                let is_valid_item = def == 0;
2099                ControlWordDesc {
2100                    is_visible,
2101                    is_new_row,
2102                    is_valid_item,
2103                }
2104            }
2105            _ => unreachable!(),
2106        }
2107    }
2108
2109    fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
2110        match WORD_SIZE {
2111            1 => {
2112                let word = src[0];
2113                dst.push(word as u16);
2114            }
2115            2 => {
2116                let word = u16::from_le_bytes([src[0], src[1]]);
2117                dst.push(word);
2118            }
2119            4 => {
2120                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2121                dst.push(word as u16);
2122            }
2123            _ => unreachable!(),
2124        }
2125    }
2126
2127    fn parse_rep_desc_one<const WORD_SIZE: u8>(src: &[u8], max_rep: u16) -> ControlWordDesc {
2128        match WORD_SIZE {
2129            1 => ControlWordDesc {
2130                is_new_row: src[0] as u16 == max_rep,
2131                is_visible: true,
2132                is_valid_item: true,
2133            },
2134            2 => ControlWordDesc {
2135                is_new_row: u16::from_le_bytes([src[0], src[1]]) == max_rep,
2136                is_visible: true,
2137                is_valid_item: true,
2138            },
2139            4 => ControlWordDesc {
2140                is_new_row: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == max_rep,
2141                is_visible: true,
2142                is_valid_item: true,
2143            },
2144            _ => unreachable!(),
2145        }
2146    }
2147
2148    fn parse_def_desc_one<const WORD_SIZE: u8>(src: &[u8]) -> ControlWordDesc {
2149        match WORD_SIZE {
2150            1 => ControlWordDesc {
2151                is_new_row: true,
2152                is_visible: true,
2153                is_valid_item: src[0] == 0,
2154            },
2155            2 => ControlWordDesc {
2156                is_new_row: true,
2157                is_visible: true,
2158                is_valid_item: u16::from_le_bytes([src[0], src[1]]) == 0,
2159            },
2160            4 => ControlWordDesc {
2161                is_new_row: true,
2162                is_visible: true,
2163                is_valid_item: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == 0,
2164            },
2165            _ => unreachable!(),
2166        }
2167    }
2168
2169    /// Returns the number of bytes per control word
2170    pub fn bytes_per_word(&self) -> usize {
2171        match self {
2172            Self::BOTH8(..) => 1,
2173            Self::BOTH16(..) => 2,
2174            Self::BOTH32(..) => 4,
2175            Self::REP8 => 1,
2176            Self::REP16 => 2,
2177            Self::REP32 => 4,
2178            Self::DEF8 => 1,
2179            Self::DEF16 => 2,
2180            Self::DEF32 => 4,
2181            Self::NIL => 0,
2182        }
2183    }
2184
2185    /// Appends the next control word to the rep & def buffers
2186    ///
2187    /// `src` should be pointing at the first byte (little endian) of the control word
2188    ///
2189    /// `dst_rep` and `dst_def` are the buffers to append the rep and def levels to.
2190    /// They will not be appended to if not needed.
2191    pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
2192        match self {
2193            Self::BOTH8(bits_to_shift, mask_to_apply) => {
2194                Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2195            }
2196            Self::BOTH16(bits_to_shift, mask_to_apply) => {
2197                Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2198            }
2199            Self::BOTH32(bits_to_shift, mask_to_apply) => {
2200                Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2201            }
2202            Self::REP8 => Self::parse_one::<1>(src, dst_rep),
2203            Self::REP16 => Self::parse_one::<2>(src, dst_rep),
2204            Self::REP32 => Self::parse_one::<4>(src, dst_rep),
2205            Self::DEF8 => Self::parse_one::<1>(src, dst_def),
2206            Self::DEF16 => Self::parse_one::<2>(src, dst_def),
2207            Self::DEF32 => Self::parse_one::<4>(src, dst_def),
2208            Self::NIL => {}
2209        }
2210    }
2211
2212    /// Return true if the control words contain repetition information
2213    pub fn has_rep(&self) -> bool {
2214        match self {
2215            Self::BOTH8(..)
2216            | Self::BOTH16(..)
2217            | Self::BOTH32(..)
2218            | Self::REP8
2219            | Self::REP16
2220            | Self::REP32 => true,
2221            Self::DEF8 | Self::DEF16 | Self::DEF32 | Self::NIL => false,
2222        }
2223    }
2224
2225    /// Parses the control word to inspect its properties without appending to any buffers
2226    pub fn parse_desc(&self, src: &[u8], max_rep: u16, max_visible_def: u16) -> ControlWordDesc {
2227        match self {
2228            Self::BOTH8(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<1>(
2229                src,
2230                *bits_to_shift,
2231                *mask_to_apply,
2232                max_rep,
2233                max_visible_def,
2234            ),
2235            Self::BOTH16(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<2>(
2236                src,
2237                *bits_to_shift,
2238                *mask_to_apply,
2239                max_rep,
2240                max_visible_def,
2241            ),
2242            Self::BOTH32(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<4>(
2243                src,
2244                *bits_to_shift,
2245                *mask_to_apply,
2246                max_rep,
2247                max_visible_def,
2248            ),
2249            Self::REP8 => Self::parse_rep_desc_one::<1>(src, max_rep),
2250            Self::REP16 => Self::parse_rep_desc_one::<2>(src, max_rep),
2251            Self::REP32 => Self::parse_rep_desc_one::<4>(src, max_rep),
2252            Self::DEF8 => Self::parse_def_desc_one::<1>(src),
2253            Self::DEF16 => Self::parse_def_desc_one::<2>(src),
2254            Self::DEF32 => Self::parse_def_desc_one::<4>(src),
2255            Self::NIL => ControlWordDesc {
2256                is_new_row: true,
2257                is_valid_item: true,
2258                is_visible: true,
2259            },
2260        }
2261    }
2262
2263    /// Creates a new parser from the number of bits used for the repetition and definition levels
2264    pub fn new(bits_rep: u8, bits_def: u8) -> Self {
2265        let total_bits = bits_rep + bits_def;
2266
2267        enum WordSize {
2268            One,
2269            Two,
2270            Four,
2271        }
2272
2273        let word_size = if total_bits <= 8 {
2274            WordSize::One
2275        } else if total_bits <= 16 {
2276            WordSize::Two
2277        } else {
2278            WordSize::Four
2279        };
2280
2281        match (bits_rep > 0, bits_def > 0, word_size) {
2282            (false, false, _) => Self::NIL,
2283            (false, true, WordSize::One) => Self::DEF8,
2284            (false, true, WordSize::Two) => Self::DEF16,
2285            (false, true, WordSize::Four) => Self::DEF32,
2286            (true, false, WordSize::One) => Self::REP8,
2287            (true, false, WordSize::Two) => Self::REP16,
2288            (true, false, WordSize::Four) => Self::REP32,
2289            (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32),
2290            (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32),
2291            (true, true, WordSize::Four) => {
2292                Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32)
2293            }
2294        }
2295    }
2296}
2297
2298#[cfg(test)]
2299mod tests {
2300    use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
2301
2302    use crate::repdef::{
2303        CompositeRepDefUnraveler, DefinitionInterpretation, RepDefUnraveler, SerializedRepDefs,
2304    };
2305
2306    use super::RepDefBuilder;
2307
2308    fn validity(values: &[bool]) -> NullBuffer {
2309        NullBuffer::from_iter(values.iter().copied())
2310    }
2311
2312    fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
2313        OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
2314    }
2315
2316    fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
2317        OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
2318    }
2319
2320    #[test]
2321    fn test_repdef_basic() {
2322        // Basic case, rep & def
2323        let mut builder = RepDefBuilder::default();
2324        builder.add_offsets(
2325            offsets_64(&[0, 2, 2, 5]),
2326            Some(validity(&[true, false, true])),
2327        );
2328        builder.add_offsets(
2329            offsets_64(&[0, 1, 3, 5, 5, 9]),
2330            Some(validity(&[true, true, true, false, true])),
2331        );
2332        builder.add_validity_bitmap(validity(&[
2333            true, true, true, false, false, false, true, true, false,
2334        ]));
2335
2336        let repdefs = RepDefBuilder::serialize(vec![builder]);
2337        let rep = repdefs.repetition_levels.unwrap();
2338        let def = repdefs.definition_levels.unwrap();
2339
2340        assert_eq!(vec![0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
2341        assert_eq!(vec![2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
2342
2343        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2344            Some(rep.as_ref().to_vec()),
2345            Some(def.as_ref().to_vec()),
2346            repdefs.def_meaning.into(),
2347        )]);
2348
2349        // Note: validity doesn't exactly round-trip because repdef normalizes some of the
2350        // redundant validity values
2351        assert_eq!(
2352            unraveler.unravel_validity(9),
2353            Some(validity(&[
2354                true, true, true, false, false, false, true, true, false
2355            ]))
2356        );
2357        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2358        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 5, 9]).inner());
2359        assert_eq!(val, Some(validity(&[true, true, true, false, true])));
2360        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2361        assert_eq!(off.inner(), offsets_32(&[0, 2, 2, 5]).inner());
2362        assert_eq!(val, Some(validity(&[true, false, true])));
2363    }
2364
2365    #[test]
2366    fn test_repdef_simple_null_empty_list() {
2367        let check = |repdefs: SerializedRepDefs, last_def: DefinitionInterpretation| {
2368            let rep = repdefs.repetition_levels.unwrap();
2369            let def = repdefs.definition_levels.unwrap();
2370
2371            assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2372            assert_eq!([0, 0, 2, 0, 1, 0], *def);
2373            assert!(repdefs.special_records.is_empty());
2374            assert_eq!(
2375                vec![DefinitionInterpretation::NullableItem, last_def,],
2376                repdefs.def_meaning
2377            );
2378        };
2379
2380        // Null list and empty list should be serialized mostly the same
2381
2382        // Null case
2383        let mut builder = RepDefBuilder::default();
2384        builder.add_offsets(
2385            offsets_32(&[0, 2, 2, 5]),
2386            Some(validity(&[true, false, true])),
2387        );
2388        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2389
2390        let repdefs = RepDefBuilder::serialize(vec![builder]);
2391
2392        check(repdefs, DefinitionInterpretation::NullableList);
2393
2394        // Empty case
2395        let mut builder = RepDefBuilder::default();
2396        builder.add_offsets(offsets_32(&[0, 2, 2, 5]), None);
2397        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2398
2399        let repdefs = RepDefBuilder::serialize(vec![builder]);
2400
2401        check(repdefs, DefinitionInterpretation::EmptyableList);
2402    }
2403
2404    #[test]
2405    fn test_repdef_empty_list_at_end() {
2406        // Regresses a failure we encountered when the last item was an empty list
2407        let mut builder = RepDefBuilder::default();
2408        builder.add_offsets(offsets_32(&[0, 2, 5, 5]), None);
2409        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2410
2411        let repdefs = RepDefBuilder::serialize(vec![builder]);
2412
2413        let rep = repdefs.repetition_levels.unwrap();
2414        let def = repdefs.definition_levels.unwrap();
2415
2416        assert_eq!([1, 0, 1, 0, 0, 1], *rep);
2417        assert_eq!([0, 0, 0, 1, 0, 2], *def);
2418        assert!(repdefs.special_records.is_empty());
2419        assert_eq!(
2420            vec![
2421                DefinitionInterpretation::NullableItem,
2422                DefinitionInterpretation::EmptyableList,
2423            ],
2424            repdefs.def_meaning
2425        );
2426    }
2427
2428    #[test]
2429    fn test_repdef_abnormal_nulls() {
2430        // List nulls are allowed to have non-empty offsets and garbage values
2431        // and the add_offsets call should normalize this
2432        let mut builder = RepDefBuilder::default();
2433        builder.add_offsets(
2434            offsets_32(&[0, 2, 5, 8]),
2435            Some(validity(&[true, false, true])),
2436        );
2437        // Note: we pass 5 here and not 8.  If add_offsets tells us there are garbage values behind
2438        // the nulls then they should be removed before continuing
2439        builder.add_no_null(5);
2440
2441        let repdefs = RepDefBuilder::serialize(vec![builder]);
2442
2443        let rep = repdefs.repetition_levels.unwrap();
2444        let def = repdefs.definition_levels.unwrap();
2445
2446        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2447        assert_eq!([0, 0, 1, 0, 0, 0], *def);
2448
2449        assert_eq!(
2450            vec![
2451                DefinitionInterpretation::AllValidItem,
2452                DefinitionInterpretation::NullableList,
2453            ],
2454            repdefs.def_meaning
2455        );
2456    }
2457
2458    #[test]
2459    fn test_repdef_fsl() {
2460        let mut builder = RepDefBuilder::default();
2461        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2462        builder.add_fsl(None, 2, 4);
2463        builder.add_validity_bitmap(validity(&[
2464            true, false, true, false, true, false, true, false,
2465        ]));
2466
2467        let repdefs = RepDefBuilder::serialize(vec![builder]);
2468
2469        assert_eq!(
2470            vec![
2471                DefinitionInterpretation::NullableItem,
2472                DefinitionInterpretation::AllValidItem,
2473                DefinitionInterpretation::NullableItem
2474            ],
2475            repdefs.def_meaning
2476        );
2477
2478        assert!(repdefs.repetition_levels.is_none());
2479
2480        let def = repdefs.definition_levels.unwrap();
2481
2482        assert_eq!([0, 1, 0, 1, 2, 2, 2, 2], *def);
2483
2484        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2485            None,
2486            Some(def.as_ref().to_vec()),
2487            repdefs.def_meaning.into(),
2488        )]);
2489
2490        assert_eq!(
2491            unraveler.unravel_validity(8),
2492            Some(validity(&[
2493                true, false, true, false, false, false, false, false
2494            ]))
2495        );
2496        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2497        assert_eq!(
2498            unraveler.unravel_fsl_validity(2, 2),
2499            Some(validity(&[true, false]))
2500        );
2501    }
2502
2503    #[test]
2504    fn test_repdef_fsl_allvalid_item() {
2505        let mut builder = RepDefBuilder::default();
2506        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2507        builder.add_fsl(None, 2, 4);
2508        builder.add_no_null(8);
2509
2510        let repdefs = RepDefBuilder::serialize(vec![builder]);
2511
2512        assert_eq!(
2513            vec![
2514                DefinitionInterpretation::AllValidItem,
2515                DefinitionInterpretation::AllValidItem,
2516                DefinitionInterpretation::NullableItem
2517            ],
2518            repdefs.def_meaning
2519        );
2520
2521        assert!(repdefs.repetition_levels.is_none());
2522
2523        let def = repdefs.definition_levels.unwrap();
2524
2525        assert_eq!([0, 0, 0, 0, 1, 1, 1, 1], *def);
2526
2527        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2528            None,
2529            Some(def.as_ref().to_vec()),
2530            repdefs.def_meaning.into(),
2531        )]);
2532
2533        assert_eq!(unraveler.unravel_validity(8), None);
2534        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2535        assert_eq!(
2536            unraveler.unravel_fsl_validity(2, 2),
2537            Some(validity(&[true, false]))
2538        );
2539    }
2540
2541    #[test]
2542    fn test_repdef_sliced_offsets() {
2543        // Sliced lists may have offsets that don't start with zero.  The
2544        // add_offsets call needs to normalize these to operate correctly.
2545        let mut builder = RepDefBuilder::default();
2546        builder.add_offsets(
2547            offsets_32(&[5, 7, 7, 10]),
2548            Some(validity(&[true, false, true])),
2549        );
2550        builder.add_no_null(5);
2551
2552        let repdefs = RepDefBuilder::serialize(vec![builder]);
2553
2554        let rep = repdefs.repetition_levels.unwrap();
2555        let def = repdefs.definition_levels.unwrap();
2556
2557        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2558        assert_eq!([0, 0, 1, 0, 0, 0], *def);
2559
2560        assert_eq!(
2561            vec![
2562                DefinitionInterpretation::AllValidItem,
2563                DefinitionInterpretation::NullableList,
2564            ],
2565            repdefs.def_meaning
2566        );
2567    }
2568
2569    #[test]
2570    fn test_repdef_complex_null_empty() {
2571        let mut builder = RepDefBuilder::default();
2572        builder.add_offsets(
2573            offsets_32(&[0, 4, 4, 4, 6]),
2574            Some(validity(&[true, false, true, true])),
2575        );
2576        builder.add_offsets(
2577            offsets_32(&[0, 1, 1, 2, 2, 2, 3]),
2578            Some(validity(&[true, false, true, false, true, true])),
2579        );
2580        builder.add_no_null(3);
2581
2582        let repdefs = RepDefBuilder::serialize(vec![builder]);
2583
2584        let rep = repdefs.repetition_levels.unwrap();
2585        let def = repdefs.definition_levels.unwrap();
2586
2587        assert_eq!([2, 1, 1, 1, 2, 2, 2, 1], *rep);
2588        assert_eq!([0, 1, 0, 1, 3, 4, 2, 0], *def);
2589    }
2590
2591    #[test]
2592    fn test_repdef_empty_list_no_null() {
2593        // Tests when we have some empty lists but no null lists.  This case
2594        // caused some bugs because we have definition levels but no nulls
2595        let mut builder = RepDefBuilder::default();
2596        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
2597        builder.add_no_null(6);
2598
2599        let repdefs = RepDefBuilder::serialize(vec![builder]);
2600
2601        let rep = repdefs.repetition_levels.unwrap();
2602        let def = repdefs.definition_levels.unwrap();
2603
2604        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2605        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2606
2607        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2608            Some(rep.as_ref().to_vec()),
2609            Some(def.as_ref().to_vec()),
2610            repdefs.def_meaning.into(),
2611        )]);
2612
2613        assert_eq!(unraveler.unravel_validity(6), None);
2614        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2615        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2616        assert_eq!(val, None);
2617    }
2618
2619    #[test]
2620    fn test_repdef_all_valid() {
2621        let mut builder = RepDefBuilder::default();
2622        builder.add_offsets(offsets_64(&[0, 2, 3, 5]), None);
2623        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
2624        builder.add_no_null(9);
2625
2626        let repdefs = RepDefBuilder::serialize(vec![builder]);
2627        let rep = repdefs.repetition_levels.unwrap();
2628        assert!(repdefs.definition_levels.is_none());
2629
2630        assert_eq!([2, 1, 0, 2, 0, 2, 0, 1, 0], *rep);
2631
2632        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2633            Some(rep.as_ref().to_vec()),
2634            None,
2635            repdefs.def_meaning.into(),
2636        )]);
2637
2638        assert_eq!(unraveler.unravel_validity(9), None);
2639        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2640        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 7, 9]).inner());
2641        assert_eq!(val, None);
2642        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2643        assert_eq!(off.inner(), offsets_32(&[0, 2, 3, 5]).inner());
2644        assert_eq!(val, None);
2645    }
2646
2647    #[test]
2648    fn test_repdef_no_rep() {
2649        let mut builder = RepDefBuilder::default();
2650        builder.add_no_null(5);
2651        builder.add_validity_bitmap(validity(&[false, false, true, true, true]));
2652        builder.add_validity_bitmap(validity(&[false, true, true, true, false]));
2653
2654        let repdefs = RepDefBuilder::serialize(vec![builder]);
2655        assert!(repdefs.repetition_levels.is_none());
2656        let def = repdefs.definition_levels.unwrap();
2657
2658        assert_eq!([2, 2, 0, 0, 1], *def);
2659
2660        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2661            None,
2662            Some(def.as_ref().to_vec()),
2663            repdefs.def_meaning.into(),
2664        )]);
2665
2666        assert_eq!(
2667            unraveler.unravel_validity(5),
2668            Some(validity(&[false, false, true, true, false]))
2669        );
2670        assert_eq!(
2671            unraveler.unravel_validity(5),
2672            Some(validity(&[false, false, true, true, true]))
2673        );
2674        assert_eq!(unraveler.unravel_validity(5), None);
2675    }
2676
2677    #[test]
2678    fn test_composite_unravel() {
2679        let mut builder = RepDefBuilder::default();
2680        builder.add_offsets(
2681            offsets_64(&[0, 2, 2, 5]),
2682            Some(validity(&[true, false, true])),
2683        );
2684        let repdef1 = RepDefBuilder::serialize(vec![builder]);
2685
2686        let mut builder = RepDefBuilder::default();
2687        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
2688        let repdef2 = RepDefBuilder::serialize(vec![builder]);
2689
2690        let unravel1 = RepDefUnraveler::new(
2691            repdef1.repetition_levels.map(|l| l.to_vec()),
2692            repdef1.definition_levels.map(|l| l.to_vec()),
2693            repdef1.def_meaning.into(),
2694        );
2695        let unravel2 = RepDefUnraveler::new(
2696            repdef2.repetition_levels.map(|l| l.to_vec()),
2697            repdef2.definition_levels.map(|l| l.to_vec()),
2698            repdef2.def_meaning.into(),
2699        );
2700
2701        let mut unraveler = CompositeRepDefUnraveler::new(vec![unravel1, unravel2]);
2702
2703        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2704        assert_eq!(
2705            off.inner(),
2706            offsets_32(&[0, 2, 2, 5, 6, 8, 10, 12, 14]).inner()
2707        );
2708        assert_eq!(
2709            val,
2710            Some(validity(&[true, false, true, true, true, true, true, true]))
2711        );
2712    }
2713
2714    #[test]
2715    fn test_repdef_multiple_builders() {
2716        // Basic case, rep & def
2717        let mut builder1 = RepDefBuilder::default();
2718        builder1.add_offsets(offsets_64(&[0, 2]), None);
2719        builder1.add_offsets(offsets_64(&[0, 1, 3]), None);
2720        builder1.add_validity_bitmap(validity(&[true, true, true]));
2721
2722        let mut builder2 = RepDefBuilder::default();
2723        builder2.add_offsets(offsets_64(&[0, 0, 3]), Some(validity(&[false, true])));
2724        builder2.add_offsets(
2725            offsets_64(&[0, 2, 2, 6]),
2726            Some(validity(&[true, false, true])),
2727        );
2728        builder2.add_validity_bitmap(validity(&[false, false, false, true, true, false]));
2729
2730        let repdefs = RepDefBuilder::serialize(vec![builder1, builder2]);
2731
2732        let rep = repdefs.repetition_levels.unwrap();
2733        let def = repdefs.definition_levels.unwrap();
2734
2735        assert_eq!([2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
2736        assert_eq!([0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
2737    }

    #[test]
    fn test_slicer() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 2, 2, 30, 30]),
            Some(validity(&[true, false, true, true])),
        );
        builder.add_no_null(30);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

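        // Levels are u16, so N levels occupy 2 * N bytes.  The 30 values expand
        // to 32 levels because the null list and the empty list each contribute
        // a placeholder level with no corresponding value.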
        let mut rep_slicer = repdefs.rep_slicer().unwrap();

        // First 5 items include a null list so we get 6 levels (12 bytes)
        assert_eq!(rep_slicer.slice_next(5).len(), 12);
        // Next 20 are all plain
        assert_eq!(rep_slicer.slice_next(20).len(), 40);
        // Last 5 include an empty list so we get 6 levels (12 bytes)
        assert_eq!(rep_slicer.slice_rest().len(), 12);

        let mut def_slicer = repdefs.def_slicer().unwrap();

        // First 5 items include a null list so we get 6 levels (12 bytes)
        assert_eq!(def_slicer.slice_next(5).len(), 12);
        // Next 20 are all plain
        assert_eq!(def_slicer.slice_next(20).len(), 40);
        // Last 5 include an empty list so we get 6 levels (12 bytes)
        assert_eq!(def_slicer.slice_rest().len(), 12);
    }

    #[test]
    fn test_control_words() {
        // Convert to control words, verify expected, convert back, verify same as original
        fn check(
            rep: &[u16],
            def: &[u16],
            expected_values: Vec<u8>,
            expected_bytes_per_word: usize,
            expected_bits_rep: u8,
            expected_bits_def: u8,
        ) {
            let num_vals = rep.len().max(def.len());
            let max_rep = rep.iter().max().copied().unwrap_or(0);
            let max_def = def.iter().max().copied().unwrap_or(0);

            let in_rep = if rep.is_empty() { None } else { Some(rep) };
            let in_def = if def.is_empty() { None } else { Some(def) };

            let mut iter = super::build_control_word_iterator(
                in_rep,
                max_rep,
                in_def,
                max_def,
                max_def + 1, // max_visible_def; larger than any def level, so every position is visible
                expected_values.len(),
            );
            assert_eq!(iter.bytes_per_word(), expected_bytes_per_word);
            assert_eq!(iter.bits_rep(), expected_bits_rep);
            assert_eq!(iter.bits_def(), expected_bits_def);
            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());

            for _ in 0..num_vals {
                iter.append_next(&mut cw_vec);
            }
            assert!(iter.append_next(&mut cw_vec).is_none());

            assert_eq!(expected_values, cw_vec);

            let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);

            let mut rep_out = Vec::with_capacity(num_vals);
            let mut def_out = Vec::with_capacity(num_vals);

            if expected_bytes_per_word > 0 {
                for slice in cw_vec.chunks_exact(expected_bytes_per_word) {
                    parser.parse(slice, &mut rep_out, &mut def_out);
                }
            }

            assert_eq!(rep, rep_out.as_slice());
            assert_eq!(def, def_out.as_slice());
        }

        // Each will need 4 bits and so we should get 1-byte control words
        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let def = &[5_u16, 3, 1, 2, 12, 15, 0, 2];
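        // Each control word packs the rep level into the high bits and the def
        // level into the low bits, as the expected bytes below show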
        let expected = vec![
            0b00000101, // 0, 5
            0b01110011, // 7, 3
            0b00110001, // 3, 1
            0b00100010, // 2, 2
            0b10011100, // 9, 12
            0b10001111, // 8, 15
            0b11000000, // 12, 0
            0b01010010, // 5, 2
        ];
        check(rep, def, expected, 1, 4, 4);

        // Now we need 5 bits for def so we get 2-byte control words
        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let def = &[5_u16, 3, 1, 2, 12, 22, 0, 2];
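        // Nine bits (4 rep + 5 def) no longer fit in a single byte, so each word
        // is written as two little-endian bytes (low byte first)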
        let expected = vec![
            0b00000101, 0b00000000, // 0, 5
            0b11100011, 0b00000000, // 7, 3
            0b01100001, 0b00000000, // 3, 1
            0b01000010, 0b00000000, // 2, 2
            0b00101100, 0b00000001, // 9, 12
            0b00010110, 0b00000001, // 8, 22
            0b10000000, 0b00000001, // 12, 0
            0b10100010, 0b00000000, // 5, 2
        ];
        check(rep, def, expected, 2, 4, 5);

        // Just rep, 4 bits so 1 byte each
        let levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let expected = vec![
            0b00000000, // 0
            0b00000111, // 7
            0b00000011, // 3
            0b00000010, // 2
            0b00001001, // 9
            0b00001000, // 8
            0b00001100, // 12
            0b00000101, // 5
        ];
        check(levels, &[], expected.clone(), 1, 4, 0);

        // Just def
        check(&[], levels, expected, 1, 0, 4);

        // No rep, no def, no bytes
        check(&[], &[], Vec::default(), 0, 0, 0);
    }

    #[test]
    fn test_control_words_rep_index() {
        fn check(
            rep: &[u16],
            def: &[u16],
            expected_new_rows: Vec<bool>,
            expected_is_visible: Vec<bool>,
        ) {
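            // Verifies the flags reported by append_next for each position: for
            // these inputs is_new_row should be true exactly where the rep level
            // hits its max (or everywhere when there is no rep), and is_visible
            // should be false only where both rep and def are present and the
            // def level exceeds max_visible_def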
            let num_vals = rep.len().max(def.len());
            let max_rep = rep.iter().max().copied().unwrap_or(0);
            let max_def = def.iter().max().copied().unwrap_or(0);

            let in_rep = if rep.is_empty() { None } else { Some(rep) };
            let in_def = if def.is_empty() { None } else { Some(def) };

            let mut iter = super::build_control_word_iterator(
                in_rep,
                max_rep,
                in_def,
                max_def,
                /*max_visible_def=*/ 2,
                expected_new_rows.len(),
            );

            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
            let mut expected_new_rows = expected_new_rows.iter().copied();
            let mut expected_is_visible = expected_is_visible.iter().copied();
            for _ in 0..expected_new_rows.len() {
                let word_desc = iter.append_next(&mut cw_vec).unwrap();
                assert_eq!(word_desc.is_new_row, expected_new_rows.next().unwrap());
                assert_eq!(word_desc.is_visible, expected_is_visible.next().unwrap());
            }
            assert!(iter.append_next(&mut cw_vec).is_none());
        }

        // A rep level of 2 (the max) marks the start of a new row
        let rep = &[2_u16, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
        // A def level greater than max_visible_def (2) makes a position invisible
        let def = &[0_u16, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];

        // Rep & def
        check(
            rep,
            def,
            vec![
                true, false, false, true, true, false, false, false, false, true, false,
            ],
            vec![
                true, true, true, false, true, true, true, true, true, true, true,
            ],
        );
        // Rep only
        check(
            rep,
            &[],
            vec![
                true, false, false, true, true, false, false, false, false, true, false,
            ],
            vec![true; 11],
        );
        // No repetition
        check(
            &[],
            def,
            vec![
                true, true, true, true, true, true, true, true, true, true, true,
            ],
            vec![true; 11],
        );
        // No repetition, no definition
        check(
            &[],
            &[],
            vec![
                true, true, true, true, true, true, true, true, true, true, true,
            ],
            vec![true; 11],
        );
    }
}