1use std::{
84 iter::{Copied, Zip},
85 sync::Arc,
86};
87
88use arrow_array::OffsetSizeTrait;
89use arrow_buffer::{
90 ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
91};
92use lance_core::{utils::bit::log_2_ceil, Error, Result};
93use snafu::location;
94
95use crate::buffer::LanceBuffer;
96
/// A buffer of repetition or definition levels, one `u16` per level entry.
pub type LevelBuffer = Vec<u16>;
100
/// Describes a list (offsets) layer as recorded by `RepDefBuilder::add_offsets`.
#[derive(Clone, Debug)]
struct OffsetDesc {
    /// Normalized offsets: they start at 0 and only advance for valid lists, so null
    /// lists never cover any child values.
    offsets: Arc<[i64]>,
    /// The null and empty lists of this layer, identified by list index.
    specials: Arc<[SpecialOffset]>,
    /// List-level validity, or `None` if every list is valid.
    validity: Option<BooleanBuffer>,
    /// True if at least one valid list has length zero.
    has_empty_lists: bool,
    /// Number of lists in this layer.
    num_values: usize,
}
111
/// Describes a layer that only carries validity (no offsets and no dimension).
#[derive(Clone, Debug)]
struct ValidityDesc {
    /// One bit per value, or `None` if every value is valid.
    validity: Option<BooleanBuffer>,
    /// Number of values in this layer.
    num_values: usize,
}
119
/// Describes a fixed-size-list layer.
#[derive(Clone, Debug)]
struct FslDesc {
    /// One bit per fixed-size-list value, or `None` if every value is valid.
    validity: Option<BooleanBuffer>,
    /// Number of child values in each fixed-size-list value.
    dimension: usize,
    /// Number of fixed-size-list values in this layer.
    num_values: usize,
}
129
/// One layer of repetition/definition information, as added to a `RepDefBuilder`.
#[derive(Clone, Debug)]
enum RawRepDef {
    /// A list layer.
    Offsets(OffsetDesc),
    /// A validity-only layer.
    Validity(ValidityDesc),
    /// A fixed-size-list layer.
    Fsl(FslDesc),
}
139
140impl RawRepDef {
141 fn has_nulls(&self) -> bool {
143 match self {
144 Self::Offsets(OffsetDesc { validity, .. }) => validity.is_some(),
145 Self::Validity(ValidityDesc { validity, .. }) => validity.is_some(),
146 Self::Fsl(FslDesc { validity, .. }) => validity.is_some(),
147 }
148 }
149
150 fn num_values(&self) -> usize {
152 match self {
153 Self::Offsets(OffsetDesc { num_values, .. }) => *num_values,
154 Self::Validity(ValidityDesc { num_values, .. }) => *num_values,
155 Self::Fsl(FslDesc { num_values, .. }) => *num_values,
156 }
157 }
158}
159
/// The repetition and definition levels produced by `RepDefBuilder::serialize`.
#[derive(Debug)]
pub struct SerializedRepDefs {
    /// Repetition levels, or `None` if there are no list layers.
    pub repetition_levels: Option<Arc<[u16]>>,
    /// Definition levels, if any. These are absent when no validity buffer was
    /// recorded and no special records were collapsed into the levels.
    pub definition_levels: Option<Arc<[u16]>>,
    /// Null and empty lists that do not yet have an entry in the level buffers.
    ///
    /// `collapse_specials` merges these into the levels and leaves this empty.
    pub special_records: Vec<SpecialRecord>,
    /// How to interpret the definition levels: one entry per layer, in the order the
    /// layers were recorded.
    pub def_meaning: Vec<DefinitionInterpretation>,
    /// Sum of the definition levels used by the layers that precede the first list
    /// layer in `def_meaning`, or `None` if there is no list layer.
    ///
    /// When slicing, definition levels at or below this value count as values. Higher
    /// levels belong to list (or outer) layers and do not count.
    pub max_visible_level: Option<u16>,
}
187
188impl SerializedRepDefs {
189 pub fn new(
190 repetition_levels: Option<LevelBuffer>,
191 definition_levels: Option<LevelBuffer>,
192 special_records: Vec<SpecialRecord>,
193 def_meaning: Vec<DefinitionInterpretation>,
194 ) -> Self {
195 let first_list = def_meaning.iter().position(|level| level.is_list());
196 let max_visible_level = first_list.map(|first_list| {
197 def_meaning
198 .iter()
199 .map(|level| level.num_def_levels())
200 .take(first_list)
201 .sum::<u16>()
202 });
203 Self {
204 repetition_levels: repetition_levels.map(Arc::from),
205 definition_levels: definition_levels.map(Arc::from),
206 special_records,
207 def_meaning,
208 max_visible_level,
209 }
210 }
211
212 pub fn empty(def_meaning: Vec<DefinitionInterpretation>) -> Self {
214 Self {
215 repetition_levels: None,
216 definition_levels: None,
217 special_records: Vec::new(),
218 def_meaning,
219 max_visible_level: None,
220 }
221 }
222
223 pub fn rep_slicer(&self) -> Option<RepDefSlicer> {
224 self.repetition_levels
225 .as_ref()
226 .map(|rep| RepDefSlicer::new(self, rep.clone()))
227 }
228
229 pub fn def_slicer(&self) -> Option<RepDefSlicer> {
230 self.definition_levels
231 .as_ref()
232 .map(|def| RepDefSlicer::new(self, def.clone()))
233 }
234
235 pub fn collapse_specials(self) -> Self {
238 if self.special_records.is_empty() {
239 return self;
240 }
241
242 let rep = self.repetition_levels.unwrap();
244
245 let new_len = rep.len() + self.special_records.len();
246
247 let mut new_rep = Vec::with_capacity(new_len);
248 let mut new_def = Vec::with_capacity(new_len);
249
250 if let Some(def) = self.definition_levels {
254 let mut def_itr = def.iter();
255 let mut rep_itr = rep.iter();
256 let mut special_itr = self.special_records.into_iter().peekable();
257 let mut last_special = None;
258
259 for idx in 0..new_len {
260 if let Some(special) = special_itr.peek() {
261 if special.pos == idx {
262 new_rep.push(special.rep_level);
263 new_def.push(special.def_level);
264 special_itr.next();
265 last_special = Some(new_rep.last_mut().unwrap());
266 } else {
267 let rep = if let Some(last_special) = last_special {
268 let rep = *last_special;
269 *last_special = *rep_itr.next().unwrap();
270 rep
271 } else {
272 *rep_itr.next().unwrap()
273 };
274 new_rep.push(rep);
275 new_def.push(*def_itr.next().unwrap());
276 last_special = None;
277 }
278 } else {
279 let rep = if let Some(last_special) = last_special {
280 let rep = *last_special;
281 *last_special = *rep_itr.next().unwrap();
282 rep
283 } else {
284 *rep_itr.next().unwrap()
285 };
286 new_rep.push(rep);
287 new_def.push(*def_itr.next().unwrap());
288 last_special = None;
289 }
290 }
291 } else {
292 let mut rep_itr = rep.iter();
293 let mut special_itr = self.special_records.into_iter().peekable();
294 let mut last_special = None;
295
296 for idx in 0..new_len {
297 if let Some(special) = special_itr.peek() {
298 if special.pos == idx {
299 new_rep.push(special.rep_level);
300 new_def.push(special.def_level);
301 special_itr.next();
302 last_special = Some(new_rep.last_mut().unwrap());
303 } else {
304 let rep = if let Some(last_special) = last_special {
305 let rep = *last_special;
306 *last_special = *rep_itr.next().unwrap();
307 rep
308 } else {
309 *rep_itr.next().unwrap()
310 };
311 new_rep.push(rep);
312 new_def.push(0);
313 last_special = None;
314 }
315 } else {
316 let rep = if let Some(last_special) = last_special {
317 let rep = *last_special;
318 *last_special = *rep_itr.next().unwrap();
319 rep
320 } else {
321 *rep_itr.next().unwrap()
322 };
323 new_rep.push(rep);
324 new_def.push(0);
325 last_special = None;
326 }
327 }
328 }
329
330 Self {
331 repetition_levels: Some(new_rep.into()),
332 definition_levels: Some(new_def.into()),
333 special_records: Vec::new(),
334 def_meaning: self.def_meaning,
335 max_visible_level: self.max_visible_level,
336 }
337 }
338}
339
/// Hands out consecutive slices of one level buffer (repetition or definition levels).
///
/// When `max_visible_level` is set and definition levels exist, `slice_next` counts
/// only levels at or below `max_visible_level` toward the requested number of values.
/// Other levels are included in the slice without being counted.
#[derive(Debug)]
pub struct RepDefSlicer<'a> {
    /// The levels being sliced; also supplies `max_visible_level` and the definition levels.
    repdef: &'a SerializedRepDefs,
    /// The levels as bytes (two bytes per level).
    to_slice: LanceBuffer,
    /// Index (in levels, not bytes) of the next level to hand out.
    current: usize,
}
353
354impl<'a> RepDefSlicer<'a> {
356 fn new(repdef: &'a SerializedRepDefs, levels: Arc<[u16]>) -> Self {
357 Self {
358 repdef,
359 to_slice: LanceBuffer::reinterpret_slice(levels),
360 current: 0,
361 }
362 }
363
364 pub fn num_levels(&self) -> usize {
365 self.to_slice.len() / 2
366 }
367
368 pub fn num_levels_remaining(&self) -> usize {
369 self.num_levels() - self.current
370 }
371
372 pub fn all_levels(&self) -> &LanceBuffer {
373 &self.to_slice
374 }
375
376 pub fn slice_rest(&mut self) -> LanceBuffer {
385 let start = self.current;
386 let remaining = self.num_levels_remaining();
387 self.current = self.num_levels();
388 self.to_slice.slice_with_length(start * 2, remaining * 2)
389 }
390
391 pub fn slice_next(&mut self, num_values: usize) -> LanceBuffer {
393 let start = self.current;
394 let Some(max_visible_level) = self.repdef.max_visible_level else {
395 self.current = start + num_values;
397 return self.to_slice.slice_with_length(start * 2, num_values * 2);
398 };
399 if let Some(def) = self.repdef.definition_levels.as_ref() {
400 let mut def_itr = def[start..].iter();
404 let mut num_taken = 0;
405 let mut num_passed = 0;
406 while num_taken < num_values {
407 let def_level = *def_itr.next().unwrap();
408 if def_level <= max_visible_level {
409 num_taken += 1;
410 }
411 num_passed += 1;
412 }
413 self.current = start + num_passed;
414 self.to_slice.slice_with_length(start * 2, num_passed * 2)
415 } else {
416 self.current = start + num_values;
418 self.to_slice.slice_with_length(start * 2, num_values * 2)
419 }
420 }
421}
422
/// A null or empty list that has no entry in the level buffers until
/// `SerializedRepDefs::collapse_specials` inserts one.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SpecialRecord {
    /// Position in the collapsed level stream at which this record is inserted.
    pos: usize,
    /// Definition level marking the list as null or empty.
    def_level: u16,
    /// Repetition level of the list layer that produced this record.
    rep_level: u16,
}
447
/// How one layer's definition levels are to be interpreted.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DefinitionInterpretation {
    /// An item layer with no nulls; uses no definition levels.
    AllValidItem,
    /// A list layer with no null or empty lists; uses no definition levels.
    AllValidList,
    /// An item layer that may contain nulls; uses one definition level.
    NullableItem,
    /// A list layer that may contain null lists; uses one definition level.
    NullableList,
    /// A list layer that may contain empty (but not null) lists; uses one definition level.
    EmptyableList,
    /// A list layer with both null and empty lists; uses two definition levels
    /// (null first, then empty).
    NullableAndEmptyableList,
}

impl DefinitionInterpretation {
    /// Number of definition levels this layer occupies.
    pub fn num_def_levels(&self) -> u16 {
        match self {
            Self::AllValidItem | Self::AllValidList => 0,
            Self::NullableItem | Self::NullableList | Self::EmptyableList => 1,
            Self::NullableAndEmptyableList => 2,
        }
    }

    /// True if this layer never contains nulls (empty lists are still valid).
    pub fn is_all_valid(&self) -> bool {
        match self {
            Self::AllValidItem | Self::AllValidList | Self::EmptyableList => true,
            Self::NullableItem | Self::NullableList | Self::NullableAndEmptyableList => false,
        }
    }

    /// True if this layer is a list layer.
    pub fn is_list(&self) -> bool {
        match self {
            Self::AllValidItem | Self::NullableItem => false,
            Self::AllValidList
            | Self::NullableList
            | Self::EmptyableList
            | Self::NullableAndEmptyableList => true,
        }
    }
}
502
/// Mutable state used by `RepDefBuilder::serialize` while turning layers into levels.
struct SerializerContext {
    /// Leaf positions at which the lists of the most recently recorded list layer
    /// start (see `record_offsets`). `None` until a list layer is recorded.
    last_offsets: Option<Vec<usize>>,
    /// Like `last_offsets`, but each position is also shifted by the number of
    /// zero-length (null or empty) lists before it; used to position special records.
    last_offsets_full: Option<Vec<usize>>,
    /// Special records (null/empty lists) collected so far.
    specials: Vec<SpecialRecord>,
    /// Interpretation of each recorded layer's definition levels, in recording order.
    def_meaning: Vec<DefinitionInterpretation>,
    /// One repetition level per leaf value (empty if there are no list layers).
    rep_levels: LevelBuffer,
    /// One definition level per leaf value (empty if there are no nulls).
    def_levels: LevelBuffer,
    /// Repetition level the next list layer will use.
    current_rep: u16,
    /// Next unused definition level.
    current_def: u16,
    /// Product of the dimensions of the fixed-size-list layers recorded so far.
    current_multiplier: usize,
    /// Set once a validity buffer has actually been recorded.
    has_nulls: bool,
}
548
549impl SerializerContext {
550 fn new(len: usize, has_nulls: bool, has_offsets: bool, num_layers: usize) -> Self {
551 let def_meaning = Vec::with_capacity(num_layers);
552 Self {
553 last_offsets: None,
554 last_offsets_full: None,
555 rep_levels: if has_offsets {
556 vec![0; len]
557 } else {
558 LevelBuffer::default()
559 },
560 def_levels: if has_nulls {
561 vec![0; len]
562 } else {
563 LevelBuffer::default()
564 },
565 def_meaning,
566 current_rep: 1,
567 current_def: 1,
568 current_multiplier: 1,
569 has_nulls: false,
570 specials: Vec::default(),
571 }
572 }
573
574 fn checkout_def(&mut self, meaning: DefinitionInterpretation) -> u16 {
575 let def = self.current_def;
576 self.current_def += meaning.num_def_levels();
577 self.def_meaning.push(meaning);
578 def
579 }
580
581 fn record_offsets(&mut self, offset_desc: &OffsetDesc) {
582 if self.current_multiplier != 1 {
583 todo!("List<...FSL<...>> not yet supported");
587 }
588 let rep_level = self.current_rep;
589 let (null_list_level, empty_list_level) =
590 match (offset_desc.validity.is_some(), offset_desc.has_empty_lists) {
591 (true, true) => {
592 let level =
593 self.checkout_def(DefinitionInterpretation::NullableAndEmptyableList);
594 (level, level + 1)
595 }
596 (true, false) => (self.checkout_def(DefinitionInterpretation::NullableList), 0),
597 (false, true) => (
598 0,
599 self.checkout_def(DefinitionInterpretation::EmptyableList),
600 ),
601 (false, false) => {
602 self.checkout_def(DefinitionInterpretation::AllValidList);
603 (0, 0)
604 }
605 };
606 self.current_rep += 1;
607 if let Some(last_offsets) = &self.last_offsets {
608 let last_offsets_full = self.last_offsets_full.as_ref().unwrap();
609 let mut new_last_off = Vec::with_capacity(offset_desc.offsets.len());
610 let mut new_last_off_full = Vec::with_capacity(offset_desc.offsets.len());
611 let mut empties_seen = 0;
612 for off in offset_desc.offsets.windows(2) {
613 let offset_ctx = last_offsets[off[0] as usize];
614 new_last_off.push(offset_ctx);
615 new_last_off_full.push(last_offsets_full[off[0] as usize] + empties_seen);
616 if off[0] == off[1] {
617 empties_seen += 1;
618 } else {
619 self.rep_levels[offset_ctx] = rep_level;
620 }
621 }
622 self.last_offsets = Some(new_last_off);
623 self.last_offsets_full = Some(new_last_off_full);
624 } else {
625 let mut new_last_off = Vec::with_capacity(offset_desc.offsets.len());
626 let mut new_last_off_full = Vec::with_capacity(offset_desc.offsets.len());
627 let mut empties_seen = 0;
628 for off in offset_desc.offsets.windows(2) {
629 new_last_off.push(off[0] as usize);
630 new_last_off_full.push(off[0] as usize + empties_seen);
631 if off[0] == off[1] {
632 empties_seen += 1;
633 } else {
634 self.rep_levels[off[0] as usize] = rep_level;
635 }
636 }
637 self.last_offsets = Some(new_last_off);
638 self.last_offsets_full = Some(new_last_off_full);
639 }
640
641 let last_offsets_full = self.last_offsets_full.as_ref().unwrap();
643 let num_combined_specials = self.specials.len() + offset_desc.specials.len();
644 let mut new_specials = Vec::with_capacity(num_combined_specials);
645 let mut new_inserted = 0;
646 let mut old_specials_itr = self.specials.iter().peekable();
647 let mut specials_itr = offset_desc.specials.iter().peekable();
648 for _ in 0..num_combined_specials {
649 if let Some(old_special) = old_specials_itr.peek() {
650 let old_special_pos = old_special.pos + new_inserted;
651 if let Some(new_special) = specials_itr.peek() {
652 let new_special_pos = last_offsets_full[new_special.pos()];
653 if old_special_pos < new_special_pos {
654 let mut old_special = *old_specials_itr.next().unwrap();
655 old_special.pos = old_special_pos;
656 new_specials.push(old_special);
657 } else {
658 let new_special = specials_itr.next().unwrap();
659 new_specials.push(SpecialRecord {
660 pos: new_special_pos,
661 def_level: if matches!(new_special, SpecialOffset::EmptyList(_)) {
662 empty_list_level
663 } else {
664 null_list_level
665 },
666 rep_level,
667 });
668 new_inserted += 1;
669 }
670 } else {
671 let mut old_special = *old_specials_itr.next().unwrap();
672 old_special.pos = old_special_pos;
673 new_specials.push(old_special);
674 }
675 } else {
676 let new_special = specials_itr.next().unwrap();
677 new_specials.push(SpecialRecord {
678 pos: last_offsets_full[new_special.pos()],
679 def_level: if matches!(new_special, SpecialOffset::EmptyList(_)) {
680 empty_list_level
681 } else {
682 null_list_level
683 },
684 rep_level,
685 });
686 new_inserted += 1;
687 }
688 }
689 self.specials = new_specials;
690 }
691
692 fn do_record_validity(&mut self, validity: &BooleanBuffer, null_level: u16) {
693 self.has_nulls = true;
694 assert!(!self.def_levels.is_empty());
695 if let Some(last_offsets) = &self.last_offsets {
696 last_offsets
697 .windows(2)
698 .zip(validity.iter())
699 .for_each(|(w, valid)| {
700 let start = w[0] * self.current_multiplier;
701 let end = w[1] * self.current_multiplier;
702 if !valid {
703 self.def_levels[start..end].fill(null_level);
704 }
705 });
706 } else if self.current_multiplier == 1 {
707 self.def_levels
708 .iter_mut()
709 .zip(validity.iter())
710 .for_each(|(def, valid)| {
711 if !valid {
712 *def = null_level;
713 }
714 });
715 } else {
716 self.def_levels
717 .iter_mut()
718 .zip(
719 validity
720 .iter()
721 .flat_map(|v| std::iter::repeat(v).take(self.current_multiplier)),
722 )
723 .for_each(|(def, valid)| {
724 if !valid {
725 *def = null_level;
726 }
727 });
728 }
729 }
730
731 fn record_validity_buf(&mut self, validity: &Option<BooleanBuffer>) {
732 if let Some(validity) = validity {
733 let def_level = self.checkout_def(DefinitionInterpretation::NullableItem);
734 self.do_record_validity(validity, def_level);
735 } else {
736 self.checkout_def(DefinitionInterpretation::AllValidItem);
737 }
738 }
739
740 fn record_validity(&mut self, validity_desc: &ValidityDesc) {
741 self.record_validity_buf(&validity_desc.validity)
742 }
743
744 fn record_fsl(&mut self, fsl_desc: &FslDesc) {
745 self.current_multiplier *= fsl_desc.dimension;
746 self.record_validity_buf(&fsl_desc.validity);
747 }
748
749 fn build(self) -> SerializedRepDefs {
750 let definition_levels = if self.has_nulls {
751 Some(self.def_levels)
752 } else {
753 None
754 };
755 let repetition_levels = if self.current_rep > 1 {
756 Some(self.rep_levels)
757 } else {
758 None
759 };
760 SerializedRepDefs::new(
761 repetition_levels,
762 definition_levels,
763 self.specials,
764 self.def_meaning,
765 )
766 }
767}
768
/// A list without any child values of its own, identified by its index in its layer.
#[derive(Debug, Copy, Clone)]
enum SpecialOffset {
    /// A null list.
    NullList(usize),
    /// A valid list of length zero.
    EmptyList(usize),
}

impl SpecialOffset {
    /// Index of the list within its layer.
    fn pos(&self) -> usize {
        match *self {
            Self::NullList(pos) | Self::EmptyList(pos) => pos,
        }
    }
}
785
/// Collects the repetition/definition layers of an array so they can be turned into
/// levels by `RepDefBuilder::serialize`.
#[derive(Clone, Default, Debug)]
pub struct RepDefBuilder {
    /// The layers, in the order they were added.
    repdefs: Vec<RawRepDef>,
    /// Number of values the next layer is expected to have, derived from the most
    /// recently added layer (`None` before any layer is added).
    len: Option<usize>,
}
802
803impl RepDefBuilder {
804 fn check_validity_len(&mut self, incoming_len: usize) {
805 if let Some(len) = self.len {
806 assert_eq!(incoming_len, len);
807 }
808 self.len = Some(incoming_len);
809 }
810
811 fn num_layers(&self) -> usize {
812 self.repdefs.len()
813 }
814
815 fn is_empty(&self) -> bool {
818 self.repdefs
819 .iter()
820 .all(|r| matches!(r, RawRepDef::Validity(ValidityDesc { validity: None, .. })))
821 }
822
823 pub fn is_simple_validity(&self) -> bool {
825 self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
826 }
827
828 pub fn has_nulls(&self) -> bool {
833 self.repdefs.iter().any(|rd| {
834 matches!(
835 rd,
836 RawRepDef::Validity(ValidityDesc {
837 validity: Some(_),
838 ..
839 }) | RawRepDef::Fsl(FslDesc {
840 validity: Some(_),
841 ..
842 })
843 )
844 })
845 }
846
847 pub fn has_offsets(&self) -> bool {
848 self.repdefs
849 .iter()
850 .any(|rd| matches!(rd, RawRepDef::Offsets(OffsetDesc { .. })))
851 }
852
853 pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
855 self.check_validity_len(validity.len());
856 self.repdefs.push(RawRepDef::Validity(ValidityDesc {
857 num_values: validity.len(),
858 validity: Some(validity.into_inner()),
859 }));
860 }
861
862 pub fn add_no_null(&mut self, len: usize) {
864 self.check_validity_len(len);
865 self.repdefs.push(RawRepDef::Validity(ValidityDesc {
866 validity: None,
867 num_values: len,
868 }));
869 }
870
871 pub fn add_fsl(&mut self, validity: Option<NullBuffer>, dimension: usize, num_values: usize) {
872 if let Some(len) = self.len {
873 assert_eq!(num_values, len);
874 }
875 self.len = Some(num_values * dimension);
876 debug_assert!(validity.is_none() || validity.as_ref().unwrap().len() == num_values);
877 self.repdefs.push(RawRepDef::Fsl(FslDesc {
878 num_values,
879 validity: validity.map(|v| v.into_inner()),
880 dimension,
881 }))
882 }
883
884 fn check_offset_len(&mut self, offsets: &[i64]) {
885 if let Some(len) = self.len {
886 assert!(offsets.len() == len + 1);
887 }
888 self.len = Some(offsets[offsets.len() - 1] as usize);
889 }
890
891 pub fn add_offsets<O: OffsetSizeTrait>(
898 &mut self,
899 offsets: OffsetBuffer<O>,
900 validity: Option<NullBuffer>,
901 ) -> bool {
902 let mut has_garbage_values = false;
903 if O::IS_LARGE {
904 let inner = offsets.into_inner();
905 let len = inner.len();
906 let i64_buff = ScalarBuffer::<i64>::new(inner.into_inner(), 0, len);
907 let mut normalized = Vec::with_capacity(len);
908 normalized.push(0_i64);
909 let mut specials = Vec::new();
910 let mut has_empty_lists = false;
911 let mut last_off = 0;
912 if let Some(validity) = validity.as_ref() {
913 for (idx, (off, valid)) in i64_buff.windows(2).zip(validity.iter()).enumerate() {
914 let len: i64 = off[1] - off[0];
915 match (valid, len == 0) {
916 (false, is_empty) => {
917 specials.push(SpecialOffset::NullList(idx));
918 has_garbage_values |= !is_empty;
919 }
920 (true, true) => {
921 has_empty_lists = true;
922 specials.push(SpecialOffset::EmptyList(idx));
923 }
924 _ => {
925 last_off += len;
926 }
927 }
928 normalized.push(last_off);
929 }
930 } else {
931 for (idx, off) in i64_buff.windows(2).enumerate() {
932 let len: i64 = off[1] - off[0];
933 if len == 0 {
934 has_empty_lists = true;
935 specials.push(SpecialOffset::EmptyList(idx));
936 }
937 last_off += len;
938 normalized.push(last_off);
939 }
940 };
941 self.check_offset_len(&normalized);
942 self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
943 num_values: normalized.len() - 1,
944 offsets: normalized.into(),
945 validity: validity.map(|v| v.into_inner()),
946 has_empty_lists,
947 specials: specials.into(),
948 }));
949 has_garbage_values
950 } else {
951 let inner = offsets.into_inner();
952 let len = inner.len();
953 let scalar_off = ScalarBuffer::<i32>::new(inner.into_inner(), 0, len);
954 let mut casted = Vec::with_capacity(len);
955 casted.push(0);
956 let mut has_empty_lists = false;
957 let mut specials = Vec::new();
958 let mut last_off: i64 = 0;
959 if let Some(validity) = validity.as_ref() {
960 for (idx, (off, valid)) in scalar_off.windows(2).zip(validity.iter()).enumerate() {
961 let len = (off[1] - off[0]) as i64;
962 match (valid, len == 0) {
963 (false, is_empty) => {
964 specials.push(SpecialOffset::NullList(idx));
965 has_garbage_values |= !is_empty;
966 }
967 (true, true) => {
968 has_empty_lists = true;
969 specials.push(SpecialOffset::EmptyList(idx));
970 }
971 _ => {
972 last_off += len;
973 }
974 }
975 casted.push(last_off);
976 }
977 } else {
978 for (idx, off) in scalar_off.windows(2).enumerate() {
979 let len = (off[1] - off[0]) as i64;
980 if len == 0 {
981 has_empty_lists = true;
982 specials.push(SpecialOffset::EmptyList(idx));
983 }
984 last_off += len;
985 casted.push(last_off);
986 }
987 };
988 self.check_offset_len(&casted);
989 self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
990 num_values: casted.len() - 1,
991 offsets: casted.into(),
992 validity: validity.map(|v| v.into_inner()),
993 has_empty_lists,
994 specials: specials.into(),
995 }));
996 has_garbage_values
997 }
998 }
999
1000 fn concat_layers<'a>(
1012 layers: impl Iterator<Item = &'a RawRepDef>,
1013 num_layers: usize,
1014 ) -> RawRepDef {
1015 enum LayerKind {
1016 Validity,
1017 Fsl,
1018 Offsets,
1019 }
1020
1021 let mut collected = Vec::with_capacity(num_layers);
1024 let mut has_nulls = false;
1025 let mut layer_kind = LayerKind::Validity;
1026 let mut num_specials = 0;
1027 let mut all_dimension = 0;
1028 let mut all_has_empty_lists = false;
1029 let mut all_num_values = 0;
1030 for layer in layers {
1031 has_nulls |= layer.has_nulls();
1032 match layer {
1033 RawRepDef::Validity(_) => {
1034 layer_kind = LayerKind::Validity;
1035 }
1036 RawRepDef::Offsets(OffsetDesc {
1037 specials,
1038 has_empty_lists,
1039 ..
1040 }) => {
1041 all_has_empty_lists |= *has_empty_lists;
1042 layer_kind = LayerKind::Offsets;
1043 num_specials += specials.len();
1044 }
1045 RawRepDef::Fsl(FslDesc { dimension, .. }) => {
1046 layer_kind = LayerKind::Fsl;
1047 all_dimension = *dimension;
1048 }
1049 }
1050 collected.push(layer);
1051 all_num_values += layer.num_values();
1052 }
1053
1054 if !has_nulls {
1056 match layer_kind {
1057 LayerKind::Validity => {
1058 return RawRepDef::Validity(ValidityDesc {
1059 validity: None,
1060 num_values: all_num_values,
1061 });
1062 }
1063 LayerKind::Fsl => {
1064 return RawRepDef::Fsl(FslDesc {
1065 validity: None,
1066 num_values: all_num_values,
1067 dimension: all_dimension,
1068 })
1069 }
1070 LayerKind::Offsets => {}
1071 }
1072 }
1073
1074 let mut validity_builder = if has_nulls {
1076 BooleanBufferBuilder::new(all_num_values)
1077 } else {
1078 BooleanBufferBuilder::new(0)
1079 };
1080 let mut all_offsets = if matches!(layer_kind, LayerKind::Offsets) {
1081 let mut all_offsets = Vec::with_capacity(all_num_values);
1082 all_offsets.push(0);
1083 all_offsets
1084 } else {
1085 Vec::new()
1086 };
1087 let mut all_specials = Vec::with_capacity(num_specials);
1088
1089 for layer in collected {
1090 match layer {
1091 RawRepDef::Validity(ValidityDesc {
1092 validity: Some(validity),
1093 ..
1094 }) => {
1095 validity_builder.append_buffer(validity);
1096 }
1097 RawRepDef::Validity(ValidityDesc {
1098 validity: None,
1099 num_values,
1100 }) => {
1101 validity_builder.append_n(*num_values, true);
1102 }
1103 RawRepDef::Fsl(FslDesc {
1104 validity,
1105 num_values,
1106 ..
1107 }) => {
1108 if let Some(validity) = validity {
1109 validity_builder.append_buffer(validity);
1110 } else {
1111 validity_builder.append_n(*num_values, true);
1112 }
1113 }
1114 RawRepDef::Offsets(OffsetDesc {
1115 offsets,
1116 validity: Some(validity),
1117 has_empty_lists,
1118 specials,
1119 ..
1120 }) => {
1121 all_has_empty_lists |= has_empty_lists;
1122 validity_builder.append_buffer(validity);
1123 let existing_lists = all_offsets.len() - 1;
1124 let last = *all_offsets.last().unwrap();
1125 all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1126 all_specials.extend(specials.iter().map(|s| match s {
1127 SpecialOffset::NullList(pos) => {
1128 SpecialOffset::NullList(*pos + existing_lists)
1129 }
1130 SpecialOffset::EmptyList(pos) => {
1131 SpecialOffset::EmptyList(*pos + existing_lists)
1132 }
1133 }));
1134 }
1135 RawRepDef::Offsets(OffsetDesc {
1136 offsets,
1137 validity: None,
1138 has_empty_lists,
1139 num_values,
1140 specials,
1141 }) => {
1142 all_has_empty_lists |= has_empty_lists;
1143 if has_nulls {
1144 validity_builder.append_n(*num_values, true);
1145 }
1146 let last = *all_offsets.last().unwrap();
1147 let existing_lists = all_offsets.len() - 1;
1148 all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1149 all_specials.extend(specials.iter().map(|s| match s {
1150 SpecialOffset::NullList(pos) => {
1151 SpecialOffset::NullList(*pos + existing_lists)
1152 }
1153 SpecialOffset::EmptyList(pos) => {
1154 SpecialOffset::EmptyList(*pos + existing_lists)
1155 }
1156 }));
1157 }
1158 }
1159 }
1160 let validity = if has_nulls {
1161 Some(validity_builder.finish())
1162 } else {
1163 None
1164 };
1165 match layer_kind {
1166 LayerKind::Fsl => RawRepDef::Fsl(FslDesc {
1167 validity,
1168 num_values: all_num_values,
1169 dimension: all_dimension,
1170 }),
1171 LayerKind::Validity => RawRepDef::Validity(ValidityDesc {
1172 validity,
1173 num_values: all_num_values,
1174 }),
1175 LayerKind::Offsets => RawRepDef::Offsets(OffsetDesc {
1176 offsets: all_offsets.into(),
1177 validity,
1178 has_empty_lists: all_has_empty_lists,
1179 num_values: all_num_values,
1180 specials: all_specials.into(),
1181 }),
1182 }
1183 }
1184
1185 pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
1188 assert!(!builders.is_empty());
1189 if builders.iter().all(|b| b.is_empty()) {
1190 return SerializedRepDefs::empty(
1192 builders
1193 .first()
1194 .unwrap()
1195 .repdefs
1196 .iter()
1197 .map(|_| DefinitionInterpretation::AllValidItem)
1198 .collect::<Vec<_>>(),
1199 );
1200 }
1201 let has_nulls = builders.iter().any(|b| b.has_nulls());
1202 let has_offsets = builders.iter().any(|b| b.has_offsets());
1203 let total_len = builders.iter().map(|b| b.len.unwrap()).sum();
1204 let num_layers = builders[0].num_layers();
1205 let mut context = SerializerContext::new(total_len, has_nulls, has_offsets, num_layers);
1206 let combined_layers = (0..num_layers)
1207 .map(|layer_index| {
1208 Self::concat_layers(
1209 builders.iter().map(|b| &b.repdefs[layer_index]),
1210 builders.len(),
1211 )
1212 })
1213 .collect::<Vec<_>>();
1214 debug_assert!(builders
1215 .iter()
1216 .all(|b| b.num_layers() == builders[0].num_layers()));
1217 for layer in combined_layers.into_iter().rev() {
1218 match layer {
1219 RawRepDef::Validity(def) => {
1220 context.record_validity(&def);
1221 }
1222 RawRepDef::Offsets(rep) => {
1223 context.record_offsets(&rep);
1224 }
1225 RawRepDef::Fsl(fsl) => {
1226 context.record_fsl(&fsl);
1227 }
1228 }
1229 }
1230 context.build().collapse_specials()
1231 }
1232}
1233
/// Converts repetition and definition levels back into offsets and validity, one layer
/// at a time, starting with the layer described by `def_meaning[0]`.
#[derive(Debug)]
pub struct RepDefUnraveler {
    /// Remaining repetition levels; compacted as list layers are unraveled.
    rep_levels: Option<LevelBuffer>,
    /// Remaining definition levels; compacted as list layers are unraveled.
    def_levels: Option<LevelBuffer>,
    /// For each definition level, the repetition depth at which it becomes visible to
    /// `unravel_validity` (index 0 is the fully valid level).
    levels_to_rep: Vec<u16>,
    /// Interpretation of each layer's definition levels, in unravel order.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Total number of definition levels used by the layers unraveled so far.
    current_def_cmp: u16,
    /// Number of list layers unraveled so far.
    current_rep_cmp: u16,
    /// Index into `def_meaning` of the next layer to unravel.
    current_layer: usize,
}
1253
1254impl RepDefUnraveler {
1255 pub fn new(
1257 rep_levels: Option<LevelBuffer>,
1258 def_levels: Option<LevelBuffer>,
1259 def_meaning: Arc<[DefinitionInterpretation]>,
1260 ) -> Self {
1261 let mut levels_to_rep = Vec::with_capacity(def_meaning.len());
1262 let mut rep_counter = 0;
1263 levels_to_rep.push(0);
1265 for meaning in def_meaning.as_ref() {
1266 match meaning {
1267 DefinitionInterpretation::AllValidItem | DefinitionInterpretation::AllValidList => {
1268 }
1270 DefinitionInterpretation::NullableItem => {
1271 levels_to_rep.push(rep_counter);
1273 }
1274 DefinitionInterpretation::NullableList => {
1275 rep_counter += 1;
1276 levels_to_rep.push(rep_counter);
1277 }
1278 DefinitionInterpretation::EmptyableList => {
1279 rep_counter += 1;
1280 levels_to_rep.push(rep_counter);
1281 }
1282 DefinitionInterpretation::NullableAndEmptyableList => {
1283 rep_counter += 1;
1284 levels_to_rep.push(rep_counter);
1285 levels_to_rep.push(rep_counter);
1286 }
1287 }
1288 }
1289 Self {
1290 rep_levels,
1291 def_levels,
1292 current_def_cmp: 0,
1293 current_rep_cmp: 0,
1294 levels_to_rep,
1295 current_layer: 0,
1296 def_meaning,
1297 }
1298 }
1299
1300 pub fn is_all_valid(&self) -> bool {
1301 self.def_meaning[self.current_layer].is_all_valid()
1302 }
1303
1304 pub fn max_lists(&self) -> usize {
1310 debug_assert!(
1311 self.def_meaning[self.current_layer] != DefinitionInterpretation::NullableItem
1312 );
1313 self.rep_levels
1314 .as_ref()
1315 .map(|levels| levels.len())
1317 .unwrap_or(0)
1318 }
1319
1320 pub fn unravel_offsets<T: ArrowNativeType>(
1325 &mut self,
1326 offsets: &mut Vec<T>,
1327 validity: Option<&mut BooleanBufferBuilder>,
1328 ) -> Result<()> {
1329 let rep_levels = self
1330 .rep_levels
1331 .as_mut()
1332 .expect("Expected repetition level but data didn't contain repetition");
1333 let valid_level = self.current_def_cmp;
1334 let (null_level, empty_level) = match self.def_meaning[self.current_layer] {
1335 DefinitionInterpretation::NullableList => {
1336 self.current_def_cmp += 1;
1337 (valid_level + 1, 0)
1338 }
1339 DefinitionInterpretation::EmptyableList => {
1340 self.current_def_cmp += 1;
1341 (0, valid_level + 1)
1342 }
1343 DefinitionInterpretation::NullableAndEmptyableList => {
1344 self.current_def_cmp += 2;
1345 (valid_level + 1, valid_level + 2)
1346 }
1347 DefinitionInterpretation::AllValidList => (0, 0),
1348 _ => unreachable!(),
1349 };
1350 let max_level = null_level.max(empty_level);
1351 self.current_layer += 1;
1352
1353 let mut curlen: usize = offsets.last().map(|o| o.as_usize()).unwrap_or(0);
1354
1355 offsets.pop();
1363
1364 let to_offset = |val: usize| {
1365 T::from_usize(val)
1366 .ok_or_else(|| Error::invalid_input("A single batch had more than i32::MAX values and so a large container type is required", location!()))
1367 };
1368 self.current_rep_cmp += 1;
1369 if let Some(def_levels) = &mut self.def_levels {
1370 assert!(rep_levels.len() == def_levels.len());
1371 let mut push_validity: Box<dyn FnMut(bool)> = if let Some(validity) = validity {
1374 Box::new(|is_valid| validity.append(is_valid))
1375 } else {
1376 Box::new(|_| {})
1377 };
1378 let mut read_idx = 0;
1382 let mut write_idx = 0;
1383 while read_idx < rep_levels.len() {
1384 unsafe {
1387 let rep_val = *rep_levels.get_unchecked(read_idx);
1388 if rep_val != 0 {
1389 let def_val = *def_levels.get_unchecked(read_idx);
1390 *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1392 *def_levels.get_unchecked_mut(write_idx) = def_val;
1393 write_idx += 1;
1394
1395 if def_val == 0 {
1396 offsets.push(to_offset(curlen)?);
1398 curlen += 1;
1399 push_validity(true);
1400 } else if def_val > max_level {
1401 } else if def_val == null_level {
1403 offsets.push(to_offset(curlen)?);
1405 push_validity(false);
1406 } else if def_val == empty_level {
1407 offsets.push(to_offset(curlen)?);
1409 push_validity(true);
1410 } else {
1411 offsets.push(to_offset(curlen)?);
1413 curlen += 1;
1414 push_validity(true);
1415 }
1416 } else {
1417 curlen += 1;
1418 }
1419 read_idx += 1;
1420 }
1421 }
1422 offsets.push(to_offset(curlen)?);
1423 rep_levels.truncate(write_idx);
1424 def_levels.truncate(write_idx);
1425 Ok(())
1426 } else {
1427 let mut read_idx = 0;
1429 let mut write_idx = 0;
1430 let old_offsets_len = offsets.len();
1431 while read_idx < rep_levels.len() {
1432 unsafe {
1434 let rep_val = *rep_levels.get_unchecked(read_idx);
1435 if rep_val != 0 {
1436 offsets.push(to_offset(curlen)?);
1438 *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1439 write_idx += 1;
1440 }
1441 curlen += 1;
1442 read_idx += 1;
1443 }
1444 }
1445 let num_new_lists = offsets.len() - old_offsets_len;
1446 offsets.push(to_offset(curlen)?);
1447 rep_levels.truncate(offsets.len() - 1);
1448 if let Some(validity) = validity {
1449 validity.append_n(num_new_lists, true);
1452 }
1453 Ok(())
1454 }
1455 }
1456
1457 pub fn skip_validity(&mut self) {
1458 debug_assert!(
1459 self.def_meaning[self.current_layer] == DefinitionInterpretation::AllValidItem
1460 );
1461 self.current_layer += 1;
1462 }
1463
1464 pub fn unravel_validity(&mut self, validity: &mut BooleanBufferBuilder) {
1466 debug_assert!(
1467 self.def_meaning[self.current_layer] != DefinitionInterpretation::AllValidItem
1468 );
1469 self.current_layer += 1;
1470
1471 let def_levels = &self.def_levels.as_ref().unwrap();
1472
1473 let current_def_cmp = self.current_def_cmp;
1474 self.current_def_cmp += 1;
1475
1476 for is_valid in def_levels.iter().filter_map(|&level| {
1477 if self.levels_to_rep[level as usize] <= self.current_rep_cmp {
1478 Some(level <= current_def_cmp)
1479 } else {
1480 None
1481 }
1482 }) {
1483 validity.append(is_valid);
1484 }
1485 }
1486
1487 pub fn decimate(&mut self, dimension: usize) {
1488 if self.rep_levels.is_some() {
1489 todo!("Not yet supported FSL<...List<...>>");
1501 }
1502 let Some(def_levels) = self.def_levels.as_mut() else {
1503 return;
1504 };
1505 let mut read_idx = 0;
1506 let mut write_idx = 0;
1507 while read_idx < def_levels.len() {
1508 unsafe {
1509 *def_levels.get_unchecked_mut(write_idx) = *def_levels.get_unchecked(read_idx);
1510 }
1511 write_idx += 1;
1512 read_idx += dimension;
1513 }
1514 def_levels.truncate(write_idx);
1515 }
1516}
1517
1518#[derive(Debug)]
1532pub struct CompositeRepDefUnraveler {
1533 unravelers: Vec<RepDefUnraveler>,
1534}
1535
1536impl CompositeRepDefUnraveler {
1537 pub fn new(unravelers: Vec<RepDefUnraveler>) -> Self {
1538 Self { unravelers }
1539 }
1540
1541 pub fn unravel_validity(&mut self, num_values: usize) -> Option<NullBuffer> {
1545 let is_all_valid = self
1546 .unravelers
1547 .iter()
1548 .all(|unraveler| unraveler.is_all_valid());
1549
1550 if is_all_valid {
1551 for unraveler in self.unravelers.iter_mut() {
1552 unraveler.skip_validity();
1553 }
1554 None
1555 } else {
1556 let mut validity = BooleanBufferBuilder::new(num_values);
1557 for unraveler in self.unravelers.iter_mut() {
1558 unraveler.unravel_validity(&mut validity);
1559 }
1560 Some(NullBuffer::new(validity.finish()))
1561 }
1562 }
1563
1564 pub fn unravel_fsl_validity(
1565 &mut self,
1566 num_values: usize,
1567 dimension: usize,
1568 ) -> Option<NullBuffer> {
1569 for unraveler in self.unravelers.iter_mut() {
1570 unraveler.decimate(dimension);
1571 }
1572 self.unravel_validity(num_values)
1573 }
1574
1575 pub fn unravel_offsets<T: ArrowNativeType>(
1577 &mut self,
1578 ) -> Result<(OffsetBuffer<T>, Option<NullBuffer>)> {
1579 let mut is_all_valid = true;
1580 let mut max_num_lists = 0;
1581 for unraveler in self.unravelers.iter() {
1582 is_all_valid &= unraveler.is_all_valid();
1583 max_num_lists += unraveler.max_lists();
1584 }
1585
1586 let mut validity = if is_all_valid {
1587 None
1588 } else {
1589 Some(BooleanBufferBuilder::new(max_num_lists))
1592 };
1593
1594 let mut offsets = Vec::with_capacity(max_num_lists + 1);
1595
1596 for unraveler in self.unravelers.iter_mut() {
1597 unraveler.unravel_offsets(&mut offsets, validity.as_mut())?;
1598 }
1599
1600 Ok((
1601 OffsetBuffer::new(ScalarBuffer::from(offsets)),
1602 validity.map(|mut v| NullBuffer::new(v.finish())),
1603 ))
1604 }
1605}
1606
1607#[derive(Debug)]
1613pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
1614 repdef: I,
1615 def_width: usize,
1616 max_rep: u16,
1617 max_visible_def: u16,
1618 rep_mask: u16,
1619 def_mask: u16,
1620 bits_rep: u8,
1621 bits_def: u8,
1622 phantom: std::marker::PhantomData<W>,
1623}
1624
1625impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
1626 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1627 let next = self.repdef.next()?;
1628 let control_word: u8 =
1629 (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8);
1630 buf.push(control_word);
1631 let is_new_row = next.0 == self.max_rep;
1632 let is_visible = next.1 <= self.max_visible_def;
1633 let is_valid_item = next.1 == 0;
1634 Some(ControlWordDesc {
1635 is_new_row,
1636 is_visible,
1637 is_valid_item,
1638 })
1639 }
1640}
1641
1642impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
1643 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1644 let next = self.repdef.next()?;
1645 let control_word: u16 =
1646 ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask);
1647 let control_word = control_word.to_le_bytes();
1648 buf.push(control_word[0]);
1649 buf.push(control_word[1]);
1650 let is_new_row = next.0 == self.max_rep;
1651 let is_visible = next.1 <= self.max_visible_def;
1652 let is_valid_item = next.1 == 0;
1653 Some(ControlWordDesc {
1654 is_new_row,
1655 is_visible,
1656 is_valid_item,
1657 })
1658 }
1659}
1660
1661impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
1662 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1663 let next = self.repdef.next()?;
1664 let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width)
1665 + ((next.1 & self.def_mask) as u32);
1666 let control_word = control_word.to_le_bytes();
1667 buf.push(control_word[0]);
1668 buf.push(control_word[1]);
1669 buf.push(control_word[2]);
1670 buf.push(control_word[3]);
1671 let is_new_row = next.0 == self.max_rep;
1672 let is_visible = next.1 <= self.max_visible_def;
1673 let is_valid_item = next.1 == 0;
1674 Some(ControlWordDesc {
1675 is_new_row,
1676 is_visible,
1677 is_valid_item,
1678 })
1679 }
1680}
1681
1682#[derive(Debug)]
1684pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
1685 repdef: I,
1686 level_mask: u16,
1687 bits_rep: u8,
1688 bits_def: u8,
1689 max_rep: u16,
1690 phantom: std::marker::PhantomData<W>,
1691}
1692
1693impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
1694 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1695 let next = self.repdef.next()?;
1696 buf.push((next & self.level_mask) as u8);
1697 let is_new_row = self.max_rep == 0 || next == self.max_rep;
1698 let is_valid_item = next == 0 || self.bits_def == 0;
1699 Some(ControlWordDesc {
1700 is_new_row,
1701 is_visible: true,
1704 is_valid_item,
1705 })
1706 }
1707}
1708
1709impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
1710 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1711 let next = self.repdef.next().unwrap() & self.level_mask;
1712 let control_word = next.to_le_bytes();
1713 buf.push(control_word[0]);
1714 buf.push(control_word[1]);
1715 let is_new_row = self.max_rep == 0 || next == self.max_rep;
1716 let is_valid_item = next == 0 || self.bits_def == 0;
1717 Some(ControlWordDesc {
1718 is_new_row,
1719 is_visible: true,
1720 is_valid_item,
1721 })
1722 }
1723}
1724
1725impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
1726 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1727 let next = self.repdef.next()?;
1728 let next = (next & self.level_mask) as u32;
1729 let control_word = next.to_le_bytes();
1730 buf.push(control_word[0]);
1731 buf.push(control_word[1]);
1732 buf.push(control_word[2]);
1733 buf.push(control_word[3]);
1734 let is_new_row = self.max_rep == 0 || next as u16 == self.max_rep;
1735 let is_valid_item = next == 0 || self.bits_def == 0;
1736 Some(ControlWordDesc {
1737 is_new_row,
1738 is_visible: true,
1739 is_valid_item,
1740 })
1741 }
1742}
1743
1744#[derive(Debug)]
1746pub struct NilaryControlWordIterator {
1747 len: usize,
1748 idx: usize,
1749}
1750
1751impl NilaryControlWordIterator {
1752 fn append_next(&mut self) -> Option<ControlWordDesc> {
1753 if self.idx == self.len {
1754 None
1755 } else {
1756 self.idx += 1;
1757 Some(ControlWordDesc {
1758 is_new_row: true,
1759 is_visible: true,
1760 is_valid_item: true,
1761 })
1762 }
1763 }
1764}
1765
/// Returns a mask selecting the low `width` bits of a `u16`.
///
/// A width of 16 or more selects every bit (`u16::MAX`). Shifting `1u16` by 16 would overflow:
/// a panic in debug builds and a wrong mask in release. A 16-bit width is reachable, e.g.
/// from `log_2_ceil` of a large maximum level.
fn get_mask(width: u16) -> u16 {
    if width >= u16::BITS as u16 {
        u16::MAX
    } else {
        (1 << width) - 1
    }
}
1770
/// A `BinaryControlWordIterator` that walks a repetition-level slice and a definition-level
/// slice in lockstep, producing control words of type `T` (`u8`, `u16` or `u32`).
type SpecificBinaryControlWordIterator<'a, T> = BinaryControlWordIterator<
    Zip<Copied<std::slice::Iter<'a, u16>>, Copied<std::slice::Iter<'a, u16>>>,
    T,
>;
1777
/// Encoder that turns repetition/definition levels into fixed-width control words.
///
/// * `Binary*`: both repetition and definition levels are packed into each word.
/// * `Unary*`: only one kind of level (repetition or definition) is present.
/// * `Nilary`: neither kind is present; no bytes are written.
///
/// The numeric suffix is the word width in bits (1, 2 or 4 bytes per word, see
/// `bytes_per_word`).
#[derive(Debug)]
pub enum ControlWordIterator<'a> {
    Binary8(SpecificBinaryControlWordIterator<'a, u8>),
    Binary16(SpecificBinaryControlWordIterator<'a, u16>),
    Binary32(SpecificBinaryControlWordIterator<'a, u32>),
    Unary8(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u8>),
    Unary16(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u16>),
    Unary32(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u32>),
    Nilary(NilaryControlWordIterator),
}
1797
/// Summary of a single control word, produced when levels are encoded into or decoded from
/// control words.
#[derive(Debug)]
pub struct ControlWordDesc {
    /// True when the word's repetition level equals the maximum repetition level, or when
    /// there is no repetition at all (every item then starts its own row).
    pub is_new_row: bool,
    /// True when the definition level is at most `max_visible_def`. Words without a
    /// rep/def pair are always reported as visible.
    pub is_visible: bool,
    /// True when the definition level is 0, or when there are no definition bits.
    pub is_valid_item: bool,
}
1805
1806impl ControlWordIterator<'_> {
1807 pub fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1811 match self {
1812 Self::Binary8(iter) => iter.append_next(buf),
1813 Self::Binary16(iter) => iter.append_next(buf),
1814 Self::Binary32(iter) => iter.append_next(buf),
1815 Self::Unary8(iter) => iter.append_next(buf),
1816 Self::Unary16(iter) => iter.append_next(buf),
1817 Self::Unary32(iter) => iter.append_next(buf),
1818 Self::Nilary(iter) => iter.append_next(),
1819 }
1820 }
1821
1822 pub fn has_repetition(&self) -> bool {
1824 match self {
1825 Self::Binary8(_) | Self::Binary16(_) | Self::Binary32(_) => true,
1826 Self::Unary8(iter) => iter.bits_rep > 0,
1827 Self::Unary16(iter) => iter.bits_rep > 0,
1828 Self::Unary32(iter) => iter.bits_rep > 0,
1829 Self::Nilary(_) => false,
1830 }
1831 }
1832
1833 pub fn bytes_per_word(&self) -> usize {
1835 match self {
1836 Self::Binary8(_) => 1,
1837 Self::Binary16(_) => 2,
1838 Self::Binary32(_) => 4,
1839 Self::Unary8(_) => 1,
1840 Self::Unary16(_) => 2,
1841 Self::Unary32(_) => 4,
1842 Self::Nilary(_) => 0,
1843 }
1844 }
1845
1846 pub fn bits_rep(&self) -> u8 {
1848 match self {
1849 Self::Binary8(iter) => iter.bits_rep,
1850 Self::Binary16(iter) => iter.bits_rep,
1851 Self::Binary32(iter) => iter.bits_rep,
1852 Self::Unary8(iter) => iter.bits_rep,
1853 Self::Unary16(iter) => iter.bits_rep,
1854 Self::Unary32(iter) => iter.bits_rep,
1855 Self::Nilary(_) => 0,
1856 }
1857 }
1858
1859 pub fn bits_def(&self) -> u8 {
1861 match self {
1862 Self::Binary8(iter) => iter.bits_def,
1863 Self::Binary16(iter) => iter.bits_def,
1864 Self::Binary32(iter) => iter.bits_def,
1865 Self::Unary8(iter) => iter.bits_def,
1866 Self::Unary16(iter) => iter.bits_def,
1867 Self::Unary32(iter) => iter.bits_def,
1868 Self::Nilary(_) => 0,
1869 }
1870 }
1871}
1872
1873pub fn build_control_word_iterator<'a>(
1877 rep: Option<&'a [u16]>,
1878 max_rep: u16,
1879 def: Option<&'a [u16]>,
1880 max_def: u16,
1881 max_visible_def: u16,
1882 len: usize,
1883) -> ControlWordIterator<'a> {
1884 let rep_width = if max_rep == 0 {
1885 0
1886 } else {
1887 log_2_ceil(max_rep as u32) as u16
1888 };
1889 let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
1890 let def_width = if max_def == 0 {
1891 0
1892 } else {
1893 log_2_ceil(max_def as u32) as u16
1894 };
1895 let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
1896 let total_width = rep_width + def_width;
1897 match (rep, def) {
1898 (Some(rep), Some(def)) => {
1899 let iter = rep.iter().copied().zip(def.iter().copied());
1900 let def_width = def_width as usize;
1901 if total_width <= 8 {
1902 ControlWordIterator::Binary8(BinaryControlWordIterator {
1903 repdef: iter,
1904 rep_mask,
1905 def_mask,
1906 def_width,
1907 max_rep,
1908 max_visible_def,
1909 bits_rep: rep_width as u8,
1910 bits_def: def_width as u8,
1911 phantom: std::marker::PhantomData,
1912 })
1913 } else if total_width <= 16 {
1914 ControlWordIterator::Binary16(BinaryControlWordIterator {
1915 repdef: iter,
1916 rep_mask,
1917 def_mask,
1918 def_width,
1919 max_rep,
1920 max_visible_def,
1921 bits_rep: rep_width as u8,
1922 bits_def: def_width as u8,
1923 phantom: std::marker::PhantomData,
1924 })
1925 } else {
1926 ControlWordIterator::Binary32(BinaryControlWordIterator {
1927 repdef: iter,
1928 rep_mask,
1929 def_mask,
1930 def_width,
1931 max_rep,
1932 max_visible_def,
1933 bits_rep: rep_width as u8,
1934 bits_def: def_width as u8,
1935 phantom: std::marker::PhantomData,
1936 })
1937 }
1938 }
1939 (Some(lev), None) => {
1940 let iter = lev.iter().copied();
1941 if total_width <= 8 {
1942 ControlWordIterator::Unary8(UnaryControlWordIterator {
1943 repdef: iter,
1944 level_mask: rep_mask,
1945 bits_rep: total_width as u8,
1946 bits_def: 0,
1947 max_rep,
1948 phantom: std::marker::PhantomData,
1949 })
1950 } else if total_width <= 16 {
1951 ControlWordIterator::Unary16(UnaryControlWordIterator {
1952 repdef: iter,
1953 level_mask: rep_mask,
1954 bits_rep: total_width as u8,
1955 bits_def: 0,
1956 max_rep,
1957 phantom: std::marker::PhantomData,
1958 })
1959 } else {
1960 ControlWordIterator::Unary32(UnaryControlWordIterator {
1961 repdef: iter,
1962 level_mask: rep_mask,
1963 bits_rep: total_width as u8,
1964 bits_def: 0,
1965 max_rep,
1966 phantom: std::marker::PhantomData,
1967 })
1968 }
1969 }
1970 (None, Some(lev)) => {
1971 let iter = lev.iter().copied();
1972 if total_width <= 8 {
1973 ControlWordIterator::Unary8(UnaryControlWordIterator {
1974 repdef: iter,
1975 level_mask: def_mask,
1976 bits_rep: 0,
1977 bits_def: total_width as u8,
1978 max_rep: 0,
1979 phantom: std::marker::PhantomData,
1980 })
1981 } else if total_width <= 16 {
1982 ControlWordIterator::Unary16(UnaryControlWordIterator {
1983 repdef: iter,
1984 level_mask: def_mask,
1985 bits_rep: 0,
1986 bits_def: total_width as u8,
1987 max_rep: 0,
1988 phantom: std::marker::PhantomData,
1989 })
1990 } else {
1991 ControlWordIterator::Unary32(UnaryControlWordIterator {
1992 repdef: iter,
1993 level_mask: def_mask,
1994 bits_rep: 0,
1995 bits_def: total_width as u8,
1996 max_rep: 0,
1997 phantom: std::marker::PhantomData,
1998 })
1999 }
2000 }
2001 (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator { len, idx: 0 }),
2002 }
2003}
2004
2005#[derive(Copy, Clone, Debug)]
2009pub enum ControlWordParser {
2010 BOTH8(u8, u32),
2013 BOTH16(u8, u32),
2014 BOTH32(u8, u32),
2015 REP8,
2016 REP16,
2017 REP32,
2018 DEF8,
2019 DEF16,
2020 DEF32,
2021 NIL,
2022}
2023
2024impl ControlWordParser {
2025 fn parse_both<const WORD_SIZE: u8>(
2026 src: &[u8],
2027 dst_rep: &mut Vec<u16>,
2028 dst_def: &mut Vec<u16>,
2029 bits_to_shift: u8,
2030 mask_to_apply: u32,
2031 ) {
2032 match WORD_SIZE {
2033 1 => {
2034 let word = src[0];
2035 let rep = word >> bits_to_shift;
2036 let def = word & (mask_to_apply as u8);
2037 dst_rep.push(rep as u16);
2038 dst_def.push(def as u16);
2039 }
2040 2 => {
2041 let word = u16::from_le_bytes([src[0], src[1]]);
2042 let rep = word >> bits_to_shift;
2043 let def = word & mask_to_apply as u16;
2044 dst_rep.push(rep);
2045 dst_def.push(def);
2046 }
2047 4 => {
2048 let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2049 let rep = word >> bits_to_shift;
2050 let def = word & mask_to_apply;
2051 dst_rep.push(rep as u16);
2052 dst_def.push(def as u16);
2053 }
2054 _ => unreachable!(),
2055 }
2056 }
2057
2058 fn parse_desc_both<const WORD_SIZE: u8>(
2059 src: &[u8],
2060 bits_to_shift: u8,
2061 mask_to_apply: u32,
2062 max_rep: u16,
2063 max_visible_def: u16,
2064 ) -> ControlWordDesc {
2065 match WORD_SIZE {
2066 1 => {
2067 let word = src[0];
2068 let rep = word >> bits_to_shift;
2069 let def = word & (mask_to_apply as u8);
2070 let is_visible = def as u16 <= max_visible_def;
2071 let is_new_row = rep as u16 == max_rep;
2072 let is_valid_item = def == 0;
2073 ControlWordDesc {
2074 is_visible,
2075 is_new_row,
2076 is_valid_item,
2077 }
2078 }
2079 2 => {
2080 let word = u16::from_le_bytes([src[0], src[1]]);
2081 let rep = word >> bits_to_shift;
2082 let def = word & mask_to_apply as u16;
2083 let is_visible = def <= max_visible_def;
2084 let is_new_row = rep == max_rep;
2085 let is_valid_item = def == 0;
2086 ControlWordDesc {
2087 is_visible,
2088 is_new_row,
2089 is_valid_item,
2090 }
2091 }
2092 4 => {
2093 let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2094 let rep = word >> bits_to_shift;
2095 let def = word & mask_to_apply;
2096 let is_visible = def as u16 <= max_visible_def;
2097 let is_new_row = rep as u16 == max_rep;
2098 let is_valid_item = def == 0;
2099 ControlWordDesc {
2100 is_visible,
2101 is_new_row,
2102 is_valid_item,
2103 }
2104 }
2105 _ => unreachable!(),
2106 }
2107 }
2108
2109 fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
2110 match WORD_SIZE {
2111 1 => {
2112 let word = src[0];
2113 dst.push(word as u16);
2114 }
2115 2 => {
2116 let word = u16::from_le_bytes([src[0], src[1]]);
2117 dst.push(word);
2118 }
2119 4 => {
2120 let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2121 dst.push(word as u16);
2122 }
2123 _ => unreachable!(),
2124 }
2125 }
2126
2127 fn parse_rep_desc_one<const WORD_SIZE: u8>(src: &[u8], max_rep: u16) -> ControlWordDesc {
2128 match WORD_SIZE {
2129 1 => ControlWordDesc {
2130 is_new_row: src[0] as u16 == max_rep,
2131 is_visible: true,
2132 is_valid_item: true,
2133 },
2134 2 => ControlWordDesc {
2135 is_new_row: u16::from_le_bytes([src[0], src[1]]) == max_rep,
2136 is_visible: true,
2137 is_valid_item: true,
2138 },
2139 4 => ControlWordDesc {
2140 is_new_row: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == max_rep,
2141 is_visible: true,
2142 is_valid_item: true,
2143 },
2144 _ => unreachable!(),
2145 }
2146 }
2147
2148 fn parse_def_desc_one<const WORD_SIZE: u8>(src: &[u8]) -> ControlWordDesc {
2149 match WORD_SIZE {
2150 1 => ControlWordDesc {
2151 is_new_row: true,
2152 is_visible: true,
2153 is_valid_item: src[0] == 0,
2154 },
2155 2 => ControlWordDesc {
2156 is_new_row: true,
2157 is_visible: true,
2158 is_valid_item: u16::from_le_bytes([src[0], src[1]]) == 0,
2159 },
2160 4 => ControlWordDesc {
2161 is_new_row: true,
2162 is_visible: true,
2163 is_valid_item: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == 0,
2164 },
2165 _ => unreachable!(),
2166 }
2167 }
2168
2169 pub fn bytes_per_word(&self) -> usize {
2171 match self {
2172 Self::BOTH8(..) => 1,
2173 Self::BOTH16(..) => 2,
2174 Self::BOTH32(..) => 4,
2175 Self::REP8 => 1,
2176 Self::REP16 => 2,
2177 Self::REP32 => 4,
2178 Self::DEF8 => 1,
2179 Self::DEF16 => 2,
2180 Self::DEF32 => 4,
2181 Self::NIL => 0,
2182 }
2183 }
2184
2185 pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
2192 match self {
2193 Self::BOTH8(bits_to_shift, mask_to_apply) => {
2194 Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2195 }
2196 Self::BOTH16(bits_to_shift, mask_to_apply) => {
2197 Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2198 }
2199 Self::BOTH32(bits_to_shift, mask_to_apply) => {
2200 Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2201 }
2202 Self::REP8 => Self::parse_one::<1>(src, dst_rep),
2203 Self::REP16 => Self::parse_one::<2>(src, dst_rep),
2204 Self::REP32 => Self::parse_one::<4>(src, dst_rep),
2205 Self::DEF8 => Self::parse_one::<1>(src, dst_def),
2206 Self::DEF16 => Self::parse_one::<2>(src, dst_def),
2207 Self::DEF32 => Self::parse_one::<4>(src, dst_def),
2208 Self::NIL => {}
2209 }
2210 }
2211
2212 pub fn has_rep(&self) -> bool {
2214 match self {
2215 Self::BOTH8(..)
2216 | Self::BOTH16(..)
2217 | Self::BOTH32(..)
2218 | Self::REP8
2219 | Self::REP16
2220 | Self::REP32 => true,
2221 Self::DEF8 | Self::DEF16 | Self::DEF32 | Self::NIL => false,
2222 }
2223 }
2224
2225 pub fn parse_desc(&self, src: &[u8], max_rep: u16, max_visible_def: u16) -> ControlWordDesc {
2227 match self {
2228 Self::BOTH8(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<1>(
2229 src,
2230 *bits_to_shift,
2231 *mask_to_apply,
2232 max_rep,
2233 max_visible_def,
2234 ),
2235 Self::BOTH16(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<2>(
2236 src,
2237 *bits_to_shift,
2238 *mask_to_apply,
2239 max_rep,
2240 max_visible_def,
2241 ),
2242 Self::BOTH32(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<4>(
2243 src,
2244 *bits_to_shift,
2245 *mask_to_apply,
2246 max_rep,
2247 max_visible_def,
2248 ),
2249 Self::REP8 => Self::parse_rep_desc_one::<1>(src, max_rep),
2250 Self::REP16 => Self::parse_rep_desc_one::<2>(src, max_rep),
2251 Self::REP32 => Self::parse_rep_desc_one::<4>(src, max_rep),
2252 Self::DEF8 => Self::parse_def_desc_one::<1>(src),
2253 Self::DEF16 => Self::parse_def_desc_one::<2>(src),
2254 Self::DEF32 => Self::parse_def_desc_one::<4>(src),
2255 Self::NIL => ControlWordDesc {
2256 is_new_row: true,
2257 is_valid_item: true,
2258 is_visible: true,
2259 },
2260 }
2261 }
2262
2263 pub fn new(bits_rep: u8, bits_def: u8) -> Self {
2265 let total_bits = bits_rep + bits_def;
2266
2267 enum WordSize {
2268 One,
2269 Two,
2270 Four,
2271 }
2272
2273 let word_size = if total_bits <= 8 {
2274 WordSize::One
2275 } else if total_bits <= 16 {
2276 WordSize::Two
2277 } else {
2278 WordSize::Four
2279 };
2280
2281 match (bits_rep > 0, bits_def > 0, word_size) {
2282 (false, false, _) => Self::NIL,
2283 (false, true, WordSize::One) => Self::DEF8,
2284 (false, true, WordSize::Two) => Self::DEF16,
2285 (false, true, WordSize::Four) => Self::DEF32,
2286 (true, false, WordSize::One) => Self::REP8,
2287 (true, false, WordSize::Two) => Self::REP16,
2288 (true, false, WordSize::Four) => Self::REP32,
2289 (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32),
2290 (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32),
2291 (true, true, WordSize::Four) => {
2292 Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32)
2293 }
2294 }
2295 }
2296}
2297
2298#[cfg(test)]
2299mod tests {
2300 use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
2301
2302 use crate::repdef::{
2303 CompositeRepDefUnraveler, DefinitionInterpretation, RepDefUnraveler, SerializedRepDefs,
2304 };
2305
2306 use super::RepDefBuilder;
2307
2308 fn validity(values: &[bool]) -> NullBuffer {
2309 NullBuffer::from_iter(values.iter().copied())
2310 }
2311
2312 fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
2313 OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
2314 }
2315
2316 fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
2317 OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
2318 }
2319
2320 #[test]
2321 fn test_repdef_basic() {
2322 let mut builder = RepDefBuilder::default();
2324 builder.add_offsets(
2325 offsets_64(&[0, 2, 2, 5]),
2326 Some(validity(&[true, false, true])),
2327 );
2328 builder.add_offsets(
2329 offsets_64(&[0, 1, 3, 5, 5, 9]),
2330 Some(validity(&[true, true, true, false, true])),
2331 );
2332 builder.add_validity_bitmap(validity(&[
2333 true, true, true, false, false, false, true, true, false,
2334 ]));
2335
2336 let repdefs = RepDefBuilder::serialize(vec![builder]);
2337 let rep = repdefs.repetition_levels.unwrap();
2338 let def = repdefs.definition_levels.unwrap();
2339
2340 assert_eq!(vec![0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
2341 assert_eq!(vec![2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
2342
2343 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2344 Some(rep.as_ref().to_vec()),
2345 Some(def.as_ref().to_vec()),
2346 repdefs.def_meaning.into(),
2347 )]);
2348
2349 assert_eq!(
2352 unraveler.unravel_validity(9),
2353 Some(validity(&[
2354 true, true, true, false, false, false, true, true, false
2355 ]))
2356 );
2357 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2358 assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 5, 9]).inner());
2359 assert_eq!(val, Some(validity(&[true, true, true, false, true])));
2360 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2361 assert_eq!(off.inner(), offsets_32(&[0, 2, 2, 5]).inner());
2362 assert_eq!(val, Some(validity(&[true, false, true])));
2363 }
2364
2365 #[test]
2366 fn test_repdef_simple_null_empty_list() {
2367 let check = |repdefs: SerializedRepDefs, last_def: DefinitionInterpretation| {
2368 let rep = repdefs.repetition_levels.unwrap();
2369 let def = repdefs.definition_levels.unwrap();
2370
2371 assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2372 assert_eq!([0, 0, 2, 0, 1, 0], *def);
2373 assert!(repdefs.special_records.is_empty());
2374 assert_eq!(
2375 vec![DefinitionInterpretation::NullableItem, last_def,],
2376 repdefs.def_meaning
2377 );
2378 };
2379
2380 let mut builder = RepDefBuilder::default();
2384 builder.add_offsets(
2385 offsets_32(&[0, 2, 2, 5]),
2386 Some(validity(&[true, false, true])),
2387 );
2388 builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2389
2390 let repdefs = RepDefBuilder::serialize(vec![builder]);
2391
2392 check(repdefs, DefinitionInterpretation::NullableList);
2393
2394 let mut builder = RepDefBuilder::default();
2396 builder.add_offsets(offsets_32(&[0, 2, 2, 5]), None);
2397 builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2398
2399 let repdefs = RepDefBuilder::serialize(vec![builder]);
2400
2401 check(repdefs, DefinitionInterpretation::EmptyableList);
2402 }
2403
2404 #[test]
2405 fn test_repdef_empty_list_at_end() {
2406 let mut builder = RepDefBuilder::default();
2408 builder.add_offsets(offsets_32(&[0, 2, 5, 5]), None);
2409 builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2410
2411 let repdefs = RepDefBuilder::serialize(vec![builder]);
2412
2413 let rep = repdefs.repetition_levels.unwrap();
2414 let def = repdefs.definition_levels.unwrap();
2415
2416 assert_eq!([1, 0, 1, 0, 0, 1], *rep);
2417 assert_eq!([0, 0, 0, 1, 0, 2], *def);
2418 assert!(repdefs.special_records.is_empty());
2419 assert_eq!(
2420 vec![
2421 DefinitionInterpretation::NullableItem,
2422 DefinitionInterpretation::EmptyableList,
2423 ],
2424 repdefs.def_meaning
2425 );
2426 }
2427
2428 #[test]
2429 fn test_repdef_abnormal_nulls() {
2430 let mut builder = RepDefBuilder::default();
2433 builder.add_offsets(
2434 offsets_32(&[0, 2, 5, 8]),
2435 Some(validity(&[true, false, true])),
2436 );
2437 builder.add_no_null(5);
2440
2441 let repdefs = RepDefBuilder::serialize(vec![builder]);
2442
2443 let rep = repdefs.repetition_levels.unwrap();
2444 let def = repdefs.definition_levels.unwrap();
2445
2446 assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2447 assert_eq!([0, 0, 1, 0, 0, 0], *def);
2448
2449 assert_eq!(
2450 vec![
2451 DefinitionInterpretation::AllValidItem,
2452 DefinitionInterpretation::NullableList,
2453 ],
2454 repdefs.def_meaning
2455 );
2456 }
2457
2458 #[test]
2459 fn test_repdef_fsl() {
2460 let mut builder = RepDefBuilder::default();
2461 builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2462 builder.add_fsl(None, 2, 4);
2463 builder.add_validity_bitmap(validity(&[
2464 true, false, true, false, true, false, true, false,
2465 ]));
2466
2467 let repdefs = RepDefBuilder::serialize(vec![builder]);
2468
2469 assert_eq!(
2470 vec![
2471 DefinitionInterpretation::NullableItem,
2472 DefinitionInterpretation::AllValidItem,
2473 DefinitionInterpretation::NullableItem
2474 ],
2475 repdefs.def_meaning
2476 );
2477
2478 assert!(repdefs.repetition_levels.is_none());
2479
2480 let def = repdefs.definition_levels.unwrap();
2481
2482 assert_eq!([0, 1, 0, 1, 2, 2, 2, 2], *def);
2483
2484 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2485 None,
2486 Some(def.as_ref().to_vec()),
2487 repdefs.def_meaning.into(),
2488 )]);
2489
2490 assert_eq!(
2491 unraveler.unravel_validity(8),
2492 Some(validity(&[
2493 true, false, true, false, false, false, false, false
2494 ]))
2495 );
2496 assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2497 assert_eq!(
2498 unraveler.unravel_fsl_validity(2, 2),
2499 Some(validity(&[true, false]))
2500 );
2501 }
2502
2503 #[test]
2504 fn test_repdef_fsl_allvalid_item() {
2505 let mut builder = RepDefBuilder::default();
2506 builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2507 builder.add_fsl(None, 2, 4);
2508 builder.add_no_null(8);
2509
2510 let repdefs = RepDefBuilder::serialize(vec![builder]);
2511
2512 assert_eq!(
2513 vec![
2514 DefinitionInterpretation::AllValidItem,
2515 DefinitionInterpretation::AllValidItem,
2516 DefinitionInterpretation::NullableItem
2517 ],
2518 repdefs.def_meaning
2519 );
2520
2521 assert!(repdefs.repetition_levels.is_none());
2522
2523 let def = repdefs.definition_levels.unwrap();
2524
2525 assert_eq!([0, 0, 0, 0, 1, 1, 1, 1], *def);
2526
2527 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2528 None,
2529 Some(def.as_ref().to_vec()),
2530 repdefs.def_meaning.into(),
2531 )]);
2532
2533 assert_eq!(unraveler.unravel_validity(8), None);
2534 assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2535 assert_eq!(
2536 unraveler.unravel_fsl_validity(2, 2),
2537 Some(validity(&[true, false]))
2538 );
2539 }
2540
2541 #[test]
2542 fn test_repdef_sliced_offsets() {
2543 let mut builder = RepDefBuilder::default();
2546 builder.add_offsets(
2547 offsets_32(&[5, 7, 7, 10]),
2548 Some(validity(&[true, false, true])),
2549 );
2550 builder.add_no_null(5);
2551
2552 let repdefs = RepDefBuilder::serialize(vec![builder]);
2553
2554 let rep = repdefs.repetition_levels.unwrap();
2555 let def = repdefs.definition_levels.unwrap();
2556
2557 assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2558 assert_eq!([0, 0, 1, 0, 0, 0], *def);
2559
2560 assert_eq!(
2561 vec![
2562 DefinitionInterpretation::AllValidItem,
2563 DefinitionInterpretation::NullableList,
2564 ],
2565 repdefs.def_meaning
2566 );
2567 }
2568
2569 #[test]
2570 fn test_repdef_complex_null_empty() {
2571 let mut builder = RepDefBuilder::default();
2572 builder.add_offsets(
2573 offsets_32(&[0, 4, 4, 4, 6]),
2574 Some(validity(&[true, false, true, true])),
2575 );
2576 builder.add_offsets(
2577 offsets_32(&[0, 1, 1, 2, 2, 2, 3]),
2578 Some(validity(&[true, false, true, false, true, true])),
2579 );
2580 builder.add_no_null(3);
2581
2582 let repdefs = RepDefBuilder::serialize(vec![builder]);
2583
2584 let rep = repdefs.repetition_levels.unwrap();
2585 let def = repdefs.definition_levels.unwrap();
2586
2587 assert_eq!([2, 1, 1, 1, 2, 2, 2, 1], *rep);
2588 assert_eq!([0, 1, 0, 1, 3, 4, 2, 0], *def);
2589 }
2590
2591 #[test]
2592 fn test_repdef_empty_list_no_null() {
2593 let mut builder = RepDefBuilder::default();
2596 builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
2597 builder.add_no_null(6);
2598
2599 let repdefs = RepDefBuilder::serialize(vec![builder]);
2600
2601 let rep = repdefs.repetition_levels.unwrap();
2602 let def = repdefs.definition_levels.unwrap();
2603
2604 assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2605 assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2606
2607 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2608 Some(rep.as_ref().to_vec()),
2609 Some(def.as_ref().to_vec()),
2610 repdefs.def_meaning.into(),
2611 )]);
2612
2613 assert_eq!(unraveler.unravel_validity(6), None);
2614 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2615 assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2616 assert_eq!(val, None);
2617 }
2618
2619 #[test]
2620 fn test_repdef_all_valid() {
2621 let mut builder = RepDefBuilder::default();
2622 builder.add_offsets(offsets_64(&[0, 2, 3, 5]), None);
2623 builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
2624 builder.add_no_null(9);
2625
2626 let repdefs = RepDefBuilder::serialize(vec![builder]);
2627 let rep = repdefs.repetition_levels.unwrap();
2628 assert!(repdefs.definition_levels.is_none());
2629
2630 assert_eq!([2, 1, 0, 2, 0, 2, 0, 1, 0], *rep);
2631
2632 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2633 Some(rep.as_ref().to_vec()),
2634 None,
2635 repdefs.def_meaning.into(),
2636 )]);
2637
2638 assert_eq!(unraveler.unravel_validity(9), None);
2639 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2640 assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 7, 9]).inner());
2641 assert_eq!(val, None);
2642 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2643 assert_eq!(off.inner(), offsets_32(&[0, 2, 3, 5]).inner());
2644 assert_eq!(val, None);
2645 }
2646
2647 #[test]
2648 fn test_repdef_no_rep() {
2649 let mut builder = RepDefBuilder::default();
2650 builder.add_no_null(5);
2651 builder.add_validity_bitmap(validity(&[false, false, true, true, true]));
2652 builder.add_validity_bitmap(validity(&[false, true, true, true, false]));
2653
2654 let repdefs = RepDefBuilder::serialize(vec![builder]);
2655 assert!(repdefs.repetition_levels.is_none());
2656 let def = repdefs.definition_levels.unwrap();
2657
2658 assert_eq!([2, 2, 0, 0, 1], *def);
2659
2660 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2661 None,
2662 Some(def.as_ref().to_vec()),
2663 repdefs.def_meaning.into(),
2664 )]);
2665
2666 assert_eq!(
2667 unraveler.unravel_validity(5),
2668 Some(validity(&[false, false, true, true, false]))
2669 );
2670 assert_eq!(
2671 unraveler.unravel_validity(5),
2672 Some(validity(&[false, false, true, true, true]))
2673 );
2674 assert_eq!(unraveler.unravel_validity(5), None);
2675 }
2676
2677 #[test]
2678 fn test_composite_unravel() {
2679 let mut builder = RepDefBuilder::default();
2680 builder.add_offsets(
2681 offsets_64(&[0, 2, 2, 5]),
2682 Some(validity(&[true, false, true])),
2683 );
2684 let repdef1 = RepDefBuilder::serialize(vec![builder]);
2685
2686 let mut builder = RepDefBuilder::default();
2687 builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
2688 let repdef2 = RepDefBuilder::serialize(vec![builder]);
2689
2690 let unravel1 = RepDefUnraveler::new(
2691 repdef1.repetition_levels.map(|l| l.to_vec()),
2692 repdef1.definition_levels.map(|l| l.to_vec()),
2693 repdef1.def_meaning.into(),
2694 );
2695 let unravel2 = RepDefUnraveler::new(
2696 repdef2.repetition_levels.map(|l| l.to_vec()),
2697 repdef2.definition_levels.map(|l| l.to_vec()),
2698 repdef2.def_meaning.into(),
2699 );
2700
2701 let mut unraveler = CompositeRepDefUnraveler::new(vec![unravel1, unravel2]);
2702
2703 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2704 assert_eq!(
2705 off.inner(),
2706 offsets_32(&[0, 2, 2, 5, 6, 8, 10, 12, 14]).inner()
2707 );
2708 assert_eq!(
2709 val,
2710 Some(validity(&[true, false, true, true, true, true, true, true]))
2711 );
2712 }
2713
2714 #[test]
2715 fn test_repdef_multiple_builders() {
2716 let mut builder1 = RepDefBuilder::default();
2718 builder1.add_offsets(offsets_64(&[0, 2]), None);
2719 builder1.add_offsets(offsets_64(&[0, 1, 3]), None);
2720 builder1.add_validity_bitmap(validity(&[true, true, true]));
2721
2722 let mut builder2 = RepDefBuilder::default();
2723 builder2.add_offsets(offsets_64(&[0, 0, 3]), Some(validity(&[false, true])));
2724 builder2.add_offsets(
2725 offsets_64(&[0, 2, 2, 6]),
2726 Some(validity(&[true, false, true])),
2727 );
2728 builder2.add_validity_bitmap(validity(&[false, false, false, true, true, false]));
2729
2730 let repdefs = RepDefBuilder::serialize(vec![builder1, builder2]);
2731
2732 let rep = repdefs.repetition_levels.unwrap();
2733 let def = repdefs.definition_levels.unwrap();
2734
2735 assert_eq!([2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
2736 assert_eq!([0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
2737 }
2738
2739 #[test]
2740 fn test_slicer() {
2741 let mut builder = RepDefBuilder::default();
2742 builder.add_offsets(
2743 offsets_64(&[0, 2, 2, 30, 30]),
2744 Some(validity(&[true, false, true, true])),
2745 );
2746 builder.add_no_null(30);
2747
2748 let repdefs = RepDefBuilder::serialize(vec![builder]);
2749
2750 let mut rep_slicer = repdefs.rep_slicer().unwrap();
2751
2752 assert_eq!(rep_slicer.slice_next(5).len(), 12);
2754 assert_eq!(rep_slicer.slice_next(20).len(), 40);
2756 assert_eq!(rep_slicer.slice_rest().len(), 12);
2758
2759 let mut def_slicer = repdefs.rep_slicer().unwrap();
2760
2761 assert_eq!(def_slicer.slice_next(5).len(), 12);
2763 assert_eq!(def_slicer.slice_next(20).len(), 40);
2765 assert_eq!(def_slicer.slice_rest().len(), 12);
2767 }
2768
    #[test]
    fn test_control_words() {
        /// Encodes `rep`/`def` into control words and checks the bytes against
        /// `expected_values`, along with the word size and per-stream bit widths.
        /// It then parses the words back with `ControlWordParser` and checks that the
        /// levels round-trip. An empty slice stands for "no such level stream".
        fn check(
            rep: &[u16],
            def: &[u16],
            expected_values: Vec<u8>,
            expected_bytes_per_word: usize,
            expected_bits_rep: u8,
            expected_bits_def: u8,
        ) {
            // One control word is expected per level.
            let num_vals = rep.len().max(def.len());
            let max_rep = rep.iter().max().copied().unwrap_or(0);
            let max_def = def.iter().max().copied().unwrap_or(0);

            let in_rep = if rep.is_empty() { None } else { Some(rep) };
            let in_def = if def.is_empty() { None } else { Some(def) };

            // `max_def + 1` is passed as the max visible def level. This test only
            // checks the encoded bytes, not the per-word visibility flags.
            let mut iter = super::build_control_word_iterator(
                in_rep,
                max_rep,
                in_def,
                max_def,
                max_def + 1,
                expected_values.len(),
            );
            assert_eq!(iter.bytes_per_word(), expected_bytes_per_word);
            assert_eq!(iter.bits_rep(), expected_bits_rep);
            assert_eq!(iter.bits_def(), expected_bits_def);
            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());

            for _ in 0..num_vals {
                iter.append_next(&mut cw_vec);
            }
            // The iterator must be exhausted after one word per level.
            assert!(iter.append_next(&mut cw_vec).is_none());

            assert_eq!(expected_values, cw_vec);

            let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);

            let mut rep_out = Vec::with_capacity(num_vals);
            let mut def_out = Vec::with_capacity(num_vals);

            // `chunks_exact(0)` panics, so skip parsing when words are zero bytes wide.
            if expected_bytes_per_word > 0 {
                for slice in cw_vec.chunks_exact(expected_bytes_per_word) {
                    parser.parse(slice, &mut rep_out, &mut def_out);
                }
            }

            assert_eq!(rep, rep_out.as_slice());
            assert_eq!(def, def_out.as_slice());
        }

        // Rep (max 12) and def (max 15) each fit in 4 bits, so every control word is a
        // single byte laid out as (rep << 4) | def, e.g. (7, 3) -> 0b0111_0011.
        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let def = &[5_u16, 3, 1, 2, 12, 15, 0, 2];
        let expected = vec![
            0b00000101, 0b01110011, 0b00110001, 0b00100010, 0b10011100, 0b10001111, 0b11000000, 0b01010010, ];
        check(rep, def, expected, 1, 4, 4);

        // A def max of 22 needs 5 bits. 4 + 5 = 9 bits no longer fits in a byte, so each
        // word is 2 bytes of (rep << 5) | def, low byte first,
        // e.g. (9, 12) -> 300 -> [0b00101100, 0b00000001].
        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let def = &[5_u16, 3, 1, 2, 12, 22, 0, 2];
        let expected = vec![
            0b00000101, 0b00000000, 0b11100011, 0b00000000, 0b01100001, 0b00000000, 0b01000010, 0b00000000, 0b00101100, 0b00000001, 0b00010110, 0b00000001, 0b10000000, 0b00000001, 0b10100010, 0b00000000, ];
        check(rep, def, expected, 2, 4, 5);

        // With only one level stream, each byte holds that level's value as-is.
        let levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let expected = vec![
            0b00000000, 0b00000111, 0b00000011, 0b00000010, 0b00001001, 0b00001000, 0b00001100, 0b00000101, ];
        // Rep-only stream.
        check(levels, &[], expected.clone(), 1, 4, 0);

        // Def-only stream produces the same bytes.
        check(&[], levels, expected, 1, 0, 4);

        // No level streams at all: zero-width words and no output bytes.
        check(&[], &[], Vec::default(), 0, 0, 0);
    }
2872
2873 #[test]
2874 fn test_control_words_rep_index() {
2875 fn check(
2876 rep: &[u16],
2877 def: &[u16],
2878 expected_new_rows: Vec<bool>,
2879 expected_is_visible: Vec<bool>,
2880 ) {
2881 let num_vals = rep.len().max(def.len());
2882 let max_rep = rep.iter().max().copied().unwrap_or(0);
2883 let max_def = def.iter().max().copied().unwrap_or(0);
2884
2885 let in_rep = if rep.is_empty() { None } else { Some(rep) };
2886 let in_def = if def.is_empty() { None } else { Some(def) };
2887
2888 let mut iter = super::build_control_word_iterator(
2889 in_rep,
2890 max_rep,
2891 in_def,
2892 max_def,
2893 2,
2894 expected_new_rows.len(),
2895 );
2896
2897 let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
2898 let mut expected_new_rows = expected_new_rows.iter().copied();
2899 let mut expected_is_visible = expected_is_visible.iter().copied();
2900 for _ in 0..expected_new_rows.len() {
2901 let word_desc = iter.append_next(&mut cw_vec).unwrap();
2902 assert_eq!(word_desc.is_new_row, expected_new_rows.next().unwrap());
2903 assert_eq!(word_desc.is_visible, expected_is_visible.next().unwrap());
2904 }
2905 assert!(iter.append_next(&mut cw_vec).is_none());
2906 }
2907
2908 let rep = &[2_u16, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
2910 let def = &[0_u16, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];
2912
2913 check(
2915 rep,
2916 def,
2917 vec![
2918 true, false, false, true, true, false, false, false, false, true, false,
2919 ],
2920 vec![
2921 true, true, true, false, true, true, true, true, true, true, true,
2922 ],
2923 );
2924 check(
2926 rep,
2927 &[],
2928 vec![
2929 true, false, false, true, true, false, false, false, false, true, false,
2930 ],
2931 vec![true; 11],
2932 );
2933 check(
2935 &[],
2936 def,
2937 vec![
2938 true, true, true, true, true, true, true, true, true, true, true,
2939 ],
2940 vec![true; 11],
2941 );
2942 check(
2944 &[],
2945 &[],
2946 vec![
2947 true, true, true, true, true, true, true, true, true, true, true,
2948 ],
2949 vec![true; 11],
2950 );
2951 }
2952}