use std::{
    any::Any,
    collections::{HashMap, VecDeque},
    fmt::Debug,
    iter,
    ops::Range,
    sync::Arc,
    vec,
};

use arrow::array::AsArray;
use arrow_array::{
    make_array, types::UInt64Type, Array, ArrayRef, FixedSizeListArray, PrimitiveArray,
};
use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field as ArrowField};
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, TryStreamExt};
use itertools::Itertools;
use lance_arrow::deepcopy::deep_copy_array;
use lance_core::{
    cache::{Context, DeepSizeOf},
    datatypes::{
        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
    },
    error::Error,
    utils::bit::pad_bytes,
    utils::hash::U8SliceKey,
};
use log::{debug, trace};
use snafu::location;

use crate::repdef::{
    build_control_word_iterator, CompositeRepDefUnraveler, ControlWordIterator, ControlWordParser,
    DefinitionInterpretation, RepDefSlicer,
};
use crate::statistics::{ComputeStat, GetStat, Stat};
use crate::utils::bytepack::ByteUnpacker;
use crate::{
    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
    utils::bytepack::BytepackedIntegerEncoder,
};
use crate::{
    decoder::{FixedPerValueDecompressor, VariablePerValueDecompressor},
    encoder::PerValueDataBlock,
};
use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};

use crate::{
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
    decoder::{
        BlockDecompressor, ColumnInfo, DecodeArrayTask, DecodePageTask, DecodedArray, DecodedPage,
        DecompressorStrategy, FieldScheduler, FilterExpression, LoadedPage, LogicalPageDecoder,
        MessageType, MiniBlockDecompressor, NextDecodeTask, PageEncoding, PageInfo, PageScheduler,
        PrimitivePageDecoder, PriorityRange, ScheduledScanLine, SchedulerContext, SchedulingJob,
        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPage,
    },
    encoder::{
        ArrayEncodingStrategy, CompressionStrategy, EncodeTask, EncodedColumn, EncodedPage,
        EncodingOptions, FieldEncoder, MiniBlockChunk, MiniBlockCompressed, OutOfLineBuffers,
    },
    encodings::physical::{decoder_from_array_encoding, ColumnBuffers, PageBuffers},
    format::{pb, ProtobufUtils},
    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
    EncodingsIo,
};

#[derive(Debug)]
struct PrimitivePage {
    scheduler: Box<dyn PageScheduler>,
    num_rows: u64,
    page_index: u32,
}

/// A field scheduler for primitive fields
///
/// This maps to exactly one column of primitive data
#[derive(Debug)]
pub struct PrimitiveFieldScheduler {
    data_type: DataType,
    page_schedulers: Vec<PrimitivePage>,
    num_rows: u64,
    should_validate: bool,
    column_index: u32,
}

impl PrimitiveFieldScheduler {
    pub fn new(
        column_index: u32,
        data_type: DataType,
        pages: Arc<[PageInfo]>,
        buffers: ColumnBuffers,
        should_validate: bool,
    ) -> Self {
        let page_schedulers = pages
            .iter()
            .enumerate()
            // Skip any empty pages
            .filter(|(page_index, page)| {
                if page.num_rows == 0 {
                    log::trace!("Skipping empty page with index {}", page_index);
                    return false;
                }
                true
            })
            .map(|(page_index, page)| {
                let page_buffers = PageBuffers {
                    column_buffers: buffers,
                    positions_and_sizes: &page.buffer_offsets_and_sizes,
                };
                let scheduler = decoder_from_array_encoding(
                    page.encoding.as_legacy(),
                    &page_buffers,
                    &data_type,
                );
                PrimitivePage {
                    scheduler,
                    num_rows: page.num_rows,
                    page_index: page_index as u32,
                }
            })
            .collect::<Vec<_>>();
        let num_rows = page_schedulers.iter().map(|p| p.num_rows).sum();
        Self {
            data_type,
            page_schedulers,
            num_rows,
            should_validate,
            column_index,
        }
    }
}

#[derive(Debug)]
struct PrimitiveFieldSchedulingJob<'a> {
    scheduler: &'a PrimitiveFieldScheduler,
    ranges: Vec<Range<u64>>,
    page_idx: usize,
    range_idx: usize,
    range_offset: u64,
    global_row_offset: u64,
}

impl<'a> PrimitiveFieldSchedulingJob<'a> {
    pub fn new(scheduler: &'a PrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
        Self {
            scheduler,
            ranges,
            page_idx: 0,
            range_idx: 0,
            range_offset: 0,
            global_row_offset: 0,
        }
    }
}

impl SchedulingJob for PrimitiveFieldSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        debug_assert!(self.range_idx < self.ranges.len());
        // Get our current range
        let mut range = self.ranges[self.range_idx].clone();
        range.start += self.range_offset;

        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range,
            cur_page.num_rows
        );
        // Skip entire pages that come before the start of the range
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }

        // Collect all the requested ranges that overlap the current page
        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;

            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                break;
            }
        }

        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            num_rows_in_next,
            ranges_in_page.len(),
            cur_page.num_rows,
            priority.current_priority(),
            self.scheduler.column_index,
            cur_page.page_index,
        );

        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;

        let physical_decoder = cur_page.scheduler.schedule_ranges(
            &ranges_in_page,
            context.io(),
            priority.current_priority(),
        );

        let logical_decoder = PrimitiveFieldDecoder {
            data_type: self.scheduler.data_type.clone(),
            column_index: self.scheduler.column_index,
            unloaded_physical_decoder: Some(physical_decoder),
            physical_decoder: None,
            rows_drained: 0,
            num_rows: num_rows_in_next,
            should_validate: self.scheduler.should_validate,
            page_index: cur_page.page_index,
        };

        let decoder = Box::new(logical_decoder);
        let decoder_ready = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder_ready)],
            rows_scheduled: num_rows_in_next,
        })
    }

    fn num_rows(&self) -> u64 {
        self.ranges.iter().map(|r| r.end - r.start).sum()
    }
}
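
// Illustrative walk-through (not part of the original source): with three
// pages of 100 rows each and a requested range of 150..250, the first call to
// `schedule_next` skips page 0, schedules rows 50..100 of page 1, and stops at
// the page boundary.  The second call schedules rows 0..50 of page 2.  Each
// call emits one `ScheduledScanLine` covering a single page.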

impl FieldScheduler for PrimitiveFieldScheduler {
    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[std::ops::Range<u64>],
        _filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        Ok(Box::new(PrimitiveFieldSchedulingJob::new(
            self,
            ranges.to_vec(),
        )))
    }

    fn initialize<'a>(
        &'a self,
        _filter: &'a FilterExpression,
        _context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        std::future::ready(Ok(())).boxed()
    }
}

/// A scheduler for a single page of structurally-encoded data
trait StructuralPageScheduler: std::fmt::Debug + Send {
    /// Fetches any metadata the page needs (e.g. chunk layout, dictionary)
    /// and returns it so that it can be cached
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
    /// Initializes the scheduler from previously cached metadata
    fn load(&mut self, data: &Arc<dyn CachedPageData>);
    /// Schedules the I/O needed to read the given ranges of rows
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>>;
}

#[derive(Debug)]
/// Metadata describing one chunk within a mini-block page
struct ChunkMeta {
    num_values: u64,
    chunk_size_bytes: u64,
    offset_bytes: u64,
}

#[derive(Debug)]
/// The decoded levels and values of a single mini-block chunk
struct DecodedMiniBlockChunk {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    values: DataBlock,
}

#[derive(Debug)]
/// A task to decode one or more mini-block chunks into a data block
struct DecodeMiniBlockTask {
    rep_decompressor: Arc<dyn BlockDecompressor>,
    def_decompressor: Arc<dyn BlockDecompressor>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    dictionary_data: Option<Arc<DataBlock>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    max_visible_level: u16,
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
}

impl DecodeMiniBlockTask {
    fn decode_levels(
        rep_decompressor: &dyn BlockDecompressor,
        levels: LanceBuffer,
    ) -> Result<Option<ScalarBuffer<u16>>> {
        let rep = rep_decompressor.decompress(levels)?;
        match rep {
            DataBlock::FixedWidth(mut rep) => Ok(Some(rep.data.borrow_to_typed_slice::<u16>())),
            DataBlock::Constant(constant) => {
                assert_eq!(constant.data.len(), 2);
                if constant.data[0] == 0 && constant.data[1] == 0 {
                    // A constant zero means the levels are all zero and can be omitted
                    Ok(None)
                } else {
                    // Non-zero constant levels are not yet supported
                    todo!()
                }
            }
            _ => unreachable!(),
        }
    }

    // Appends a chunk's levels to the destination buffer, filling with zeros when
    // a chunk has no levels so the buffers stay aligned
    fn extend_levels(
        range: Range<u64>,
        levels: &mut Option<LevelBuffer>,
        level_buf: &Option<impl AsRef<[u16]>>,
        dest_offset: usize,
    ) {
        if let Some(level_buf) = level_buf {
            if levels.is_none() {
                // This is the first chunk with levels; backfill the earlier chunks with zeros
                let mut new_levels_vec =
                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
                new_levels_vec.extend(iter::repeat(0).take(dest_offset));
                *levels = Some(new_levels_vec);
            }
            levels.as_mut().unwrap().extend(
                level_buf.as_ref()[range.start as usize..range.end as usize]
                    .iter()
                    .copied(),
            );
        } else if let Some(levels) = levels {
            // This chunk has no levels; fill with zeros
            let num_values = (range.end - range.start) as usize;
            levels.extend(iter::repeat(0).take(num_values));
        }
    }

    // Maps a range of rows to a range of items and a range of levels
    //
    // An "item" is a leaf value (a row may contain many items when there is
    // repetition) and a "level" is a rep/def pair.  Levels are recorded even for
    // invisible (e.g. null) items, so the level range can be wider than the item
    // range.
    fn map_range(
        range: Range<u64>,
        rep: Option<&impl AsRef<[u16]>>,
        def: Option<&impl AsRef<[u16]>>,
        max_rep: u16,
        max_visible_def: u16,
        // The total number of items in the chunk.  This can be less than the
        // number of levels because invisible items have levels but no values
        total_items: u64,
        preamble_action: PreambleAction,
    ) -> (Range<u64>, Range<u64>) {
        if let Some(rep) = rep {
            let mut rep = rep.as_ref();
            // Find the start of the first row (the preamble, if any, comes first).
            // The work is the same whether we take or skip the preamble.
            let mut items_in_preamble = 0;
            let first_row_start = match preamble_action {
                PreambleAction::Skip | PreambleAction::Take => {
                    let first_row_start = if let Some(def) = def.as_ref() {
                        // With a def buffer we also count how many preamble items are visible
                        let mut first_row_start = None;
                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
                            if *rep == max_rep {
                                first_row_start = Some(idx);
                                break;
                            }
                            if *def <= max_visible_def {
                                items_in_preamble += 1;
                            }
                        }
                        first_row_start
                    } else {
                        // Without a def buffer every preamble level is a visible item
                        let first_row_start = rep.iter().position(|&r| r == max_rep);
                        items_in_preamble = first_row_start.unwrap_or(rep.len());
                        first_row_start
                    };
                    // A chunk can consist entirely of partial values; such a chunk
                    // should never be skipped
                    if first_row_start.is_none() {
                        assert!(preamble_action == PreambleAction::Take);
                        return (0..total_items, 0..rep.len() as u64);
                    }
                    let first_row_start = first_row_start.unwrap() as u64;
                    rep = &rep[first_row_start as usize..];
                    first_row_start
                }
                PreambleAction::Absent => {
                    debug_assert!(rep[0] == max_rep);
                    0
                }
            };

            // If we are taking only the preamble then we are done
            if range.start == range.end {
                debug_assert!(preamble_action == PreambleAction::Take);
                return (0..items_in_preamble as u64, 0..first_row_start);
            }
            assert!(range.start < range.end);

            let mut rows_seen = 0;
            let mut new_start = 0;
            let mut new_levels_start = 0;

            if let Some(def) = def {
                let def = &def.as_ref()[first_row_start as usize..];

                // Track invisible items before the range start so the item range
                // can be adjusted
                let mut lead_invis_seen = 0;

                if range.start > 0 {
                    if def[0] > max_visible_def {
                        lead_invis_seen += 1;
                    }
                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1 - lead_invis_seen;
                                new_levels_start = idx as u64 + 1;
                                break;
                            }
                            if *def > max_visible_def {
                                lead_invis_seen += 1;
                            }
                        }
                    }
                }

                rows_seen += 1;

                // Now find the end of the range, again tracking invisible items
                let mut new_end = u64::MAX;
                let mut new_levels_end = rep.len() as u64;
                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
                    .iter()
                    .zip(&def[(new_levels_start + 1) as usize..])
                    .enumerate()
                {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.end + 1 {
                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
                            new_levels_end = idx as u64 + new_levels_start + 1;
                            break;
                        }
                        if *def > max_visible_def {
                            tail_invis_seen += 1;
                        }
                    }
                }

                // The range extends through the end of the chunk
                if new_end == u64::MAX {
                    new_levels_end = rep.len() as u64;
                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
                    new_end = rep.len() as u64 - total_invis_seen;
                }

                assert_ne!(new_end, u64::MAX);

                // Shift the ranges so they are relative to the start of the chunk
                // (accounting for the preamble we skipped or took)
                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                    new_levels_start += first_row_start;
                    new_levels_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    debug_assert_eq!(new_levels_start, 0);
                    new_end += first_row_start;
                    new_levels_end += first_row_start;
                }

                (new_start..new_end, new_levels_start..new_levels_end)
            } else {
                // Simpler case: no def buffer means no invisible items, so the item
                // range and the level range are the same
                if range.start > 0 {
                    for (idx, rep) in rep.iter().skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1;
                                break;
                            }
                        }
                    }
                }
                let mut new_end = rep.len() as u64;
                if range.end < total_items {
                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.end {
                                new_end = idx as u64 + new_start + 1;
                                break;
                            }
                        }
                    }
                }

                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    new_end += first_row_start;
                }

                (new_start..new_end, new_start..new_end)
            }
        } else {
            // No repetition means rows and items are the same thing and the range
            // maps through unchanged
            (range.clone(), range)
        }
    }
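
    // Illustrative example (not part of the original source): given
    // rep = [1, 0, 0, 1, 0], no def levels, max_rep = 1, total_items = 5, and
    // no preamble, the chunk contains two rows (items 0..3 and items 3..5).
    // Mapping the row range 1..2 returns the item range 3..5 and the matching
    // level range 3..5.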

    fn decode_miniblock_chunk(
        &self,
        buf: &LanceBuffer,
        items_in_chunk: u64,
    ) -> Result<DecodedMiniBlockChunk> {
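        // Layout of a mini-block chunk, as sliced below (illustration, not part
        // of the original source): a 6-byte header of three little-endian u16
        // sizes, then the rep levels, def levels, and values, with padding
        // bytes (p1, p2) between the sections:
        //
        //   [bytes_rep][bytes_def][bytes_val][rep][p1][def][p2][values]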
        let bytes_rep = u16::from_le_bytes([buf[0], buf[1]]) as usize;
        let bytes_def = u16::from_le_bytes([buf[2], buf[3]]) as usize;
        let bytes_val = u16::from_le_bytes([buf[4], buf[5]]) as usize;

        debug_assert!(buf.len() >= bytes_rep + bytes_def + bytes_val + 6);
        debug_assert!(
            buf.len()
                <= bytes_rep
                    + bytes_def
                    + bytes_val
                    + 6
                    // up to one byte of padding before the def levels
                    + 1
                    // plus alignment padding before the values and at the end of the chunk
                    + (2 * MINIBLOCK_MAX_PADDING)
        );

        let p1 = bytes_rep % 2;
        let rep = buf.slice_with_length(6, bytes_rep);
        let def = buf.slice_with_length(6 + bytes_rep + p1, bytes_def);
        let p2 = pad_bytes::<MINIBLOCK_ALIGNMENT>(6 + bytes_rep + p1 + bytes_def);
        let values = buf.slice_with_length(6 + bytes_rep + bytes_def + p2, bytes_val);

        let values = self.value_decompressor.decompress(values, items_in_chunk)?;

        let rep = Self::decode_levels(self.rep_decompressor.as_ref(), rep)?;
        let def = Self::decode_levels(self.def_decompressor.as_ref(), def)?;

        Ok(DecodedMiniBlockChunk { rep, def, values })
    }
}

impl DecodePageTask for DecodeMiniBlockTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        // Accumulate the repetition and definition levels as chunks are decoded
        let mut repbuf: Option<LevelBuffer> = None;
        let mut defbuf: Option<LevelBuffer> = None;

        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;

        // Estimate the size of the decoded data (2x the compressed size is a rough guess)
        let estimated_size_bytes = self
            .instructions
            .iter()
            .map(|(_, chunk)| chunk.data.len())
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);

        let mut level_offset = 0;
        // Decode each chunk and append the requested slice of its values and levels
        for (instructions, chunk) in self.instructions.iter() {
            let DecodedMiniBlockChunk { rep, def, values } =
                self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;

            // The instructions tell us which rows we want from this chunk
            let row_range_start =
                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
            let row_range_end = row_range_start + instructions.rows_to_take;

            // Map the row range into an item range and a level range
            let (item_range, level_range) = Self::map_range(
                row_range_start..row_range_end,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                self.max_visible_level,
                chunk.items_in_chunk,
                instructions.preamble_action,
            );

            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
            level_offset += (level_range.end - level_range.start) as usize;
            data_builder.append(&values, item_range);
        }

        let data = data_builder.finish();

        let unraveler = RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone());

        // If the page is dictionary encoded then the decoded values are indices
        // and we need to rebuild the actual values from the dictionary
        if let Some(dictionary) = &self.dictionary_data {
            // Estimate the output size as the average dictionary entry size times
            // the number of values (rounded up)
            let estimated_size_bytes = dictionary.data_size()
                * (data.num_values() + dictionary.num_values() - 1)
                / dictionary.num_values();
            let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);

            if let DataBlock::FixedWidth(mut fixed_width_data_block) = data {
                let indices = fixed_width_data_block.data.borrow_to_typed_slice::<u8>();
                let indices = indices.as_ref();

                indices.iter().for_each(|&idx| {
                    data_builder.append(dictionary, idx as u64..idx as u64 + 1);
                });

                let data = data_builder.finish();
                return Ok(DecodedPage {
                    data,
                    repdef: unraveler,
                });
            }
        }

        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}
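
// Illustrative example (not part of the original source): if the page is
// dictionary-encoded with dictionary values ["a", "b", "c"] and the chunk
// decodes to indices [2, 0, 0], the loop above rebuilds "c", "a", "a" by
// appending one dictionary row per index.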

#[derive(Debug)]
/// A chunk of mini-block data that has been (or will be) loaded from disk
struct LoadedChunk {
    data: LanceBuffer,
    items_in_chunk: u64,
    byte_range: Range<u64>,
    chunk_idx: usize,
}

// LanceBuffer does not implement Clone so we clone manually
impl Clone for LoadedChunk {
    fn clone(&self) -> Self {
        Self {
            data: self.data.try_clone().unwrap(),
            items_in_chunk: self.items_in_chunk,
            byte_range: self.byte_range.clone(),
            chunk_idx: self.chunk_idx,
        }
    }
}

#[derive(Debug)]
/// A decoder for a page that has been encoded with the mini-block layout
struct MiniBlockDecoder {
    rep_decompressor: Arc<dyn BlockDecompressor>,
    def_decompressor: Arc<dyn BlockDecompressor>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    loaded_chunks: VecDeque<LoadedChunk>,
    instructions: VecDeque<ChunkInstructions>,
    offset_in_current_chunk: u64,
    num_rows: u64,
    items_per_row: u64,
    dictionary: Option<Arc<DataBlock>>,
}

impl StructuralPageDecoder for MiniBlockDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut items_desired = num_rows * self.items_per_row;
        let mut need_preamble = false;
        let mut skip_in_chunk = self.offset_in_current_chunk;
        let mut drain_instructions = Vec::new();
        while items_desired > 0 || need_preamble {
            let (instructions, consumed) = self
                .instructions
                .front()
                .unwrap()
                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);

            // Discard any chunks the instructions no longer reference
            while self.loaded_chunks.front().unwrap().chunk_idx
                != instructions.chunk_instructions.chunk_idx
            {
                self.loaded_chunks.pop_front();
            }
            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
            if consumed {
                self.instructions.pop_front();
            }
        }
        // Remember where we stopped within the current chunk
        self.offset_in_current_chunk = skip_in_chunk;

        let max_visible_level = self
            .def_meaning
            .iter()
            .take_while(|l| !l.is_list())
            .map(|l| l.num_def_levels())
            .sum::<u16>();

        Ok(Box::new(DecodeMiniBlockTask {
            instructions: drain_instructions,
            def_decompressor: self.def_decompressor.clone(),
            rep_decompressor: self.rep_decompressor.clone(),
            value_decompressor: self.value_decompressor.clone(),
            dictionary_data: self.dictionary.clone(),
            def_meaning: self.def_meaning.clone(),
            max_visible_level,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
struct CachedComplexAllNullState {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
}

impl DeepSizeOf for CachedComplexAllNullState {
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        // Each level is a u16 (2 bytes)
        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
    }
}

impl CachedPageData for CachedComplexAllNullState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

#[derive(Debug)]
/// A scheduler for all-null pages that still carry repetition and/or
/// definition information
pub struct ComplexAllNullScheduler {
    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    items_per_row: u64,
    repdef: Option<Arc<CachedComplexAllNullState>>,
}

impl ComplexAllNullScheduler {
    pub fn new(
        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
        def_meaning: Arc<[DefinitionInterpretation]>,
        items_per_row: u64,
    ) -> Self {
        Self {
            buffer_offsets_and_sizes,
            def_meaning,
            items_per_row,
            repdef: None,
        }
    }
}

impl StructuralPageScheduler for ComplexAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        // Fully load the rep and def buffers, if they are present
        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
        let has_rep = rep_size > 0;
        let has_def = def_size > 0;

        let mut reads = Vec::with_capacity(2);
        if has_rep {
            reads.push(rep_pos..rep_pos + rep_size);
        }
        if has_def {
            reads.push(def_pos..def_pos + def_size);
        }

        let data = io.submit_request(reads, 0);

        async move {
            let data = data.await?;
            let mut data_iter = data.into_iter();

            let rep = if has_rep {
                let rep = data_iter.next().unwrap();
                let mut rep = LanceBuffer::from_bytes(rep, 2);
                let rep = rep.borrow_to_typed_slice::<u16>();
                Some(rep)
            } else {
                None
            };

            let def = if has_def {
                let def = data_iter.next().unwrap();
                let mut def = LanceBuffer::from_bytes(def, 2);
                let def = def.borrow_to_typed_slice::<u16>();
                Some(def)
            } else {
                None
            };

            let repdef = Arc::new(CachedComplexAllNullState { rep, def });

            self.repdef = Some(repdef.clone());

            Ok(repdef as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.repdef = Some(
            data.clone()
                .as_arc_any()
                .downcast::<CachedComplexAllNullState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let ranges = VecDeque::from_iter(ranges.iter().cloned());
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let item_ranges = ranges
            .iter()
            .map(|r| r.start * self.items_per_row..r.end * self.items_per_row)
            .collect();
        Ok(std::future::ready(Ok(Box::new(ComplexAllNullPageDecoder {
            ranges: item_ranges,
            rep: self.repdef.as_ref().unwrap().rep.clone(),
            def: self.repdef.as_ref().unwrap().def.clone(),
            items_per_row: self.items_per_row,
            num_rows,
            def_meaning: self.def_meaning.clone(),
        }) as Box<dyn StructuralPageDecoder>))
        .boxed())
    }
}

#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
    ranges: VecDeque<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    num_rows: u64,
    items_per_row: u64,
    def_meaning: Arc<[DefinitionInterpretation]>,
}

impl ComplexAllNullPageDecoder {
    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
        let mut rows_desired = num_rows;
        let mut ranges = Vec::with_capacity(self.ranges.len());
        while rows_desired > 0 {
            let front = self.ranges.front_mut().unwrap();
            let avail = front.end - front.start;
            if avail > rows_desired {
                ranges.push(front.start..front.start + rows_desired);
                front.start += rows_desired;
                rows_desired = 0;
            } else {
                ranges.push(self.ranges.pop_front().unwrap());
                rows_desired -= avail;
            }
        }
        ranges
    }
}
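
// Illustrative example (not part of the original source): if the pending
// ranges are [0..10] then `drain_ranges(4)` returns [0..4] and leaves [4..10]
// queued for the next drain.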

impl StructuralPageDecoder for ComplexAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        // Convert the requested row count into an item count
        let num_items = num_rows * self.items_per_row;
        let drained_ranges = self.drain_ranges(num_items);
        Ok(Box::new(DecodeComplexAllNullTask {
            ranges: drained_ranges,
            rep: self.rep.clone(),
            def: self.def.clone(),
            def_meaning: self.def_meaning.clone(),
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
/// A task to decode a slice of an all-null page that has rep/def levels
pub struct DecodeComplexAllNullTask {
    ranges: Vec<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
}

impl DecodeComplexAllNullTask {
    fn decode_level(
        &self,
        levels: &Option<ScalarBuffer<u16>>,
        num_values: u64,
    ) -> Option<Vec<u16>> {
        levels.as_ref().map(|levels| {
            let mut referenced_levels = Vec::with_capacity(num_values as usize);
            for range in &self.ranges {
                referenced_levels.extend(
                    levels[range.start as usize..range.end as usize]
                        .iter()
                        .copied(),
                );
            }
            referenced_levels
        })
    }
}

impl DecodePageTask for DecodeComplexAllNullTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let num_values = self.ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
        let rep = self.decode_level(&self.rep, num_values);
        let def = self.decode_level(&self.def, num_values);
        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning);
        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

#[derive(Debug, Default)]
/// A scheduler for all-null pages with no repetition and only a single
/// layer of definition (the simple, common case)
pub struct SimpleAllNullScheduler {}

impl StructuralPageScheduler for SimpleAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        _io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
    }

    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        Ok(std::future::ready(Ok(
            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>
        ))
        .boxed())
    }
}

#[derive(Debug)]
/// A task to decode simple all-null data
struct SimpleAllNullDecodePageTask {
    num_values: u64,
}
impl DecodePageTask for SimpleAllNullDecodePageTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let unraveler = RepDefUnraveler::new(
            None,
            Some(vec![1; self.num_values as usize]),
            Arc::new([DefinitionInterpretation::NullableItem]),
        );
        Ok(DecodedPage {
            data: DataBlock::AllNull(AllNullDataBlock {
                num_values: self.num_values,
            }),
            repdef: unraveler,
        })
    }
}

#[derive(Debug)]
pub struct SimpleAllNullPageDecoder {
    num_rows: u64,
}

impl StructuralPageDecoder for SimpleAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        Ok(Box::new(SimpleAllNullDecodePageTask {
            num_values: num_rows,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    dictionary_buf_position_and_size: (u64, u64),
    dictionary_data_alignment: u64,
}

#[derive(Debug)]
struct RepIndexBlock {
    /// The index of the first row that starts in this block
    first_row: u64,
    /// The number of rows that start in this block, counting a trailing
    /// partial row (if any) as a start
    starts_including_trailer: u64,
    /// True if the block begins with the continuation of an earlier row
    has_preamble: bool,
    /// True if the block ends with a partial row that continues into the
    /// next block
    has_trailer: bool,
}

impl DeepSizeOf for RepIndexBlock {
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        0
    }
}

#[derive(Debug)]
struct RepetitionIndex {
    blocks: Vec<RepIndexBlock>,
}

impl DeepSizeOf for RepetitionIndex {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.blocks.deep_size_of_children(context)
    }
}

impl RepetitionIndex {
    fn decode(rep_index: &[Vec<u64>]) -> Self {
        let mut chunk_has_preamble = false;
        let mut offset = 0;
        let mut blocks = Vec::with_capacity(rep_index.len());
        for chunk_rep in rep_index {
            // The first value counts the rows that end in this chunk and the
            // second is non-zero if the chunk ends with a partial row
            let ends_count = chunk_rep[0];
            let partial_count = chunk_rep[1];

            let chunk_has_trailer = partial_count > 0;
            let mut starts_including_trailer = ends_count;
            if chunk_has_trailer {
                starts_including_trailer += 1;
            }
            if chunk_has_preamble {
                starts_including_trailer -= 1;
            }

            blocks.push(RepIndexBlock {
                first_row: offset,
                starts_including_trailer,
                has_preamble: chunk_has_preamble,
                has_trailer: chunk_has_trailer,
            });

            // A chunk's trailer becomes the next chunk's preamble
            chunk_has_preamble = chunk_has_trailer;
            offset += starts_including_trailer;
        }

        Self { blocks }
    }
}
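
// Illustrative example (not part of the original source): the two-chunk index
// [[2, 1], [3, 0]] decodes to:
//   block 0: first_row = 0, starts_including_trailer = 3 (rows 0 and 1 end
//            here, row 2 starts here as a trailer), no preamble, has trailer
//   block 1: first_row = 3, starts_including_trailer = 2 (row 2 continues as
//            the preamble, rows 3 and 4 start here), has preamble, no trailer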

#[derive(Debug)]
struct MiniBlockCacheableState {
    /// Metadata describing each chunk in the page
    chunk_meta: Vec<ChunkMeta>,
    /// The decoded repetition index
    rep_index: RepetitionIndex,
    /// The dictionary for the page, if any
    dictionary: Option<Arc<DataBlock>>,
}

impl DeepSizeOf for MiniBlockCacheableState {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.rep_index.deep_size_of_children(context)
            + self
                .dictionary
                .as_ref()
                .map(|dict| dict.data_size() as usize)
                .unwrap_or(0)
    }
}

impl CachedPageData for MiniBlockCacheableState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

#[derive(Debug)]
/// A scheduler for a page that has been encoded with the mini-block layout
pub struct MiniBlockScheduler {
    buffer_offsets_and_sizes: Vec<(u64, u64)>,
    priority: u64,
    items_in_page: u64,
    items_per_row: u64,
    repetition_index_depth: u16,
    rep_decompressor: Arc<dyn BlockDecompressor>,
    def_decompressor: Arc<dyn BlockDecompressor>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    dictionary: Option<MiniBlockSchedulerDictionary>,
    // This is set during initialization (or when loaded from the cache)
    page_meta: Option<Arc<MiniBlockCacheableState>>,
}

impl MiniBlockScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        items_in_page: u64,
        items_per_row: u64,
        layout: &pb::MiniBlockLayout,
        decompressors: &dyn DecompressorStrategy,
    ) -> Result<Self> {
        let rep_decompressor =
            decompressors.create_block_decompressor(layout.rep_compression.as_ref().unwrap())?;
        let def_decompressor =
            decompressors.create_block_decompressor(layout.def_compression.as_ref().unwrap())?;
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();
        let value_decompressor = decompressors
            .create_miniblock_decompressor(layout.value_compression.as_ref().unwrap())?;
        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
            match dictionary_encoding.array_encoding.as_ref().unwrap() {
                pb::array_encoding::ArrayEncoding::Variable(_) => {
                    Some(MiniBlockSchedulerDictionary {
                        dictionary_decompressor: decompressors
                            .create_block_decompressor(dictionary_encoding)?
                            .into(),
                        dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                        dictionary_data_alignment: 4,
                    })
                }
                pb::array_encoding::ArrayEncoding::Flat(_) => Some(MiniBlockSchedulerDictionary {
                    dictionary_decompressor: decompressors
                        .create_block_decompressor(dictionary_encoding)?
                        .into(),
                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                    dictionary_data_alignment: 16,
                }),
                _ => {
                    unreachable!("Currently only the `Variable` and `Flat` encodings are used for the MiniBlock dictionary.")
                }
            }
        } else {
            None
        };

        Ok(Self {
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
            rep_decompressor: rep_decompressor.into(),
            def_decompressor: def_decompressor.into(),
            value_decompressor: value_decompressor.into(),
            repetition_index_depth: layout.repetition_index_depth as u16,
            priority,
            items_in_page,
            items_per_row,
            dictionary,
            def_meaning: def_meaning.into(),
            page_meta: None,
        })
    }

    /// Looks up the metadata for the given chunks and returns `LoadedChunk`s
    /// with their byte ranges (the actual data is loaded later)
    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
        let page_meta = self.page_meta.as_ref().unwrap();
        chunk_indices
            .iter()
            .map(|&chunk_idx| {
                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
                let bytes_start = chunk_meta.offset_bytes;
                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
                LoadedChunk {
                    byte_range: bytes_start..bytes_end,
                    items_in_chunk: chunk_meta.num_values,
                    chunk_idx,
                    data: LanceBuffer::empty(),
                }
            })
            .collect()
    }
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
    Take,
    Skip,
    Absent,
}

#[derive(Clone, Debug, PartialEq, Eq)]
/// Instructions for reading one chunk of a mini-block page
struct ChunkInstructions {
    /// The index of the chunk to read
    chunk_idx: usize,
    /// Whether the chunk has a preamble and, if so, whether to take or skip it
    preamble: PreambleAction,
    /// How many complete rows to skip after the preamble
    rows_to_skip: u64,
    /// How many complete rows to take (not counting the preamble or trailer)
    rows_to_take: u64,
    /// Whether to take the trailing partial row
    take_trailer: bool,
}

#[derive(Debug, PartialEq, Eq)]
/// Instructions for draining rows from an already-scheduled chunk
struct ChunkDrainInstructions {
    chunk_instructions: ChunkInstructions,
    rows_to_skip: u64,
    rows_to_take: u64,
    preamble_action: PreambleAction,
}

impl ChunkInstructions {
    // Given a repetition index and a set of user-requested ranges we need to figure
    // out exactly which chunks to read and how to read them
    fn schedule_instructions(rep_index: &RepetitionIndex, user_ranges: &[Range<u64>]) -> Vec<Self> {
        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());

        for user_range in user_ranges {
            let mut rows_needed = user_range.end - user_range.start;
            let mut need_preamble = false;

            // Find the block that contains the start of the range
            let mut block_index = match rep_index
                .blocks
                .binary_search_by_key(&user_range.start, |block| block.first_row)
            {
                Ok(idx) => {
                    // Several consecutive blocks can start with the same row (e.g. one
                    // giant row spanning many chunks); walk back to the first of them
                    let mut idx = idx;
                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
                        idx -= 1;
                    }
                    idx
                }
                Err(idx) => idx - 1,
            };

            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;

            while rows_needed > 0 || need_preamble {
                let chunk = &rep_index.blocks[block_index];
                let rows_avail = chunk.starts_including_trailer - to_skip;
                debug_assert!(rows_avail > 0);

                let rows_to_take = rows_avail.min(rows_needed);
                rows_needed -= rows_to_take;

                let mut take_trailer = false;
                let preamble = if chunk.has_preamble {
                    if need_preamble {
                        PreambleAction::Take
                    } else {
                        PreambleAction::Skip
                    }
                } else {
                    PreambleAction::Absent
                };
                let mut rows_to_take_no_trailer = rows_to_take;

                // If we are taking the trailer then we will also need the preamble
                // of the next chunk
                if rows_to_take == rows_avail && chunk.has_trailer {
                    take_trailer = true;
                    need_preamble = true;
                    rows_to_take_no_trailer -= 1;
                } else {
                    need_preamble = false;
                };

                chunk_instructions.push(Self {
                    preamble,
                    chunk_idx: block_index,
                    rows_to_skip: to_skip,
                    rows_to_take: rows_to_take_no_trailer,
                    take_trailer,
                });

                to_skip = 0;
                block_index += 1;
            }
        }

        // With multiple ranges we may generate several instructions for the same
        // chunk; merge consecutive instructions that read contiguous rows
        if user_ranges.len() > 1 {
            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
            let mut instructions_iter = chunk_instructions.into_iter();
            merged_instructions.push(instructions_iter.next().unwrap());
            for instruction in instructions_iter {
                let last = merged_instructions.last_mut().unwrap();
                if last.chunk_idx == instruction.chunk_idx
                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
                {
                    last.rows_to_take += instruction.rows_to_take;
                    last.take_trailer |= instruction.take_trailer;
                } else {
                    merged_instructions.push(instruction);
                }
            }
            merged_instructions
        } else {
            chunk_instructions
        }
    }

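    // Illustrative example (not part of the original source): using the
    // repetition index from the `RepetitionIndex::decode` example, scheduling
    // rows 1..4 yields two instructions:
    //   { chunk_idx: 0, preamble: Absent, rows_to_skip: 1, rows_to_take: 1,
    //     take_trailer: true }   (rows 1 and 2; row 2 is the trailer)
    //   { chunk_idx: 1, preamble: Take, rows_to_skip: 0, rows_to_take: 1,
    //     take_trailer: false }  (the rest of row 2, then row 3)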
    fn drain_from_instruction(
        &self,
        rows_desired: &mut u64,
        need_preamble: &mut bool,
        skip_in_chunk: &mut u64,
    ) -> (ChunkDrainInstructions, bool) {
        // If we need the preamble then we shouldn't be skipping anything
        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
        let mut rows_avail = self.rows_to_take - *skip_in_chunk;
        let has_preamble = self.preamble != PreambleAction::Absent;
        let preamble_action = match (*need_preamble, has_preamble) {
            (true, true) => PreambleAction::Take,
            (true, false) => panic!("Need preamble but there isn't one"),
            (false, true) => PreambleAction::Skip,
            (false, false) => PreambleAction::Absent,
        };

        // The trailer counts as one more row available to drain
        if self.take_trailer {
            rows_avail += 1;
        }

        let rows_taking = if *rows_desired >= rows_avail {
            // We are taking the rest of the chunk; if it has a trailer we will
            // also need the preamble of the next chunk
            *need_preamble = self.take_trailer;
            rows_avail
        } else {
            // We are fully satisfied by a subset of this chunk
            *need_preamble = false;
            *rows_desired
        };
        let rows_skipped = *skip_in_chunk;

        let consumed_chunk = if *rows_desired >= rows_avail {
            *rows_desired -= rows_avail;
            *skip_in_chunk = 0;
            true
        } else {
            *skip_in_chunk += *rows_desired;
            *rows_desired = 0;
            false
        };

        (
            ChunkDrainInstructions {
                chunk_instructions: self.clone(),
                rows_to_skip: rows_skipped,
                rows_to_take: rows_taking,
                preamble_action,
            },
            consumed_chunk,
        )
    }
}
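
// Illustrative example (not part of the original source): draining an
// instruction with rows_to_take = 3 (no preamble or trailer) in two calls of
// 2 rows each: the first call returns (rows_to_skip: 0, rows_to_take: 2) and
// leaves skip_in_chunk = 2; the second returns (rows_to_skip: 2,
// rows_to_take: 1) and reports the chunk consumed, so the remaining desired
// row is drained from the next instruction.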

impl StructuralPageScheduler for MiniBlockScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        // We always need the chunk metadata.  We may also need the dictionary and
        // the repetition index.
        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
        let mut bufs_needed = 1;
        if self.dictionary.is_some() {
            bufs_needed += 1;
        }
        if self.repetition_index_depth > 0 {
            bufs_needed += 1;
        }
        let mut required_ranges = Vec::with_capacity(bufs_needed);
        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
        if let Some(ref dictionary) = self.dictionary {
            required_ranges.push(
                dictionary.dictionary_buf_position_and_size.0
                    ..dictionary.dictionary_buf_position_and_size.0
                        + dictionary.dictionary_buf_position_and_size.1,
            );
        }
        if self.repetition_index_depth > 0 {
            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
        }
        let io_req = io.submit_request(required_ranges, 0);

        async move {
            let mut buffers = io_req.await?.into_iter().fuse();
            let meta_bytes = buffers.next().unwrap();
            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
            let rep_index_bytes = buffers.next();

            // The metadata is a sequence of little-endian u16 words, one per chunk
            assert!(meta_bytes.len() % 2 == 0);
            let mut bytes = LanceBuffer::from_bytes(meta_bytes, 2);
            let words = bytes.borrow_to_typed_slice::<u16>();
            let words = words.as_ref();

            let mut chunk_meta = Vec::with_capacity(words.len());

            let mut rows_counter = 0;
            let mut offset_bytes = value_buf_position;
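            // Illustrative example (not part of the original source): a
            // metadata word of 0x0283 encodes log_num_values = 0x0283 & 0x0F
            // = 3 (i.e. 8 values) and divided_bytes = 0x0283 >> 4 = 40, i.e.
            // a chunk of (40 + 1) * MINIBLOCK_ALIGNMENT bytes.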
            for (word_idx, word) in words.iter().enumerate() {
                // The low 4 bits are the log2 of the number of values and the
                // remaining bits encode the chunk size (in units of
                // MINIBLOCK_ALIGNMENT, minus one)
                let log_num_values = word & 0x0F;
                let divided_bytes = word >> 4;
                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
                debug_assert!(num_bytes > 0);
                // Only the last chunk may hold fewer than a power-of-two values
                let num_values = if word_idx < words.len() - 1 {
                    debug_assert!(log_num_values > 0);
                    1 << log_num_values
                } else {
                    debug_assert_eq!(log_num_values, 0);
                    self.items_in_page - rows_counter
                };
                rows_counter += num_values;

                chunk_meta.push(ChunkMeta {
                    num_values,
                    chunk_size_bytes: num_bytes as u64,
                    offset_bytes,
                });
                offset_bytes += num_bytes as u64;
            }

            // Decode the repetition index if we have one; otherwise synthesize a
            // trivial index where every chunk holds only complete rows
            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
                // The repetition index is a sequence of u64 values
                assert!(rep_index_data.len() % 8 == 0);
                let mut repetition_index_vals = LanceBuffer::from_bytes(rep_index_data, 8);
                let repetition_index_vals = repetition_index_vals.borrow_to_typed_slice::<u64>();
                // Unflatten into one entry per chunk
                repetition_index_vals
                    .as_ref()
                    .chunks_exact(self.repetition_index_depth as usize + 1)
                    .map(|c| c.to_vec())
                    .collect::<Vec<_>>()
            } else {
                // Without repetition every item is a complete row and there are no
                // partial rows
                chunk_meta
                    .iter()
                    .map(|c| vec![c.num_values, 0])
                    .collect::<Vec<_>>()
            };

            let mut page_meta = MiniBlockCacheableState {
                chunk_meta,
                rep_index: RepetitionIndex::decode(&rep_index),
                dictionary: None,
            };

            // Decode the dictionary, if present
            if let Some(ref mut dictionary) = self.dictionary {
                let dictionary_data = dictionary_bytes.unwrap();
                page_meta.dictionary =
                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
                        LanceBuffer::from_bytes(
                            dictionary_data,
                            dictionary.dictionary_data_alignment,
                        ),
                    )?));
            };
            let page_meta = Arc::new(page_meta);
            self.page_meta = Some(page_meta.clone());
            Ok(page_meta as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.page_meta = Some(
            data.clone()
                .as_arc_any()
                .downcast::<MiniBlockCacheableState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
        let ranges = ranges
            .iter()
            .map(|r| r.start * self.items_per_row..r.end * self.items_per_row)
            .collect::<Vec<_>>();

        let page_meta = self.page_meta.as_ref().unwrap();

        let chunk_instructions =
            ChunkInstructions::schedule_instructions(&page_meta.rep_index, &ranges);

        debug_assert_eq!(
            num_rows * self.items_per_row,
            chunk_instructions
                .iter()
                .map(|ci| {
                    let taken = ci.rows_to_take;
                    if ci.take_trailer {
                        taken + 1
                    } else {
                        taken
                    }
                })
                .sum::<u64>()
        );

        let chunks_needed = chunk_instructions
            .iter()
            .map(|ci| ci.chunk_idx)
            .unique()
            .collect::<Vec<_>>();
        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
        let chunk_ranges = loaded_chunks
            .iter()
            .map(|c| c.byte_range.clone())
            .collect::<Vec<_>>();
        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);

        let rep_decompressor = self.rep_decompressor.clone();
        let def_decompressor = self.def_decompressor.clone();
        let value_decompressor = self.value_decompressor.clone();
        let dictionary = page_meta.dictionary.clone();
        let def_meaning = self.def_meaning.clone();
        let items_per_row = self.items_per_row;

        let res = async move {
            let loaded_chunk_data = loaded_chunk_data.await?;
            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
            }

            Ok(Box::new(MiniBlockDecoder {
                rep_decompressor,
                def_decompressor,
                value_decompressor,
                def_meaning,
                loaded_chunks: VecDeque::from_iter(loaded_chunks),
                instructions: VecDeque::from(chunk_instructions),
                offset_in_current_chunk: 0,
                dictionary,
                num_rows,
                items_per_row,
            }) as Box<dyn StructuralPageDecoder>)
        }
        .boxed();
        Ok(res)
    }
}

#[derive(Debug)]
struct FullZipRepIndexDetails {
    buf_position: u64,
    bytes_per_value: u64, // Will be 1, 2, 4, or 8
}

#[derive(Debug)]
enum PerValueDecompressor {
    Fixed(Arc<dyn FixedPerValueDecompressor>),
    Variable(Arc<dyn VariablePerValueDecompressor>),
}

#[derive(Debug)]
struct FullZipDecodeDetails {
    value_decompressor: PerValueDecompressor,
    def_meaning: Arc<[DefinitionInterpretation]>,
    ctrl_word_parser: ControlWordParser,
    max_rep: u16,
    max_visible_def: u16,
    items_per_row: u64,
}

#[derive(Debug)]
/// A scheduler for pages that have been encoded with the full-zip layout
pub struct FullZipScheduler {
    data_buf_position: u64,
    rep_index: Option<FullZipRepIndexDetails>,
    priority: u64,
    rows_in_page: u64,
    bits_per_offset: u8,
    details: Arc<FullZipDecodeDetails>,
}

impl FullZipScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        rows_in_page: u64,
        items_per_row: u64,
        layout: &pb::FullZipLayout,
        decompressors: &dyn DecompressorStrategy,
        bits_per_offset: u8,
    ) -> Result<Self> {
        let (data_buf_position, _) = buffer_offsets_and_sizes[0];
        // The second buffer, if present, holds the repetition index
        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
            let num_reps = (items_per_row * rows_in_page) + 1;
            let bytes_per_rep = len / num_reps;
            debug_assert_eq!(len % num_reps, 0);
            debug_assert!(
                bytes_per_rep == 1
                    || bytes_per_rep == 2
                    || bytes_per_rep == 4
                    || bytes_per_rep == 8
            );
            FullZipRepIndexDetails {
                buf_position: *pos,
                bytes_per_value: bytes_per_rep,
            }
        });

        let value_decompressor = match layout.details {
            Some(pb::full_zip_layout::Details::BitsPerValue(_)) => {
                let decompressor = decompressors.create_fixed_per_value_decompressor(
                    layout.value_compression.as_ref().unwrap(),
                )?;
                PerValueDecompressor::Fixed(decompressor.into())
            }
            Some(pb::full_zip_layout::Details::BitsPerOffset(_)) => {
                let decompressor = decompressors.create_variable_per_value_decompressor(
                    layout.value_compression.as_ref().unwrap(),
                )?;
                PerValueDecompressor::Variable(decompressor.into())
            }
            None => {
                panic!("Full-zip layout must have a `details` field");
            }
        };
        let ctrl_word_parser = ControlWordParser::new(
            layout.bits_rep.try_into().unwrap(),
            layout.bits_def.try_into().unwrap(),
        );
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();

        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
        let max_visible_def = def_meaning
            .iter()
            .filter(|d| !d.is_list())
            .map(|d| d.num_def_levels())
            .sum();

        let details = Arc::new(FullZipDecodeDetails {
            value_decompressor,
            def_meaning: def_meaning.into(),
            ctrl_word_parser,
            items_per_row,
            max_rep,
            max_visible_def,
        });
        Ok(Self {
            data_buf_position,
            rep_index,
            details,
            priority,
            rows_in_page,
            bits_per_offset,
        })
    }
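
    // Illustrative sizing (not part of the original source): a page with
    // rows_in_page = 100 and items_per_row = 1 has num_reps = 101 repetition
    // index entries, so a 404-byte repetition index implies bytes_per_rep = 4.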

    /// Schedules ranges when a repetition index is present
    ///
    /// The repetition index entries are read first to learn the byte ranges
    /// of the requested rows, and then the data itself is read
    #[allow(clippy::too_many_arguments)]
    async fn indirect_schedule_ranges(
        data_buffer_pos: u64,
        item_ranges: Vec<Range<u64>>,
        rep_index_ranges: Vec<Range<u64>>,
        bytes_per_rep: u64,
        io: Arc<dyn EncodingsIo>,
        priority: u64,
        bits_per_offset: u8,
        details: Arc<FullZipDecodeDetails>,
    ) -> Result<Box<dyn StructuralPageDecoder>> {
        let byte_ranges = io
            .submit_request(rep_index_ranges, priority)
            .await?
            .into_iter()
            .map(|d| LanceBuffer::from_bytes(d, 1))
            .collect::<Vec<_>>();
        let byte_ranges = LanceBuffer::concat(&byte_ranges);
        // The repetition index values arrive in (start, end) pairs; convert them
        // into ranges within the data buffer
        let byte_ranges = ByteUnpacker::new(byte_ranges, bytes_per_rep as usize)
            .chunks(2)
            .into_iter()
            .map(|mut c| {
                let start = c.next().unwrap() + data_buffer_pos;
                let end = c.next().unwrap() + data_buffer_pos;
                start..end
            })
            .collect::<Vec<_>>();

        let data = io.submit_request(byte_ranges, priority);

        let data = data.await?;
        let data = data
            .into_iter()
            .map(|d| LanceBuffer::from_bytes(d, 1))
            .collect();
        let num_rows = item_ranges.into_iter().map(|r| r.end - r.start).sum();

        match &details.value_decompressor {
            PerValueDecompressor::Fixed(decompressor) => {
                let bits_per_value = decompressor.bits_per_value();
                assert!(bits_per_value > 0);
                if bits_per_value % 8 != 0 {
                    // Bit-packed (non-byte-aligned) full-zip values are not yet supported
                    unimplemented!("Bit-packed full-zip");
                }
                let bytes_per_value = bits_per_value / 8;
                let total_bytes_per_value =
                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
                Ok(Box::new(FixedFullZipDecoder {
                    details,
                    data,
                    num_rows,
                    offset_in_current: 0,
                    bytes_per_value: bytes_per_value as usize,
                    total_bytes_per_value,
                }) as Box<dyn StructuralPageDecoder>)
            }
            PerValueDecompressor::Variable(_decompressor) => {
                // The stored lengths and the output offsets use the same width here
                Ok(Box::new(VariableFullZipDecoder::new(
                    details,
                    data,
                    num_rows,
                    bits_per_offset,
                    bits_per_offset,
                )))
            }
        }
    }

    /// Schedules ranges using the repetition index to locate row boundaries
    fn schedule_ranges_rep(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
        rep_index: &FullZipRepIndexDetails,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        // Convert row ranges to item ranges (multiply by items per row)
        let item_ranges = ranges
            .iter()
            .map(|r| r.start * self.details.items_per_row..r.end * self.details.items_per_row)
            .collect::<Vec<_>>();

        // For each item range we need the repetition index entry at the start
        // and at the end of the range
        let rep_index_ranges = item_ranges
            .iter()
            .flat_map(|r| {
                let first_val_start =
                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
                let first_val_end = first_val_start + rep_index.bytes_per_value;
                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
                let last_val_end = last_val_start + rep_index.bytes_per_value;
                [first_val_start..first_val_end, last_val_start..last_val_end]
            })
            .collect::<Vec<_>>();

        // The rest is async because we must wait for the repetition index before
        // we can schedule the data reads
        Ok(Self::indirect_schedule_ranges(
            self.data_buf_position,
            item_ranges,
            rep_index_ranges,
            rep_index.bytes_per_value,
            io.clone(),
            self.priority,
            self.bits_per_offset,
            self.details.clone(),
        )
        .boxed())
    }

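    // Illustrative example (not part of the original source): with 4-byte
    // values and a 2-byte control word, total_bytes_per_value = 6, so the item
    // range 10..20 maps to the byte range data_buf_position + 60 ..
    // data_buf_position + 120 in `schedule_ranges_simple` below.
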
    /// Schedules ranges without a repetition index; every row has the same
    /// size so byte ranges can be computed directly
    fn schedule_ranges_simple(
        &self,
        ranges: &[Range<u64>],
        io: &dyn EncodingsIo,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
        // Convert row ranges to item ranges (multiply by items per row)
        let item_ranges = ranges
            .iter()
            .map(|r| r.start * self.details.items_per_row..r.end * self.details.items_per_row)
            .collect::<Vec<_>>();

        let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
            unreachable!()
        };

        // Convert item ranges to byte ranges (multiply by bytes per item)
        let bits_per_value = decompressor.bits_per_value();
        assert_eq!(bits_per_value % 8, 0);
        let bytes_per_value = bits_per_value / 8;
        let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
        let byte_ranges = item_ranges.iter().map(|r| {
            debug_assert!(r.end <= self.rows_in_page * self.details.items_per_row);
            let start = self.data_buf_position + r.start * total_bytes_per_value;
            let end = self.data_buf_position + r.end * total_bytes_per_value;
            start..end
        });

        let data = io.submit_request(byte_ranges.collect(), self.priority);

        let details = self.details.clone();

        Ok(async move {
            let data = data.await?;
            let data = data
                .into_iter()
                .map(|d| LanceBuffer::from_bytes(d, 1))
                .collect();
            Ok(Box::new(FixedFullZipDecoder {
                details,
                data,
                num_rows,
                offset_in_current: 0,
                bytes_per_value: bytes_per_value as usize,
                total_bytes_per_value: total_bytes_per_value as usize,
            }) as Box<dyn StructuralPageDecoder>)
        }
        .boxed())
    }
}

impl StructuralPageScheduler for FullZipScheduler {
    fn initialize<'a>(
        &'a mut self,
        _io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        // Full-zip pages have no metadata to initialize
        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
    }

    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        if let Some(rep_index) = self.rep_index.as_ref() {
            self.schedule_ranges_rep(ranges, io, rep_index)
        } else {
            self.schedule_ranges_simple(ranges, io.as_ref())
        }
    }
}

#[derive(Debug)]
/// A decoder for full-zip pages with fixed-width values
///
/// Values are zipped together with their control words, so when repetition is
/// present the decoder must walk the buffer value by value
struct FixedFullZipDecoder {
    details: Arc<FullZipDecodeDetails>,
    data: VecDeque<LanceBuffer>,
    offset_in_current: usize,
    bytes_per_value: usize,
    total_bytes_per_value: usize,
    num_rows: u64,
}

impl FixedFullZipDecoder {
    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
        debug_assert!(num_rows > 0);
        let cur_buf = self.data.front_mut().unwrap();
        let start = self.offset_in_current;
        if self.details.ctrl_word_parser.has_rep() {
            // With repetition we have to walk the control words to find where the
            // requested rows end
            // The number of rows that have started so far
            let mut rows_started = 0;
            // The number of items (visible or not) seen so far
            let mut num_items = 0;
            while self.offset_in_current < cur_buf.len() {
                let control = self.details.ctrl_word_parser.parse_desc(
                    &cur_buf[self.offset_in_current..],
                    self.details.max_rep,
                    self.details.max_visible_def,
                );
                if control.is_new_row {
                    if rows_started == num_rows {
                        break;
                    }
                    rows_started += 1;
                }
                num_items += 1;
                if control.is_visible {
                    self.offset_in_current += self.total_bytes_per_value;
                } else {
                    // Invisible items have a control word but no value
                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
                }
            }

            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
            if self.offset_in_current == cur_buf.len() {
                self.data.pop_front();
                self.offset_in_current = 0;
            }

            FullZipDecodeTaskItem {
                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
                    data: task_slice,
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    num_values: num_items,
                    block_info: BlockInfo::new(),
                }),
                rows_in_buf: rows_started,
                items_in_buf: num_items,
            }
        } else {
            // Without repetition, rows and items are the same and each row has a
            // fixed size, so we can just do the math
            let cur_buf = self.data.front_mut().unwrap();
            let bytes_avail = cur_buf.len() - self.offset_in_current;
            let offset_in_cur = self.offset_in_current;

            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
            let mut rows_taken = num_rows;
            let task_slice = if bytes_needed >= bytes_avail {
                self.offset_in_current = 0;
                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
                self.data
                    .pop_front()
                    .unwrap()
                    .slice_with_length(offset_in_cur, bytes_avail)
            } else {
                self.offset_in_current += bytes_needed;
                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
            };
            FullZipDecodeTaskItem {
                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
                    data: task_slice,
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    num_values: rows_taken,
                    block_info: BlockInfo::new(),
                }),
                rows_in_buf: rows_taken,
                items_in_buf: rows_taken,
            }
        }
    }
}

impl StructuralPageDecoder for FixedFullZipDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut task_data = Vec::with_capacity(self.data.len());
        let mut remaining = num_rows * self.details.items_per_row;
        while remaining > 0 {
            let task_item = self.slice_next_task(remaining);
            remaining -= task_item.rows_in_buf;
            task_data.push(task_item);
        }
        let num_items = task_data.iter().map(|td| td.items_in_buf).sum::<u64>() as usize;
        Ok(Box::new(FixedFullZipDecodeTask {
            details: self.details.clone(),
            data: task_data,
            bytes_per_value: self.bytes_per_value,
            num_items,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
/// A decoder for full-zip pages with variable-width values
///
/// The values are unzipped into contiguous data and offsets buffers as soon as
/// the data arrives
struct VariableFullZipDecoder {
    details: Arc<FullZipDecodeDetails>,
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    data: LanceBuffer,
    offsets: LanceBuffer,
    rep: ScalarBuffer<u16>,
    def: ScalarBuffer<u16>,
    repdef_starts: Vec<usize>,
    data_starts: Vec<usize>,
    offset_starts: Vec<usize>,
    visible_item_counts: Vec<u64>,
    bits_per_offset: u8,
    current_idx: usize,
    num_rows: u64,
}
2239
2240impl VariableFullZipDecoder {
2241 fn new(
2242 details: Arc<FullZipDecodeDetails>,
2243 data: VecDeque<LanceBuffer>,
2244 num_rows: u64,
2245 in_bits_per_length: u8,
2246 out_bits_per_offset: u8,
2247 ) -> Self {
2248 let decompressor = match details.value_decompressor {
2249 PerValueDecompressor::Variable(ref d) => d.clone(),
2250 _ => unreachable!(),
2251 };
2252
2253 assert_eq!(in_bits_per_length % 8, 0);
2254 assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2255
2256 let mut decoder = Self {
2257 details,
2258 decompressor,
2259 data: LanceBuffer::empty(),
2260 offsets: LanceBuffer::empty(),
2261 rep: LanceBuffer::empty().borrow_to_typed_slice(),
2262 def: LanceBuffer::empty().borrow_to_typed_slice(),
2263 bits_per_offset: out_bits_per_offset,
2264 repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2265 data_starts: Vec::with_capacity(num_rows as usize + 1),
2266 offset_starts: Vec::with_capacity(num_rows as usize + 1),
2267 visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2268 current_idx: 0,
2269 num_rows,
2270 };
2271
2272 decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
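2273 // We unzip the whole page eagerly. With variable-width values there
2274 // is no way to find a row boundary without walking every control
2275 // word, so we walk once here, recording where each row starts in the
2276 // repdef, offset, and data buffers. Draining is then cheap slicing.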
2293
2294 decoder
2295 }
2296
2297 unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2298 match bits_per_offset {
2299 8 => *data.get_unchecked(0) as u64,
2300 16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2301 32 => u32::from_le_bytes([
2302 *data.get_unchecked(0),
2303 *data.get_unchecked(1),
2304 *data.get_unchecked(2),
2305 *data.get_unchecked(3),
2306 ]) as u64,
2307 64 => u64::from_le_bytes([
2308 *data.get_unchecked(0),
2309 *data.get_unchecked(1),
2310 *data.get_unchecked(2),
2311 *data.get_unchecked(3),
2312 *data.get_unchecked(4),
2313 *data.get_unchecked(5),
2314 *data.get_unchecked(6),
2315 *data.get_unchecked(7),
2316 ]),
2317 _ => unreachable!(),
2318 }
2319 }
2320
2321 fn unzip(
2322 &mut self,
2323 data: VecDeque<LanceBuffer>,
2324 in_bits_per_length: u8,
2325 out_bits_per_offset: u8,
2326 num_rows: u64,
2327 ) {
2328 let mut rep = Vec::with_capacity(num_rows as usize);
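2329 // (these capacities are hints; with lists, items can outnumber rows)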
2330 let mut def = Vec::with_capacity(num_rows as usize);
2331 let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2332
2333 let bytes_per_offset = out_bits_per_offset as usize / 8;
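2334 // num_rows + 1 offsets are written so the final value's length can
2335 // be computed from the trailing offset.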
2336 let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2337 let mut offsets_data = Vec::with_capacity(bytes_offsets);
2338
2339 let bytes_per_length = in_bits_per_length as usize / 8;
2340 let bytes_lengths = bytes_per_length * num_rows as usize;
2341
2342 let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2343 let mut unzipped_data =
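2344 // The unzipped values are the input minus the control words and the
2345 // in-line lengths (lengths are rewritten as offsets instead).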
2346 Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2347
2348 let mut current_offset = 0_u64;
2349 let mut visible_item_count = 0_u64;
2350 for databuf in data.into_iter() {
2351 let mut databuf = databuf.as_ref();
2352 while !databuf.is_empty() {
2353 let data_start = unzipped_data.len();
2354 let offset_start = offsets_data.len();
2355 let repdef_start = rep.len().max(def.len());
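2356 // Peek at the control word to learn whether this item opens a new
2357 // row and whether it is visible.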
2358 let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2360 databuf,
2361 self.details.max_rep,
2362 self.details.max_visible_def,
2363 );
2364 self.details
2365 .ctrl_word_parser
2366 .parse(databuf, &mut rep, &mut def);
2367 databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2368
2369 if ctrl_desc.is_new_row {
2370 self.repdef_starts.push(repdef_start);
2371 self.data_starts.push(data_start);
2372 self.offset_starts.push(offset_start);
2373 self.visible_item_counts.push(visible_item_count);
2374 }
2375 if ctrl_desc.is_visible {
2376 visible_item_count += 1;
2377 if ctrl_desc.is_valid_item {
2378 debug_assert!(databuf.len() >= bytes_per_length);
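2379 // SAFETY: we just verified there are enough bytes to read the length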
2380 let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2381 match out_bits_per_offset {
2382 32 => offsets_data
2383 .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2384 64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2385 _ => unreachable!(),
2386 };
2387 databuf = &databuf[bytes_per_length..]; // skip the in-line length, which is bytes_per_length wide
2388 unzipped_data.extend_from_slice(&databuf[..length as usize]);
2389 databuf = &databuf[length as usize..];
2390 current_offset += length;
2391 } else {
2392 match out_bits_per_offset {
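2393 // A null (but visible) item gets an offset entry and no data bytes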
2394 32 => offsets_data
2395 .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2396 64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2397 _ => unreachable!(),
2398 }
2399 }
2400 }
2401 }
2402 }
2403 self.repdef_starts.push(rep.len().max(def.len()));
2404 self.data_starts.push(unzipped_data.len());
2405 self.offset_starts.push(offsets_data.len());
2406 self.visible_item_counts.push(visible_item_count);
2407 match out_bits_per_offset {
2408 32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2409 64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2410 _ => unreachable!(),
2411 };
2412 self.rep = ScalarBuffer::from(rep);
2413 self.def = ScalarBuffer::from(def);
2414 self.data = LanceBuffer::Owned(unzipped_data);
2415 self.offsets = LanceBuffer::Owned(offsets_data);
2416 }
2417}
2418
2419impl StructuralPageDecoder for VariableFullZipDecoder {
2420 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2421 let start = self.current_idx;
2422 let end = start + num_rows as usize;
2423
2424 let data = self.data.borrow_and_clone();
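2425 // Every task shares the entire unzipped data buffer; the sliced
2426 // offsets are absolute, so each task reads only its own region.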
2432
2433 let offset_start = self.offset_starts[start];
2434 let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2435 let offsets = self
2436 .offsets
2437 .slice_with_length(offset_start, offset_end - offset_start);
2438
2439 let repdef_start = self.repdef_starts[start];
2440 let repdef_end = self.repdef_starts[end];
2441 let rep = if self.rep.is_empty() {
2442 self.rep.clone()
2443 } else {
2444 self.rep.slice(repdef_start, repdef_end - repdef_start)
2445 };
2446 let def = if self.def.is_empty() {
2447 self.def.clone()
2448 } else {
2449 self.def.slice(repdef_start, repdef_end - repdef_start)
2450 };
2451
2452 let visible_item_counts_start = self.visible_item_counts[start];
2453 let visible_item_counts_end = self.visible_item_counts[end];
2454 let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2455
2456 self.current_idx += num_rows as usize;
2457
2458 Ok(Box::new(VariableFullZipDecodeTask {
2459 details: self.details.clone(),
2460 decompressor: self.decompressor.clone(),
2461 data,
2462 offsets,
2463 bits_per_offset: self.bits_per_offset,
2464 num_visible_items,
2465 rep,
2466 def,
2467 }))
2468 }
2469
2470 fn num_rows(&self) -> u64 {
2471 self.num_rows
2472 }
2473}
2474
2475#[derive(Debug)]
2476struct VariableFullZipDecodeTask {
2477 details: Arc<FullZipDecodeDetails>,
2478 decompressor: Arc<dyn VariablePerValueDecompressor>,
2479 data: LanceBuffer,
2480 offsets: LanceBuffer,
2481 bits_per_offset: u8,
2482 num_visible_items: u64,
2483 rep: ScalarBuffer<u16>,
2484 def: ScalarBuffer<u16>,
2485}
2486
2487impl DecodePageTask for VariableFullZipDecodeTask {
2488 fn decode(self: Box<Self>) -> Result<DecodedPage> {
2489 let block = VariableWidthBlock {
2490 data: self.data,
2491 offsets: self.offsets,
2492 bits_per_offset: self.bits_per_offset,
2493 num_values: self.num_visible_items,
2494 block_info: BlockInfo::new(),
2495 };
2496 let decompressed = self.decompressor.decompress(block)?;
2497 let rep = self.rep.to_vec();
2498 let def = self.def.to_vec();
2499 let unraveler =
2500 RepDefUnraveler::new(Some(rep), Some(def), self.details.def_meaning.clone());
2501 Ok(DecodedPage {
2502 data: decompressed,
2503 repdef: unraveler,
2504 })
2505 }
2506}
2507
2508#[derive(Debug)]
2509struct FullZipDecodeTaskItem {
2510 data: PerValueDataBlock,
2511 rows_in_buf: u64,
2512 items_in_buf: u64,
2513}
2514
2515#[derive(Debug)]
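2516 /// A task to decode a batch of buffers sliced from a full-zip encoded
2517 /// page with fixed-width values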
2518struct FixedFullZipDecodeTask {
2519 details: Arc<FullZipDecodeDetails>,
2520 data: Vec<FullZipDecodeTaskItem>,
2521 num_items: usize,
2522 bytes_per_value: usize,
2523}
2524
2525impl DecodePageTask for FixedFullZipDecodeTask {
2526 fn decode(self: Box<Self>) -> Result<DecodedPage> {
2527 let estimated_size_bytes = self
2529 .data
2530 .iter()
2531 .map(|task_item| task_item.data.data_size() as usize)
2532 .sum::<usize>()
2533 * 2;
2534 let mut data_builder =
2535 DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
2536
2537 if self.details.ctrl_word_parser.bytes_per_word() == 0 {
2538 for task_item in self.data.into_iter() {
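2539 // There are no control words zipped in, so each buffer is just a run
2540 // of fixed-width values that can go straight to the decompressor.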
2542 let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2543 unreachable!()
2544 };
2545 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2546 else {
2547 unreachable!()
2548 };
2549 debug_assert_eq!(fixed_data.num_values, task_item.items_in_buf);
2550 let decompressed = decompressor.decompress(fixed_data)?;
2551 data_builder.append(&decompressed, 0..task_item.items_in_buf);
2552 }
2553
2554 let unraveler = RepDefUnraveler::new(None, None, self.details.def_meaning.clone());
2555
2556 Ok(DecodedPage {
2557 data: data_builder.finish(),
2558 repdef: unraveler,
2559 })
2560 } else {
2561 let mut rep = Vec::with_capacity(self.num_items);
2563 let mut def = Vec::with_capacity(self.num_items);
2564
2565 for task_item in self.data.into_iter() {
2566 let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2567 unreachable!()
2568 };
2569 let mut buf_slice = fixed_data.data.as_ref();
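2570 // The control words will be unzipped from the values, so the values
2571 // buffer ends up strictly smaller than the zipped input.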
2570 let mut values = Vec::with_capacity(
2573 fixed_data.data.len()
2574 - (self.details.ctrl_word_parser.bytes_per_word()
2575 * task_item.items_in_buf as usize),
2576 );
2577 let mut visible_items = 0;
2578 for _ in 0..task_item.items_in_buf {
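2579 // Unzip the control word into the rep/def level buffers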
2579 self.details
2581 .ctrl_word_parser
2582 .parse(buf_slice, &mut rep, &mut def);
2583 buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
2584
2585 let is_visible = def
2586 .last()
2587 .map(|d| *d <= self.details.max_visible_def)
2588 .unwrap_or(true);
2589 if is_visible {
2590 values.extend_from_slice(&buf_slice[..self.bytes_per_value]);
2592 buf_slice = &buf_slice[self.bytes_per_value..];
2593 visible_items += 1;
2594 }
2595 }
2596
2597 let values_buf = LanceBuffer::Owned(values);
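2598 // The values are contiguous now; decompress them as a single block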
2599 let fixed_data = FixedWidthDataBlock {
2600 bits_per_value: self.bytes_per_value as u64 * 8,
2601 block_info: BlockInfo::new(),
2602 data: values_buf,
2603 num_values: visible_items,
2604 };
2605 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2606 else {
2607 unreachable!()
2608 };
2609 let decompressed = decompressor.decompress(fixed_data)?;
2610 data_builder.append(&decompressed, 0..visible_items);
2611 }
2612
2613 let repetition = if rep.is_empty() { None } else { Some(rep) };
2614 let definition = if def.is_empty() { None } else { Some(def) };
2615
2616 let unraveler =
2617 RepDefUnraveler::new(repetition, definition, self.details.def_meaning.clone());
2618 let data = data_builder.finish();
2619
2620 Ok(DecodedPage {
2621 data,
2622 repdef: unraveler,
2623 })
2624 }
2625 }
2626}
2627
2628#[derive(Debug)]
2629struct StructuralPrimitiveFieldSchedulingJob<'a> {
2630 scheduler: &'a StructuralPrimitiveFieldScheduler,
2631 ranges: Vec<Range<u64>>,
2632 page_idx: usize,
2633 range_idx: usize,
2634 range_offset: u64,
2635 global_row_offset: u64,
2636}
2637
2638impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
2639 pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
2640 Self {
2641 scheduler,
2642 ranges,
2643 page_idx: 0,
2644 range_idx: 0,
2645 range_offset: 0,
2646 global_row_offset: 0,
2647 }
2648 }
2649}
2650
2651impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
2652 fn schedule_next(
2653 &mut self,
2654 context: &mut SchedulerContext,
2655 ) -> Result<Option<ScheduledScanLine>> {
2656 if self.range_idx >= self.ranges.len() {
2657 return Ok(None);
2658 }
2659 let mut range = self.ranges[self.range_idx].clone();
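2660 // Skip any rows in this range that were already scheduled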
2661 range.start += self.range_offset;
2662 let priority = range.start;
2663
2664 let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
2665 trace!(
2666 "Current range is {:?} and current page has {} rows",
2667 range,
2668 cur_page.num_rows
2669 );
2670 while cur_page.num_rows + self.global_row_offset <= range.start {
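2671 // This page ends before the range begins; skip the whole page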
2672 self.global_row_offset += cur_page.num_rows;
2673 self.page_idx += 1;
2674 trace!("Skipping entire page of {} rows", cur_page.num_rows);
2675 cur_page = &self.scheduler.page_schedulers[self.page_idx];
2676 }
2677
2678 let mut ranges_in_page = Vec::new();
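2679 // Gather every range that overlaps this page so we can issue a
2680 // single scheduling request for the page.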
2682 while cur_page.num_rows + self.global_row_offset > range.start {
2683 range.start = range.start.max(self.global_row_offset);
2684 let start_in_page = range.start - self.global_row_offset;
2685 let end_in_page = start_in_page + (range.end - range.start);
2686 let end_in_page = end_in_page.min(cur_page.num_rows);
2687 let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
2688
2689 ranges_in_page.push(start_in_page..end_in_page);
2690 if last_in_range {
2691 self.range_idx += 1;
2692 if self.range_idx == self.ranges.len() {
2693 break;
2694 }
2695 range = self.ranges[self.range_idx].clone();
2696 } else {
2697 break;
2698 }
2699 }
2700
2701 let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
2702 trace!(
2703 "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
2704 num_rows_in_next,
2705 ranges_in_page.len(),
2706 cur_page.num_rows,
2707 priority,
2708 self.scheduler.column_index,
2709 cur_page.page_index,
2710 );
2711
2712 self.global_row_offset += cur_page.num_rows;
2713 self.page_idx += 1;
2714
2715 let page_decoder = cur_page
2716 .scheduler
2717 .schedule_ranges(&ranges_in_page, context.io())?;
2718
2719 let cur_path = context.current_path();
2720 let page_index = cur_page.page_index;
2721 let unloaded_page = async move {
2722 let page_decoder = page_decoder.await?;
2723 Ok(LoadedPage {
2724 decoder: page_decoder,
2725 path: cur_path,
2726 page_index,
2727 })
2728 }
2729 .boxed();
2730
2731 Ok(Some(ScheduledScanLine {
2732 decoders: vec![MessageType::UnloadedPage(UnloadedPage(unloaded_page))],
2733 rows_scheduled: num_rows_in_next,
2734 }))
2735 }
2736}
2737
2738#[derive(Debug)]
2739struct PageInfoAndScheduler {
2740 page_index: usize,
2741 num_rows: u64,
2742 scheduler: Box<dyn StructuralPageScheduler>,
2743}
2744
2745#[derive(Debug)]
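2746 /// A scheduler for primitive fields stored with the structural
2747 /// encodings (miniblock, full-zip, or all-null layouts)
2748 ///
2749 /// Each page is delegated to the scheduler matching the page's layout.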
2750pub struct StructuralPrimitiveFieldScheduler {
2751 page_schedulers: Vec<PageInfoAndScheduler>,
2752 column_index: u32,
2753}
2754
2755impl StructuralPrimitiveFieldScheduler {
2756 pub fn try_new(
2757 column_info: &ColumnInfo,
2758 items_per_row: u64,
2759 decompressors: &dyn DecompressorStrategy,
2760 ) -> Result<Self> {
2761 let page_schedulers = column_info
2762 .page_infos
2763 .iter()
2764 .enumerate()
2765 .map(|(page_index, page_info)| {
2766 Self::page_info_to_scheduler(
2767 page_info,
2768 page_index,
2769 column_info.index as usize,
2770 decompressors,
2771 items_per_row,
2772 )
2773 })
2774 .collect::<Result<Vec<_>>>()?;
2775 Ok(Self {
2776 page_schedulers,
2777 column_index: column_info.index,
2778 })
2779 }
2780
2781 fn page_info_to_scheduler(
2782 page_info: &PageInfo,
2783 page_index: usize,
2784 _column_index: usize,
2785 decompressors: &dyn DecompressorStrategy,
2786 items_per_row: u64,
2787 ) -> Result<PageInfoAndScheduler> {
2788 let scheduler: Box<dyn StructuralPageScheduler> =
2789 match page_info.encoding.as_structural().layout.as_ref() {
2790 Some(pb::page_layout::Layout::MiniBlockLayout(mini_block)) => {
2791 Box::new(MiniBlockScheduler::try_new(
2792 &page_info.buffer_offsets_and_sizes,
2793 page_info.priority,
2794 mini_block.num_items,
2795 items_per_row,
2796 mini_block,
2797 decompressors,
2798 )?)
2799 }
2800 Some(pb::page_layout::Layout::FullZipLayout(full_zip)) => {
2801 Box::new(FullZipScheduler::try_new(
2802 &page_info.buffer_offsets_and_sizes,
2803 page_info.priority,
2804 page_info.num_rows,
2805 items_per_row,
2806 full_zip,
2807 decompressors,
2808 32,
2809 )?)
2810 }
2811 Some(pb::page_layout::Layout::AllNullLayout(all_null)) => {
2812 let def_meaning = all_null
2813 .layers
2814 .iter()
2815 .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
2816 .collect::<Vec<_>>();
2817 if def_meaning.len() == 1
2818 && def_meaning[0] == DefinitionInterpretation::NullableItem
2819 {
2820 Box::new(SimpleAllNullScheduler::default())
2821 as Box<dyn StructuralPageScheduler>
2822 } else {
2823 Box::new(ComplexAllNullScheduler::new(
2824 page_info.buffer_offsets_and_sizes.clone(),
2825 def_meaning.into(),
2826 items_per_row,
2827 )) as Box<dyn StructuralPageScheduler>
2828 }
2829 }
2830 _ => todo!(),
2831 };
2832 Ok(PageInfoAndScheduler {
2833 page_index,
2834 num_rows: page_info.num_rows,
2835 scheduler,
2836 })
2837 }
2838}
2839
2840pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
2841 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
2842}
2843
2844pub struct NoCachedPageData;
2845
2846impl DeepSizeOf for NoCachedPageData {
2847 fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
2848 0
2849 }
2850}
2851impl CachedPageData for NoCachedPageData {
2852 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2853 self
2854 }
2855}
2856
2857pub struct CachedFieldData {
2858 pages: Vec<Arc<dyn CachedPageData>>,
2859}
2860
2861impl DeepSizeOf for CachedFieldData {
2862 fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
2863 self.pages.deep_size_of_children(ctx)
2864 }
2865}
2866
2867impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
2868 fn initialize<'a>(
2869 &'a mut self,
2870 _filter: &'a FilterExpression,
2871 context: &'a SchedulerContext,
2872 ) -> BoxFuture<'a, Result<()>> {
2873 let cache_key = self.column_index.to_string();
2874 if let Some(cached_data) = context.cache().get_by_str::<CachedFieldData>(&cache_key) {
2875 self.page_schedulers
2876 .iter_mut()
2877 .zip(cached_data.pages.iter())
2878 .for_each(|(page_scheduler, cached_data)| {
2879 page_scheduler.scheduler.load(cached_data);
2880 });
2881 return std::future::ready(Ok(())).boxed();
2882 };
2883
2884 let cache = context.cache().clone();
2885 let page_data = self
2886 .page_schedulers
2887 .iter_mut()
2888 .map(|s| s.scheduler.initialize(context.io()))
2889 .collect::<FuturesUnordered<_>>();
2890
2891 async move {
2892 let page_data = page_data.try_collect::<Vec<_>>().await?;
2893 let cached_data = Arc::new(CachedFieldData { pages: page_data });
2894 cache.insert_by_str::<CachedFieldData>(&cache_key, cached_data);
2895 Ok(())
2896 }
2897 .boxed()
2898 }
2899
2900 fn schedule_ranges<'a>(
2901 &'a self,
2902 ranges: &[Range<u64>],
2903 _filter: &FilterExpression,
2904 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
2905 let ranges = ranges.to_vec();
2906 Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
2907 self, ranges,
2908 )))
2909 }
2910}
2911
2912pub struct PrimitiveFieldDecoder {
2913 data_type: DataType,
2914 unloaded_physical_decoder: Option<BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>>,
2915 physical_decoder: Option<Arc<dyn PrimitivePageDecoder>>,
2916 should_validate: bool,
2917 num_rows: u64,
2918 rows_drained: u64,
2919 column_index: u32,
2920 page_index: u32,
2921}
2922
2923impl PrimitiveFieldDecoder {
2924 pub fn new_from_data(
2925 physical_decoder: Arc<dyn PrimitivePageDecoder>,
2926 data_type: DataType,
2927 num_rows: u64,
2928 should_validate: bool,
2929 ) -> Self {
2930 Self {
2931 data_type,
2932 unloaded_physical_decoder: None,
2933 physical_decoder: Some(physical_decoder),
2934 should_validate,
2935 num_rows,
2936 rows_drained: 0,
2937 column_index: u32::MAX,
2938 page_index: u32::MAX,
2939 }
2940 }
2941}
2942
2943impl Debug for PrimitiveFieldDecoder {
2944 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2945 f.debug_struct("PrimitiveFieldDecoder")
2946 .field("data_type", &self.data_type)
2947 .field("num_rows", &self.num_rows)
2948 .field("rows_drained", &self.rows_drained)
2949 .finish()
2950 }
2951}
2952
2953struct PrimitiveFieldDecodeTask {
2954 rows_to_skip: u64,
2955 rows_to_take: u64,
2956 should_validate: bool,
2957 physical_decoder: Arc<dyn PrimitivePageDecoder>,
2958 data_type: DataType,
2959}
2960
2961impl DecodeArrayTask for PrimitiveFieldDecodeTask {
2962 fn decode(self: Box<Self>) -> Result<ArrayRef> {
2963 let block = self
2964 .physical_decoder
2965 .decode(self.rows_to_skip, self.rows_to_take)?;
2966
2967 let array = make_array(block.into_arrow(self.data_type.clone(), self.should_validate)?);
2968
2969 if let DataType::Dictionary(_, _) = self.data_type {
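2970 // The decoded dictionary may carry its nulls on the items while the
2971 // keys have no validity buffer. Arrow expects the validity on the
2972 // keys, so rebuild the key array with the logical nulls attached.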
2976 let dict = array.as_any_dictionary();
2977 if let Some(nulls) = array.logical_nulls() {
2978 let new_indices = dict.keys().to_data();
2979 let new_array = make_array(
2980 new_indices
2981 .into_builder()
2982 .nulls(Some(nulls))
2983 .add_child_data(dict.values().to_data())
2984 .data_type(dict.data_type().clone())
2985 .build()?,
2986 );
2987 return Ok(new_array);
2988 }
2989 }
2990 Ok(array)
2991 }
2992}
2993
2994impl LogicalPageDecoder for PrimitiveFieldDecoder {
2995 fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
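2996 // Nothing partial to wait for: once the unloaded decoder resolves,
2997 // the entire page is loaded.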
2998 log::trace!(
2999 "primitive wait for more than {} rows on column {} and page {} (page has {} rows)",
3000 loaded_need,
3001 self.column_index,
3002 self.page_index,
3003 self.num_rows
3004 );
3005 async move {
3006 let physical_decoder = self.unloaded_physical_decoder.take().unwrap().await?;
3007 self.physical_decoder = Some(Arc::from(physical_decoder));
3008 Ok(())
3009 }
3010 .boxed()
3011 }
3012
3013 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
3014 if self.physical_decoder.as_ref().is_none() {
3015 return Err(lance_core::Error::Internal {
3016 message: format!("drain was called on primitive field decoder for data type {} on column {} but the decoder was never awaited", self.data_type, self.column_index),
3017 location: location!(),
3018 });
3019 }
3020
3021 let rows_to_skip = self.rows_drained;
3022 let rows_to_take = num_rows;
3023
3024 self.rows_drained += rows_to_take;
3025
3026 let task = Box::new(PrimitiveFieldDecodeTask {
3027 rows_to_skip,
3028 rows_to_take,
3029 should_validate: self.should_validate,
3030 physical_decoder: self.physical_decoder.as_ref().unwrap().clone(),
3031 data_type: self.data_type.clone(),
3032 });
3033
3034 Ok(NextDecodeTask {
3035 task,
3036 num_rows: rows_to_take,
3037 })
3038 }
3039
3040 fn rows_loaded(&self) -> u64 {
3041 if self.unloaded_physical_decoder.is_some() {
3042 0
3043 } else {
3044 self.num_rows
3045 }
3046 }
3047
3048 fn rows_drained(&self) -> u64 {
3049 if self.unloaded_physical_decoder.is_some() {
3050 0
3051 } else {
3052 self.rows_drained
3053 }
3054 }
3055
3056 fn num_rows(&self) -> u64 {
3057 self.num_rows
3058 }
3059
3060 fn data_type(&self) -> &DataType {
3061 &self.data_type
3062 }
3063}
3064
3065#[derive(Debug)]
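3066 /// A task that decodes and concatenates several pages into one array,
3067 /// restoring validity and FSL structure from the repdef levels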
3068pub struct StructuralCompositeDecodeArrayTask {
3069 tasks: Vec<Box<dyn DecodePageTask>>,
3070 items_type: DataType,
3071 fsl_fields: Arc<[Arc<ArrowField>]>,
3072 should_validate: bool,
3073}
3074
3075impl StructuralCompositeDecodeArrayTask {
3076 fn restore_validity(
3077 array: Arc<dyn Array>,
3078 unraveler: &mut CompositeRepDefUnraveler,
3079 ) -> Arc<dyn Array> {
3080 let validity = unraveler.unravel_validity(array.len());
3081 let Some(validity) = validity else {
3082 return array;
3083 };
3084 if array.data_type() == &DataType::Null {
3085 return array;
3087 }
3088 assert_eq!(validity.len(), array.len());
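3089 // SAFETY: we are only attaching a validity buffer; the underlying
3090 // data was already built (and validated, if requested).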
3089 make_array(unsafe {
3092 array
3093 .to_data()
3094 .into_builder()
3095 .nulls(Some(validity))
3096 .build_unchecked()
3097 })
3098 }
3099
3100 fn restore_fsl(
3101 array: Arc<dyn Array>,
3102 unraveler: &mut CompositeRepDefUnraveler,
3103 fsl_fields: Arc<[Arc<ArrowField>]>,
3104 ) -> Arc<dyn Array> {
3105 let mut array = array;
3106 for fsl_field in fsl_fields.iter().rev() {
3107 let DataType::FixedSizeList(child_field, dimension) = fsl_field.data_type() else {
3108 unreachable!()
3109 };
3110 let fsl_num_values = array.len() / *dimension as usize;
3111 let fsl_validity = unraveler.unravel_fsl_validity(fsl_num_values, *dimension as usize);
3112 array = Arc::new(FixedSizeListArray::new(
3113 child_field.clone(),
3114 *dimension,
3115 array,
3116 fsl_validity,
3117 ));
3118 }
3119 array
3120 }
3121}
3122
3123impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
3124 fn decode(self: Box<Self>) -> Result<DecodedArray> {
3125 let mut arrays = Vec::with_capacity(self.tasks.len());
3126 let mut unravelers = Vec::with_capacity(self.tasks.len());
3127 for task in self.tasks {
3128 let decoded = task.decode()?;
3129 unravelers.push(decoded.repdef);
3130
3131 let array = make_array(
3132 decoded
3133 .data
3134 .into_arrow(self.items_type.clone(), self.should_validate)?,
3135 );
3136
3137 arrays.push(array);
3138 }
3139 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
3140 let array = arrow_select::concat::concat(&array_refs)?;
3141 let mut repdef = CompositeRepDefUnraveler::new(unravelers);
3142
3143 let array = Self::restore_validity(array, &mut repdef);
3144 let array = Self::restore_fsl(array, &mut repdef, self.fsl_fields);
3145
3146 Ok(DecodedArray { array, repdef })
3147 }
3148}
3149
3150#[derive(Debug)]
3151pub struct StructuralPrimitiveFieldDecoder {
3152 field: Arc<ArrowField>,
3153 items_type: DataType,
3154 fsl_fields: Arc<[Arc<ArrowField>]>,
3155 page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
3156 should_validate: bool,
3157 rows_drained_in_current: u64,
3158}
3159
3160impl StructuralPrimitiveFieldDecoder {
3161 fn flatten_field_helper(
3162 field: &Arc<ArrowField>,
3163 mut fields: Vec<Arc<ArrowField>>,
3164 ) -> (Arc<[Arc<ArrowField>]>, &DataType) {
3165 match field.data_type() {
3166 DataType::FixedSizeList(inner, _) => {
3167 fields.push(field.clone());
3168 Self::flatten_field_helper(inner, fields)
3169 }
3170 _ => {
3171 let fields = fields.into();
3172 (fields, field.data_type())
3173 }
3174 }
3175 }
3176
3177 fn flatten_field(field: &Arc<ArrowField>) -> (Arc<[Arc<ArrowField>]>, &DataType) {
3178 Self::flatten_field_helper(field, Vec::default())
3179 }
3180
3181 pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3182 let (fsl_fields, items_type) = Self::flatten_field(field);
3183 Self {
3184 field: field.clone(),
3185 items_type: items_type.clone(),
3186 fsl_fields,
3187 page_decoders: VecDeque::new(),
3188 should_validate,
3189 rows_drained_in_current: 0,
3190 }
3191 }
3192}
3193
3194impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3195 fn accept_page(&mut self, child: LoadedPage) -> Result<()> {
3196 assert!(child.path.is_empty());
3197 self.page_decoders.push_back(child.decoder);
3198 Ok(())
3199 }
3200
3201 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3202 let mut remaining = num_rows;
3203 let mut tasks = Vec::new();
3204 while remaining > 0 {
3205 let cur_page = self.page_decoders.front_mut().unwrap();
3206 let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3207 let to_take = num_in_page.min(remaining);
3208
3209 let task = cur_page.drain(to_take)?;
3210 tasks.push(task);
3211
3212 if to_take == num_in_page {
3213 self.page_decoders.pop_front();
3214 self.rows_drained_in_current = 0;
3215 } else {
3216 self.rows_drained_in_current += to_take;
3217 }
3218
3219 remaining -= to_take;
3220 }
3221 Ok(Box::new(StructuralCompositeDecodeArrayTask {
3222 tasks,
3223 items_type: self.items_type.clone(),
3224 should_validate: self.should_validate,
3225 fsl_fields: self.fsl_fields.clone(),
3226 }))
3227 }
3228
3229 fn data_type(&self) -> &DataType {
3230 self.field.data_type()
3231 }
3232}
3233
3234#[derive(Debug)]
3235pub struct AccumulationQueue {
3236 cache_bytes: u64,
3237 keep_original_array: bool,
3238 buffered_arrays: Vec<ArrayRef>,
3239 current_bytes: u64,
3240 row_number: u64,
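3241 /// The number of rows buffered so far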
3242 num_rows: u64,
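3243 /// The index of the column being accumulated (used for logging)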
3244 column_index: u32,
3246}
3247
3248impl AccumulationQueue {
3249 pub fn new(cache_bytes: u64, column_index: u32, keep_original_array: bool) -> Self {
3250 Self {
3251 cache_bytes,
3252 buffered_arrays: Vec::new(),
3253 current_bytes: 0,
3254 column_index,
3255 keep_original_array,
3256 row_number: u64::MAX,
3257 num_rows: 0,
3258 }
3259 }
3260
3261 pub fn insert(
3264 &mut self,
3265 array: ArrayRef,
3266 row_number: u64,
3267 num_rows: u64,
3268 ) -> Option<(Vec<ArrayRef>, u64, u64)> {
3269 if self.row_number == u64::MAX {
3270 self.row_number = row_number;
3271 }
3272 self.num_rows += num_rows;
3273 self.current_bytes += array.get_array_memory_size() as u64;
3274 if self.current_bytes > self.cache_bytes {
3275 debug!(
3276 "Flushing column {} page of size {} bytes (unencoded)",
3277 self.column_index, self.current_bytes
3278 );
3279 self.buffered_arrays.push(array);
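3280 // No need to deep-copy; this array is being flushed immediately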
3281 self.current_bytes = 0;
3282 let row_number = self.row_number;
3283 self.row_number = u64::MAX;
3284 let num_rows = self.num_rows;
3285 self.num_rows = 0;
3286 Some((
3287 std::mem::take(&mut self.buffered_arrays),
3288 row_number,
3289 num_rows,
3290 ))
3291 } else {
3292 trace!(
3293 "Accumulating data for column {}. Now at {} bytes",
3294 self.column_index,
3295 self.current_bytes
3296 );
3297 if self.keep_original_array {
3298 self.buffered_arrays.push(array);
3299 } else {
3300 self.buffered_arrays.push(deep_copy_array(array.as_ref()))
3301 }
3302 None
3303 }
3304 }
3305
3306 pub fn flush(&mut self) -> Option<(Vec<ArrayRef>, u64, u64)> {
3307 if self.buffered_arrays.is_empty() {
3308 trace!(
3309 "No final flush since no data at column {}",
3310 self.column_index
3311 );
3312 None
3313 } else {
3314 trace!(
3315 "Final flush of column {} which has {} bytes",
3316 self.column_index,
3317 self.current_bytes
3318 );
3319 self.current_bytes = 0;
3320 let row_number = self.row_number;
3321 self.row_number = u64::MAX;
3322 let num_rows = self.num_rows;
3323 self.num_rows = 0;
3324 Some((
3325 std::mem::take(&mut self.buffered_arrays),
3326 row_number,
3327 num_rows,
3328 ))
3329 }
3330 }
3331}
3332
3333pub struct PrimitiveFieldEncoder {
3334 accumulation_queue: AccumulationQueue,
3335 array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
3336 column_index: u32,
3337 field: Field,
3338 max_page_bytes: u64,
3339}
3340
3341impl PrimitiveFieldEncoder {
3342 pub fn try_new(
3343 options: &EncodingOptions,
3344 array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
3345 column_index: u32,
3346 field: Field,
3347 ) -> Result<Self> {
3348 Ok(Self {
3349 accumulation_queue: AccumulationQueue::new(
3350 options.cache_bytes_per_column,
3351 column_index,
3352 options.keep_original_array,
3353 ),
3354 column_index,
3355 max_page_bytes: options.max_page_bytes,
3356 array_encoding_strategy,
3357 field,
3358 })
3359 }
3360
3361 fn create_encode_task(&mut self, arrays: Vec<ArrayRef>) -> Result<EncodeTask> {
3362 let encoder = self
3363 .array_encoding_strategy
3364 .create_array_encoder(&arrays, &self.field)?;
3365 let column_idx = self.column_index;
3366 let data_type = self.field.data_type();
3367
3368 Ok(tokio::task::spawn(async move {
3369 let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
3370 let data = DataBlock::from_arrays(&arrays, num_values);
3371 let mut buffer_index = 0;
3372 let array = encoder.encode(data, &data_type, &mut buffer_index)?;
3373 let (data, description) = array.into_buffers();
3374 Ok(EncodedPage {
3375 data,
3376 description: PageEncoding::Legacy(description),
3377 num_rows: num_values,
3378 column_idx,
3379 row_number: 0,
3380 })
3381 })
3382 .map(|res_res| res_res.unwrap())
3383 .boxed())
3384 }
3385
3386 fn do_flush(&mut self, arrays: Vec<ArrayRef>) -> Result<Vec<EncodeTask>> {
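3387 // A single large array may need to be split into multiple pages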
3388 if arrays.len() == 1 {
3389 let array = arrays.into_iter().next().unwrap();
3390 let size_bytes = array.get_buffer_memory_size();
3391 let num_parts = bit_util::ceil(size_bytes, self.max_page_bytes as usize);
3392 let num_parts = num_parts.min(array.len());
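3393 // (never split finer than one value per part)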
3394 if num_parts <= 1 {
3395 Ok(vec![self.create_encode_task(vec![array])?])
3397 } else {
3398 let mut tasks = Vec::with_capacity(num_parts);
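3399 // Slice the array into roughly equal parts, each small enough to
3400 // respect max_page_bytes. The last part may be smaller.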
3403 let mut offset = 0;
3404 let part_size = bit_util::ceil(array.len(), num_parts);
3405 for _ in 0..num_parts {
3406 let avail = array.len() - offset;
3407 let chunk_size = avail.min(part_size);
3408 let part = array.slice(offset, chunk_size);
3409 let task = self.create_encode_task(vec![part])?;
3410 tasks.push(task);
3411 offset += chunk_size;
3412 }
3413 Ok(tasks)
3414 }
3415 } else {
3416 Ok(vec![self.create_encode_task(arrays)?])
3420 }
3421 }
3422}
3423
3424impl FieldEncoder for PrimitiveFieldEncoder {
3425 fn maybe_encode(
3427 &mut self,
3428 array: ArrayRef,
3429 _external_buffers: &mut OutOfLineBuffers,
3430 _repdef: RepDefBuilder,
3431 row_number: u64,
3432 num_rows: u64,
3433 ) -> Result<Vec<EncodeTask>> {
3434 if let Some(arrays) = self.accumulation_queue.insert(array, row_number, num_rows) {
3435 Ok(self.do_flush(arrays.0)?)
3436 } else {
3437 Ok(vec![])
3438 }
3439 }
3440
3441 fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
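3442 // Flush any remaining buffered arrays as one final page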
3443 if let Some(arrays) = self.accumulation_queue.flush() {
3444 Ok(self.do_flush(arrays.0)?)
3445 } else {
3446 Ok(vec![])
3447 }
3448 }
3449
3450 fn num_columns(&self) -> u32 {
3451 1
3452 }
3453
3454 fn finish(
3455 &mut self,
3456 _external_buffers: &mut OutOfLineBuffers,
3457 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
3458 std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
3459 }
3460}
3461
3462struct SerializedFullZip {
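3463 /// The serialized zipped values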
3464 values: LanceBuffer,
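3465 /// The repetition index (present only when there is repetition)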
3466 repetition_index: Option<LanceBuffer>,
3468}
3469
3470const MINIBLOCK_ALIGNMENT: usize = 8;
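3471 // Each miniblock chunk is laid out as (see serialize_miniblocks below):
3472 //
3473 // - bytes_rep (u16): size of the compressed repetition levels
3474 // - bytes_def (u16): size of the compressed definition levels
3475 // - bytes_val (u16): size of the compressed values
3476 // - the repetition levels
3477 // - the definition levels
3478 // - padding up to MINIBLOCK_ALIGNMENT
3479 // - the values
3480 // - padding up to MINIBLOCK_ALIGNMENT
3481 //
3482 // A chunk is always a multiple of 8 bytes and no larger than 16KiB.
3483 // Each chunk gets one u16 of metadata packing the chunk size (in units
3484 // of 8 bytes, minus one) with log2 of the chunk's value count.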
3490const MINIBLOCK_MAX_PADDING: usize = MINIBLOCK_ALIGNMENT - 1;
3491
3492pub struct PrimitiveStructuralEncoder {
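3493 // Accumulates data until there is enough for a full page, then picks a
3494 // structural layout and encodes:
3495 //
3496 // - pages that are entirely null use an all-null layout
3497 // - low-cardinality data is dictionary encoded and stored as miniblock
3498 // - narrow data uses the miniblock layout
3499 // - everything else uses the full-zip layout
3500 //
3501 // The field metadata key STRUCTURAL_ENCODING_META_KEY lets the user
3502 // force miniblock or full-zip.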
3519 accumulation_queue: AccumulationQueue,
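3520 /// Repdef builders accumulated alongside the arrays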
3521 accumulated_repdefs: Vec<RepDefBuilder>,
3522 compression_strategy: Arc<dyn CompressionStrategy>,
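3523 /// The index of the column being encoded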
3524 column_index: u32,
3525 field: Field,
3526 encoding_metadata: Arc<HashMap<String, String>>,
3527}
3528
3529impl PrimitiveStructuralEncoder {
3530 pub fn try_new(
3531 options: &EncodingOptions,
3532 compression_strategy: Arc<dyn CompressionStrategy>,
3533 column_index: u32,
3534 field: Field,
3535 encoding_metadata: Arc<HashMap<String, String>>,
3536 ) -> Result<Self> {
3537 Ok(Self {
3538 accumulation_queue: AccumulationQueue::new(
3539 options.cache_bytes_per_column,
3540 column_index,
3541 options.keep_original_array,
3542 ),
3543 accumulated_repdefs: Vec::new(),
3544 column_index,
3545 compression_strategy,
3546 field,
3547 encoding_metadata,
3548 })
3549 }
3550
3551 fn is_narrow(data_block: &DataBlock) -> bool {
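3552 // Miniblock chunks are capped at 16KiB, so the layout only pays off
3553 // when many values fit in one chunk. Data counts as "narrow" when
3554 // each value is under 256 bytes.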
3559 const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3560
3561 if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3562 let max_len_array = max_len_array
3563 .as_any()
3564 .downcast_ref::<PrimitiveArray<UInt64Type>>()
3565 .unwrap();
3566 if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3567 return true;
3568 }
3569 }
3570 false
3571 }
3572
3573 fn prefers_miniblock(
3574 data_block: &DataBlock,
3575 encoding_metadata: &HashMap<String, String>,
3576 ) -> bool {
3577 if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
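3578 // An explicit user request always wins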
3579 return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3580 }
3581 Self::is_narrow(data_block)
3583 }
3584
3585 fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
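3586 // Unless the user explicitly requested a different layout we are
3587 // always willing to fall back to full-zip.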
3586 if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3590 return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3591 }
3592 true
3593 }
3594
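3595 /// Serializes the compressed miniblock chunks, along with their
3596 /// compressed repetition and definition levels, into a single buffer
3597 ///
3598 /// Each chunk is written as three u16 sizes (rep, def, values), the
3599 /// levels, and the values, padded to MINIBLOCK_ALIGNMENT as described
3600 /// by the layout notes above. Returns the data buffer and a metadata
3601 /// buffer with one u16 per chunk.
3602 ///
3603 /// As a sketch, a chunk with 2 bytes of rep, 2 bytes of def, and 10
3604 /// bytes of values occupies 32 bytes:
3605 ///
3606 /// [2u16][2u16][10u16][rep;2][def;2][pad;6][values;10][pad;6]
3607 ///
3608 /// and its metadata word is ((32 / 8 - 1) << 4) | log_num_values.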
3595 fn serialize_miniblocks(
3649 miniblocks: MiniBlockCompressed,
3650 rep: Vec<LanceBuffer>,
3651 def: Vec<LanceBuffer>,
3652 ) -> (LanceBuffer, LanceBuffer) {
3653 let bytes_rep = rep.iter().map(|r| r.len()).sum::<usize>();
3654 let bytes_def = def.iter().map(|d| d.len()).sum::<usize>();
3655 let max_bytes_repdef_len = rep.len() * 4;
3656 let max_padding = miniblocks.chunks.len() * (1 + (2 * MINIBLOCK_MAX_PADDING));
3657 let mut data_buffer = Vec::with_capacity(
3658 miniblocks.data.len() + bytes_rep + bytes_def + max_bytes_repdef_len + max_padding,
3659 );
3664 let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * 2); // 2 bytes of metadata per chunk
3665
3666 let mut value_offset = 0;
3667 for ((chunk, rep), def) in miniblocks.chunks.into_iter().zip(rep).zip(def) {
3668 let start_len = data_buffer.len();
3669 debug_assert_eq!(start_len % MINIBLOCK_ALIGNMENT, 0);
3671
3672 assert!(rep.len() < u16::MAX as usize);
3673 assert!(def.len() < u16::MAX as usize);
3674 let bytes_rep = rep.len() as u16;
3675 let bytes_def = def.len() as u16;
3676 let bytes_val = chunk.num_bytes;
3677
3678 data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
3681 data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
3682 data_buffer.extend_from_slice(&bytes_val.to_le_bytes());
3683
3684 data_buffer.extend_from_slice(&rep);
3685 debug_assert_eq!(data_buffer.len() % 2, 0);
3688 data_buffer.extend_from_slice(&def);
3689
3690 let p2 = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
3691 data_buffer.extend(iter::repeat(0).take(p2));
3693
3694 let num_value_bytes = chunk.num_bytes as usize;
3695 let values =
3696 &miniblocks.data[value_offset as usize..value_offset as usize + num_value_bytes];
3697 debug_assert_eq!(data_buffer.len() % MINIBLOCK_ALIGNMENT, 0);
3698 data_buffer.extend_from_slice(values);
3699
3700 let p3 = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
3701 data_buffer.extend(iter::repeat(0).take(p3));
3702 value_offset += num_value_bytes as u64;
3703
3704 let chunk_bytes = data_buffer.len() - start_len;
3705 assert!(chunk_bytes <= 16 * 1024);
3706 assert!(chunk_bytes > 0);
3707 assert_eq!(chunk_bytes % 8, 0);
3708 let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
3712 let divided_bytes_minus_one = (divided_bytes - 1) as u64;
3713
3714 let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u16;
3715 meta_buffer.extend_from_slice(&metadata.to_le_bytes());
3716 }
3717
3718 (
3719 LanceBuffer::Owned(data_buffer),
3720 LanceBuffer::Owned(meta_buffer),
3721 )
3722 }
3723
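3724 /// Compresses a buffer of repetition or definition levels, slicing it
3725 /// to match the miniblock chunks
3726 ///
3727 /// Returns one compressed buffer per chunk, the encoding description,
3728 /// and (when max_rep > 0) a repetition index with two u64s per chunk.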
3724 fn compress_levels(
3731 levels: Option<RepDefSlicer<'_>>,
3732 num_values: u64,
3733 compression_strategy: &dyn CompressionStrategy,
3734 chunks: &[MiniBlockChunk],
3735 max_rep: u16,
3737 ) -> Result<(Vec<LanceBuffer>, pb::ArrayEncoding, LanceBuffer)> {
3738 if let Some(mut levels) = levels {
3739 let mut rep_index = if max_rep > 0 {
3740 Vec::with_capacity(chunks.len())
3741 } else {
3742 vec![]
3743 };
3744 let num_levels = levels.num_levels() as u64;
3746 let mut levels_buf = levels.all_levels().try_clone().unwrap();
3747 let levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3748 data: levels_buf.borrow_and_clone(),
3749 bits_per_value: 16,
3750 num_values: num_levels,
3751 block_info: BlockInfo::new(),
3752 });
3753 let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
3754 let (compressor, compressor_desc) =
3756 compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
3757 let mut buffers = Vec::with_capacity(chunks.len());
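3758 // Slice the levels chunk by chunk and compress each slice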
3759 let mut values_counter = 0;
3760 for (chunk_idx, chunk) in chunks.iter().enumerate() {
3761 let chunk_num_values = chunk.num_values(values_counter, num_values);
3762 values_counter += chunk_num_values;
3763 let mut chunk_levels = if chunk_idx < chunks.len() - 1 {
3764 levels.slice_next(chunk_num_values as usize)
3765 } else {
3766 levels.slice_rest()
3767 };
3768 let num_chunk_levels = (chunk_levels.len() / 2) as u64;
3769 if max_rep > 0 {
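3770 // With repetition we also build an index with two u64s per chunk:
3771 // roughly, the number of rows in the chunk and the number of trailing
3772 // items that spill into the next chunk.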
3770 let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
3780 let rep_values = rep_values.as_ref();
3781
3782 let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
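3783 // Items after the last new-row marker belong to a row that continues
3784 // into the next chunk.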
3785 let num_leftovers = if chunk_idx < chunks.len() - 1 {
3786 rep_values
3787 .iter()
3788 .rev()
3789 .position(|v| *v == max_rep)
3790 .map(|pos| pos + 1)
3792 .unwrap_or(rep_values.len())
3793 } else {
3794 0
3796 };
3797
3798 if chunk_idx != 0 && rep_values[0] == max_rep {
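3799 // If this chunk opens with a new row, the previous chunk's leftovers
3800 // actually completed a row; credit that row to the previous chunk and
3801 // clear its leftover count.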
3799 let rep_len = rep_index.len();
3803 if rep_index[rep_len - 1] != 0 {
3804 rep_index[rep_len - 2] += 1;
3806 rep_index[rep_len - 1] = 0;
3807 }
3808 }
3809
3810 if chunk_idx == chunks.len() - 1 {
3811 num_rows += 1;
3813 }
3814 rep_index.push(num_rows as u64);
3815 rep_index.push(num_leftovers as u64);
3816 }
3817 let chunk_levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3818 data: chunk_levels,
3819 bits_per_value: 16,
3820 num_values: num_chunk_levels,
3821 block_info: BlockInfo::new(),
3822 });
3823 let compressed_levels = compressor.compress(chunk_levels_block)?;
3824 buffers.push(compressed_levels);
3825 }
3826 debug_assert_eq!(levels.num_levels_remaining(), 0);
3827 let rep_index = LanceBuffer::reinterpret_vec(rep_index);
3828 Ok((buffers, compressor_desc, rep_index))
3829 } else {
3830 let data = chunks.iter().map(|_| LanceBuffer::empty()).collect();
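3831 // No levels to store; describe them with a constant-zero encoding so
3832 // no per-chunk buffers are needed.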
3833 let scalar = 0_u16.to_le_bytes().to_vec();
3834 let encoding = ProtobufUtils::constant(scalar, num_values);
3835 Ok((data, encoding, LanceBuffer::empty()))
3836 }
3837 }
3838
3839 fn encode_simple_all_null(
3840 column_idx: u32,
3841 num_rows: u64,
3842 row_number: u64,
3843 ) -> Result<EncodedPage> {
3844 let description = ProtobufUtils::simple_all_null_layout();
3845 Ok(EncodedPage {
3846 column_idx,
3847 data: vec![],
3848 description: PageEncoding::Structural(description),
3849 num_rows,
3850 row_number,
3851 })
3852 }
3853
3854 fn encode_complex_all_null(
3858 column_idx: u32,
3859 repdefs: Vec<RepDefBuilder>,
3860 row_number: u64,
3861 num_rows: u64,
3862 ) -> Result<EncodedPage> {
3863 let repdef = RepDefBuilder::serialize(repdefs);
3864
3865 let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
3867 LanceBuffer::reinterpret_slice(rep.clone())
3868 } else {
3869 LanceBuffer::empty()
3870 };
3871
3872 let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
3873 LanceBuffer::reinterpret_slice(def.clone())
3874 } else {
3875 LanceBuffer::empty()
3876 };
3877
3878 let description = ProtobufUtils::all_null_layout(&repdef.def_meaning);
3879 Ok(EncodedPage {
3880 column_idx,
3881 data: vec![rep_bytes, def_bytes],
3882 description: PageEncoding::Structural(description),
3883 num_rows,
3884 row_number,
3885 })
3886 }
3887
3888 #[allow(clippy::too_many_arguments)]
3889 fn encode_miniblock(
3890 column_idx: u32,
3891 field: &Field,
3892 compression_strategy: &dyn CompressionStrategy,
3893 data: DataBlock,
3894 repdefs: Vec<RepDefBuilder>,
3895 row_number: u64,
3896 dictionary_data: Option<DataBlock>,
3897 num_rows: u64,
3898 ) -> Result<EncodedPage> {
3899 let repdef = RepDefBuilder::serialize(repdefs);
3900
3901 if let DataBlock::AllNull(_null_block) = data {
3902 todo!()
3905 }
3906
3907 let data = data.remove_validity();
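3908 // (validity is carried by the repdef levels, not the data block)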
3909
3910 let data = data.flatten();
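3911 // (fixed-size-list layers are flattened here and restored from the
3912 // repdef at decode time)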
3913
3914 let num_items = data.num_values();
3915
3916 let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
3917 let (compressed_data, value_encoding) = compressor.compress(data)?;
3918
3919 let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
3920
3921 let (compressed_rep, rep_encoding, rep_index) = Self::compress_levels(
3922 repdef.rep_slicer(),
3923 num_items,
3924 compression_strategy,
3925 &compressed_data.chunks,
3926 max_rep,
3927 )?;
3928
3929 let (rep_index, rep_index_depth) = if rep_index.is_empty() {
3930 (None, 0)
3931 } else {
3932 (Some(rep_index), 1)
3934 };
3935
3936 let (compressed_def, def_encoding, _) = Self::compress_levels(
3937 repdef.def_slicer(),
3938 num_items,
3939 compression_strategy,
3940 &compressed_data.chunks,
3941 0,
3942 )?;
3943
3944 let (block_value_buffer, block_meta_buffer) =
3950 Self::serialize_miniblocks(compressed_data, compressed_rep, compressed_def);
3951
3952 let mut data = Vec::with_capacity(4);
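3953 // Buffer order: chunk metadata, values, [dictionary], [repetition index]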
3954 data.push(block_meta_buffer);
3955 data.push(block_value_buffer);
3956
3957 if let Some(dictionary_data) = dictionary_data {
3958 let dummy_dictionary_field = Field::new_arrow("", DataType::UInt16, false)?;
3960
3961 let (compressor, dictionary_encoding) = compression_strategy
3962 .create_block_compressor(&dummy_dictionary_field, &dictionary_data)?;
3963 let dictionary_buffer = compressor.compress(dictionary_data)?;
3964
3965 data.push(dictionary_buffer);
3966 if let Some(rep_index) = rep_index {
3967 data.push(rep_index);
3968 }
3969
3970 let description = ProtobufUtils::miniblock_layout(
3971 rep_encoding,
3972 def_encoding,
3973 value_encoding,
3974 rep_index_depth,
3975 Some(dictionary_encoding),
3976 &repdef.def_meaning,
3977 num_items,
3978 );
3979 Ok(EncodedPage {
3980 num_rows,
3981 column_idx,
3982 data,
3983 description: PageEncoding::Structural(description),
3984 row_number,
3985 })
3986 } else {
3987 let description = ProtobufUtils::miniblock_layout(
3988 rep_encoding,
3989 def_encoding,
3990 value_encoding,
3991 rep_index_depth,
3992 None,
3993 &repdef.def_meaning,
3994 num_items,
3995 );
3996
3997 if let Some(mut rep_index) = rep_index {
3998 let view = rep_index.borrow_to_typed_slice::<u64>();
3999 let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
4000 debug_assert_eq!(total, num_rows);
4001
4002 data.push(rep_index);
4003 }
4004
4005 Ok(EncodedPage {
4006 num_rows,
4007 column_idx,
4008 data,
4009 description: PageEncoding::Structural(description),
4010 row_number,
4011 })
4012 }
4013 }
4014
4015 fn serialize_full_zip_fixed(
4017 fixed: FixedWidthDataBlock,
4018 mut repdef: ControlWordIterator,
4019 num_items: u64,
4020 ) -> SerializedFullZip {
4021 let len = fixed.data.len() + repdef.bytes_per_word() * num_items as usize;
4022 let mut zipped_data = Vec::with_capacity(len);
4023
4024 let max_rep_index_val = if repdef.has_repetition() {
4025 len as u64
4026 } else {
4027 0
4029 };
4030 let mut rep_index_builder =
4031 BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
4032
4033 assert_eq!(
4036 fixed.bits_per_value % 8,
4037 0,
4038 "Non-byte aligned full-zip compression not yet supported"
4039 );
4040
4041 let bytes_per_value = fixed.bits_per_value as usize / 8;
4042
4043 let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
4044 let mut offset = 0;
4045 while let Some(control) = repdef.append_next(&mut zipped_data) {
4046 if control.is_new_row {
4047 debug_assert!(offset <= len);
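4048 // SAFETY: offset <= len, the maximum the builder was sized for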
4049 unsafe { rep_index_builder.append(offset as u64) };
4051 }
4052 if control.is_visible {
4053 let value = data_iter.next().unwrap();
4054 zipped_data.extend_from_slice(value);
4055 }
4056 offset = zipped_data.len();
4057 }
4058
4059 debug_assert_eq!(zipped_data.len(), len);
4060 unsafe {
4063 rep_index_builder.append(zipped_data.len() as u64);
4064 }
4065
4066 let zipped_data = LanceBuffer::Owned(zipped_data);
4067 let rep_index = rep_index_builder.into_data();
4068 let rep_index = if rep_index.is_empty() {
4069 None
4070 } else {
4071 Some(LanceBuffer::Owned(rep_index))
4072 };
4073 SerializedFullZip {
4074 values: zipped_data,
4075 repetition_index: rep_index,
4076 }
4077 }
4078
4079 fn serialize_full_zip_variable(
4083 mut variable: VariableWidthBlock,
4084 mut repdef: ControlWordIterator,
4085 num_items: u64,
4086 ) -> SerializedFullZip {
4087 let bytes_per_offset = variable.bits_per_offset as usize / 8;
4088 assert_eq!(
4089 variable.bits_per_offset % 8,
4090 0,
4091 "Only byte-aligned offsets supported"
4092 );
4093 let len = variable.data.len()
4094 + repdef.bytes_per_word() * num_items as usize
4095 + bytes_per_offset * variable.num_values as usize;
4096 let mut buf = Vec::with_capacity(len);
4097
4098 let max_rep_index_val = len as u64;
4099 let mut rep_index_builder =
4100 BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
4101
4102 match bytes_per_offset {
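4103 // The 32- and 64-bit paths are identical except for the offset type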
4104 4 => {
4105 let offs = variable.offsets.borrow_to_typed_slice::<u32>();
4106 let mut rep_offset = 0;
4107 let mut windows_iter = offs.as_ref().windows(2);
4108 while let Some(control) = repdef.append_next(&mut buf) {
4109 if control.is_new_row {
4110 debug_assert!(rep_offset <= len);
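4111 // SAFETY: rep_offset <= len, the maximum the builder was sized for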
4112 unsafe { rep_index_builder.append(rep_offset as u64) };
4114 }
4115 if control.is_visible {
4116 let window = windows_iter.next().unwrap();
4117 if control.is_valid_item {
4118 buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4119 buf.extend_from_slice(
4120 &variable.data[window[0] as usize..window[1] as usize],
4121 );
4122 }
4123 }
4124 rep_offset = buf.len();
4125 }
4126 }
4127 8 => {
4128 let offs = variable.offsets.borrow_to_typed_slice::<u64>();
4129 let mut rep_offset = 0;
4130 let mut windows_iter = offs.as_ref().windows(2);
4131 while let Some(control) = repdef.append_next(&mut buf) {
4132 if control.is_new_row {
4133 debug_assert!(rep_offset <= len);
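4134 // SAFETY: rep_offset <= len, the maximum the builder was sized for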
4135 unsafe { rep_index_builder.append(rep_offset as u64) };
4137 }
4138 if control.is_visible {
4139 let window = windows_iter.next().unwrap();
4140 if control.is_valid_item {
4141 buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4142 buf.extend_from_slice(
4143 &variable.data[window[0] as usize..window[1] as usize],
4144 );
4145 }
4146 }
4147 rep_offset = buf.len();
4148 }
4149 }
4150 _ => panic!("Unsupported offset size"),
4151 }
4152
4153 debug_assert!(buf.len() <= len);
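4154 // buf may be shorter than the estimate: null items contribute an
4155 // offset entry but no length prefix or data bytes.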
4156 unsafe {
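4157 // SAFETY: buf.len() <= len, the maximum the builder was sized for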
4159 rep_index_builder.append(buf.len() as u64);
4160 }
4161
4162 let zipped_data = LanceBuffer::Owned(buf);
4163 let rep_index = rep_index_builder.into_data();
4164 debug_assert!(!rep_index.is_empty());
4165 let rep_index = Some(LanceBuffer::Owned(rep_index));
4166 SerializedFullZip {
4167 values: zipped_data,
4168 repetition_index: rep_index,
4169 }
4170 }
4171
4172 fn serialize_full_zip(
4175 compressed_data: PerValueDataBlock,
4176 repdef: ControlWordIterator,
4177 num_items: u64,
4178 ) -> SerializedFullZip {
4179 match compressed_data {
4180 PerValueDataBlock::Fixed(fixed) => {
4181 Self::serialize_full_zip_fixed(fixed, repdef, num_items)
4182 }
4183 PerValueDataBlock::Variable(var) => {
4184 Self::serialize_full_zip_variable(var, repdef, num_items)
4185 }
4186 }
4187 }
4188
4189 fn encode_full_zip(
4190 column_idx: u32,
4191 field: &Field,
4192 compression_strategy: &dyn CompressionStrategy,
4193 data: DataBlock,
4194 repdefs: Vec<RepDefBuilder>,
4195 row_number: u64,
4196 num_lists: u64,
4197 ) -> Result<EncodedPage> {
4198 let repdef = RepDefBuilder::serialize(repdefs);
4199 let max_rep = repdef
4200 .repetition_levels
4201 .as_ref()
4202 .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
4203 let max_def = repdef
4204 .definition_levels
4205 .as_ref()
4206 .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
4207
4208 let data = data.remove_validity();
4210
4211 let data = data.flatten();
4213 let (num_items, num_visible_items) =
4214 if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
4215 (rep_levels.len() as u64, data.num_values())
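4216 // With repetition, invisible items (e.g. null lists) appear in the
4217 // levels but contribute no values.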
4218 } else {
4219 (data.num_values(), data.num_values())
4221 };
4222
4223 let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
4224
4225 let repdef_iter = build_control_word_iterator(
4226 repdef.repetition_levels.as_deref(),
4227 max_rep,
4228 repdef.definition_levels.as_deref(),
4229 max_def,
4230 max_visible_def,
4231 num_items as usize,
4232 );
4233 let bits_rep = repdef_iter.bits_rep();
4234 let bits_def = repdef_iter.bits_def();
4235
4236 let compressor = compression_strategy.create_per_value(field, &data)?;
4237 let (compressed_data, value_encoding) = compressor.compress(data)?;
4238
4239 let description = match &compressed_data {
4240 PerValueDataBlock::Fixed(fixed) => ProtobufUtils::fixed_full_zip_layout(
4241 bits_rep,
4242 bits_def,
4243 fixed.bits_per_value as u32,
4244 value_encoding,
4245 &repdef.def_meaning,
4246 num_items as u32,
4247 num_visible_items as u32,
4248 ),
4249 PerValueDataBlock::Variable(variable) => ProtobufUtils::variable_full_zip_layout(
4250 bits_rep,
4251 bits_def,
4252 variable.bits_per_offset as u32,
4253 value_encoding,
4254 &repdef.def_meaning,
4255 num_items as u32,
4256 num_visible_items as u32,
4257 ),
4258 };
4259
4260 let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
4261
4262 let data = if let Some(repindex) = zipped.repetition_index {
4263 vec![zipped.values, repindex]
4264 } else {
4265 vec![zipped.values]
4266 };
4267
4268 Ok(EncodedPage {
4269 num_rows: num_lists,
4270 column_idx,
4271 data,
4272 description: PageEncoding::Structural(description),
4273 row_number,
4274 })
4275 }
4276
4277 fn dictionary_encode(mut data_block: DataBlock, cardinality: u64) -> (DataBlock, DataBlock) {
4278 match data_block {
4279 DataBlock::FixedWidth(ref mut fixed_width_data_block) => {
4280 let mut map = HashMap::new();
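4281 // View the values as u128s so they can be used as hash-map keys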
4283 let u128_slice = fixed_width_data_block.data.borrow_to_typed_slice::<u128>();
4284 let u128_slice = u128_slice.as_ref();
4285 let mut dictionary_buffer = Vec::with_capacity(cardinality as usize);
4286 let mut indices_buffer =
4287 Vec::with_capacity(fixed_width_data_block.num_values as usize);
4288 let mut curr_idx: u8 = 0;
4289 u128_slice.iter().for_each(|&value| {
4290 let idx = *map.entry(value).or_insert_with(|| {
4291 dictionary_buffer.push(value);
4292 curr_idx += 1;
4293 curr_idx - 1
4294 });
4295 indices_buffer.push(idx);
4296 });
4297 let dictionary_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
4298 data: LanceBuffer::reinterpret_vec(dictionary_buffer),
4299 bits_per_value: 128,
4300 num_values: curr_idx as u64,
4301 block_info: BlockInfo::default(),
4302 });
4303 let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
4304 data: LanceBuffer::reinterpret_vec(indices_buffer),
4305 bits_per_value: 8,
4306 num_values: fixed_width_data_block.num_values,
4307 block_info: BlockInfo::default(),
4308 });
4309 indices_data_block.compute_stat();
4312
4313 (indices_data_block, dictionary_data_block)
4314 }
4315 DataBlock::VariableWidth(ref mut variable_width_data_block) => {
4316 match variable_width_data_block.bits_per_offset {
4317 32 => {
4318 let mut map: HashMap<U8SliceKey, u8> = HashMap::new();
4319 let offsets = variable_width_data_block
4320 .offsets
4321 .borrow_to_typed_slice::<u32>();
4322 let offsets = offsets.as_ref();
4323
4324 let max_len = variable_width_data_block.get_stat(Stat::MaxLength).expect(
4325 "VariableWidth DataBlock should have valid `Stat::DataSize` statistics",
4326 );
4327 let max_len = max_len.as_primitive::<UInt64Type>().value(0);
4328
4329 let mut dictionary_buffer: Vec<u8> =
4330 Vec::with_capacity((max_len * cardinality) as usize);
4331 let mut dictionary_offsets_buffer = vec![0];
4332 let mut curr_idx = 0;
4333 let mut indices_buffer =
4334 Vec::with_capacity(variable_width_data_block.num_values as usize);
4335
4336 offsets
4337 .iter()
4338 .zip(offsets.iter().skip(1))
4339 .for_each(|(&start, &end)| {
4340 let key =
4341 &variable_width_data_block.data[start as usize..end as usize];
4342 let idx = *map.entry(U8SliceKey(key)).or_insert_with(|| {
4343 dictionary_buffer.extend_from_slice(key);
4344 dictionary_offsets_buffer.push(dictionary_buffer.len() as u32);
4345 curr_idx += 1;
4346 curr_idx - 1
4347 });
4348 indices_buffer.push(idx);
4349 });
4350
4351 let dictionary_data_block = DataBlock::VariableWidth(VariableWidthBlock {
4352 data: LanceBuffer::reinterpret_vec(dictionary_buffer),
4353 offsets: LanceBuffer::reinterpret_vec(dictionary_offsets_buffer),
4354 bits_per_offset: 32,
4355 num_values: curr_idx as u64,
4356 block_info: BlockInfo::default(),
4357 });
4358
4359 let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
4360 data: LanceBuffer::Owned(indices_buffer),
4361 bits_per_value: 8,
4362 num_values: variable_width_data_block.num_values,
4363 block_info: BlockInfo::default(),
4364 });
4365 indices_data_block.compute_stat();
4368
4369 (indices_data_block, dictionary_data_block)
4370 }
4371 64 => {
4372 todo!("A follow up PR to support dictionary encoding with dictionary type `VariableWidth DataBlock` with bits_per_offset 64");
4373 }
4374 _ => {
4375 unreachable!()
4376 }
4377 }
4378 }
4379 _ => {
4380 unreachable!("dictionary encode called with data block {:?}", data_block)
4381 }
4382 }
4383 }
4384
    fn do_flush(
        &mut self,
        arrays: Vec<ArrayRef>,
        repdefs: Vec<RepDefBuilder>,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let column_idx = self.column_index;
        let compression_strategy = self.compression_strategy.clone();
        let field = self.field.clone();
        let encoding_metadata = self.encoding_metadata.clone();
        let task = spawn_cpu(move || {
            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
            if num_values == 0 {
                return Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows);
            }
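            // Count the nulls so the all-null fast paths can be used when the
            // entire flush is null.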
            let num_nulls = arrays
                .iter()
                .map(|arr| arr.logical_nulls().map(|n| n.null_count()).unwrap_or(0) as u64)
                .sum::<u64>();

            if num_values == num_nulls {
                if repdefs.iter().all(|rd| rd.is_simple_validity()) {
                    log::debug!(
                        "Encoding column {} with {} items using simple-null layout",
                        column_idx,
                        num_values
                    );
                    Self::encode_simple_all_null(column_idx, num_values, row_number)
                } else {
                    Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows)
                }
            } else {
                let data_block = DataBlock::from_arrays(&arrays, num_values);

                if let DataBlock::Struct(ref struct_data_block) = data_block {
                    if struct_data_block
                        .children
                        .iter()
                        .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
                    {
                        panic!("packed struct encoding currently only supports fixed-width fields.")
                    }
                }

                const DICTIONARY_ENCODING_THRESHOLD: u64 = 100;
                let cardinality =
                    if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
                        cardinality_array.as_primitive::<UInt64Type>().value(0)
                    } else {
                        u64::MAX
                    };

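                // Dictionary encoding only pays off when values repeat heavily:
                // require a small dictionary (cardinality at most 100) and at
                // least ten occurrences per distinct value on average.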
                if cardinality <= DICTIONARY_ENCODING_THRESHOLD
                    && data_block.num_values() >= 10 * cardinality
                {
                    let (indices_data_block, dictionary_data_block) =
                        Self::dictionary_encode(data_block, cardinality);
                    Self::encode_miniblock(
                        column_idx,
                        &field,
                        compression_strategy.as_ref(),
                        indices_data_block,
                        repdefs,
                        row_number,
                        Some(dictionary_data_block),
                        num_rows,
                    )
                } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
                    log::debug!(
                        "Encoding column {} with {} items using mini-block layout",
                        column_idx,
                        num_values
                    );
                    Self::encode_miniblock(
                        column_idx,
                        &field,
                        compression_strategy.as_ref(),
                        data_block,
                        repdefs,
                        row_number,
                        None,
                        num_rows,
                    )
                } else if Self::prefers_fullzip(encoding_metadata.as_ref()) {
                    log::debug!(
                        "Encoding column {} with {} items using full-zip layout",
                        column_idx,
                        num_values
                    );
                    Self::encode_full_zip(
                        column_idx,
                        &field,
                        compression_strategy.as_ref(),
                        data_block,
                        repdefs,
                        row_number,
                        num_rows,
                    )
                } else {
                    Err(Error::InvalidInput {
                        source: format!(
                            "Cannot determine structural encoding for field {}. This typically indicates an invalid value of the field metadata key {}",
                            field.name,
                            STRUCTURAL_ENCODING_META_KEY
                        )
                        .into(),
                        location: location!(),
                    })
                }
            }
        })
        .boxed();
        Ok(vec![task])
    }

    fn extract_validity_buf(array: &dyn Array, repdef: &mut RepDefBuilder) {
        if let Some(validity) = array.nulls() {
            repdef.add_validity_bitmap(validity.clone());
        } else {
            repdef.add_no_null(array.len());
        }
    }

    fn extract_validity(array: &dyn Array, repdef: &mut RepDefBuilder) {
        match array.data_type() {
            DataType::Null => {
                repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
            }
            DataType::Dictionary(_, _) => {
                unreachable!()
            }
            DataType::FixedSizeList(_, dimension) => {
                repdef.add_fsl(array.nulls().cloned(), *dimension as usize, array.len());
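                // The FSL layer tracks its own validity; recurse into the child
                // values so their validity is captured too.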
                let array = array.as_fixed_size_list();
                Self::extract_validity(array.values(), repdef);
            }
            _ => Self::extract_validity_buf(array, repdef),
        }
    }
}

impl FieldEncoder for PrimitiveStructuralEncoder {
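    // Extracts validity into the repdef builder and queues the array; a batch
    // of encode tasks is emitted once the accumulation queue decides enough
    // data has arrived for a page.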
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        _external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        Self::extract_validity(array.as_ref(), &mut repdef);
        self.accumulated_repdefs.push(repdef);

        if let Some((arrays, row_number, num_rows)) =
            self.accumulation_queue.insert(array, row_number, num_rows)
        {
            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
        } else {
            Ok(vec![])
        }
    }

    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
        } else {
            Ok(vec![])
        }
    }

    fn num_columns(&self) -> u32 {
        1
    }

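    // Every data page is emitted from `maybe_encode`/`flush`, so `finish` only
    // needs to produce a default (empty) column description.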
    fn finish(
        &mut self,
        _external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
    }
}

#[cfg(test)]
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use std::{collections::VecDeque, sync::Arc};

    use arrow_array::{ArrayRef, Int8Array, StringArray};

    use crate::encodings::logical::primitive::{
        ChunkDrainInstructions, PrimitiveStructuralEncoder,
    };

    use super::{
        ChunkInstructions, DataBlock, DecodeMiniBlockTask, PreambleAction, RepetitionIndex,
    };

    #[test]
    fn test_is_narrow() {
        let int8_array = Int8Array::from(vec![1, 2, 3]);
        let array_ref: ArrayRef = Arc::new(int8_array);
        let block = DataBlock::from_array(array_ref);

        assert!(PrimitiveStructuralEncoder::is_narrow(&block));

        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
        let block = DataBlock::from_array(string_array);
        assert!(PrimitiveStructuralEncoder::is_narrow(&block));

        let string_array = StringArray::from(vec![
            Some("hello world".repeat(100)),
            Some("world".to_string()),
        ]);
        let block = DataBlock::from_array(string_array);
        assert!(!PrimitiveStructuralEncoder::is_narrow(&block));
    }

    #[test]
    fn test_map_range() {
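        // Each case hands `map_range` a row range and expects the mapped item
        // and level ranges. A rep value equal to `max_rep` starts a new row; a
        // def value greater than `max_visible_def` marks an invisible (null)
        // item that occupies a level but no item slot.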
        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
        let max_visible_def = 0;
        let total_items = 8;
        let max_rep = 1;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(1..2, 3..5, 3..5);
        check(2..3, 5..5, 5..6);
        check(3..4, 5..8, 6..9);
        check(0..2, 0..5, 0..5);
        check(1..3, 3..5, 3..6);
        check(2..4, 5..8, 5..9);
        check(0..3, 0..5, 0..6);
        check(1..4, 3..8, 3..9);
        check(0..4, 0..8, 0..9);

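        // Here the first row is a null list: its single level has def 1 and
        // so maps to no items.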
        let rep = Some(vec![1, 1, 0, 1]);
        let def = Some(vec![1, 0, 0, 0]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..0, 0..1);
        check(1..2, 0..2, 1..3);
        check(2..3, 2..3, 3..4);
        check(0..2, 0..2, 0..3);
        check(1..3, 0..3, 1..4);
        check(0..3, 0..3, 0..4);

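        // The same rows, but now the invisible item is the final row's only
        // level.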
        let rep = Some(vec![1, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..3, 1..3);
        check(2..3, 3..3, 3..4);
        check(0..2, 0..3, 0..3);
        check(1..3, 1..3, 1..4);
        check(0..3, 0..3, 0..4);

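        // Repetition only: three rows of two items each, everything visible.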
        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
        let def: Option<&[u16]> = None;
        let max_visible_def = 0;
        let total_items = 6;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..2, 0..2);
        check(1..2, 2..4, 2..4);
        check(2..3, 4..6, 4..6);
        check(0..2, 0..4, 0..4);
        check(1..3, 2..6, 2..6);
        check(0..3, 0..6, 0..6);

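        // Definition only: each level is one row, and def 1 is still visible
        // because max_visible_def is 1.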
        let rep: Option<&[u16]> = None;
        let def = Some(vec![0, 0, 1, 0]);
        let max_visible_def = 1;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..2, 1..2);
        check(2..3, 2..3, 2..3);
        check(0..2, 0..2, 0..2);
        check(1..3, 1..3, 1..3);
        check(0..3, 0..3, 0..3);

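        // The rep levels start with 0, so this chunk begins with a preamble
        // (the tail of a row from the previous chunk); the final level is an
        // invisible item. The first closure takes the preamble, the second
        // skips it.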
        let rep = Some(vec![0, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(0..2, 0..3, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..3, 3..4);
        check(0..2, 1..3, 1..4);

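        // Preamble again, but here the first complete row is a null list: its
        // only level has def 1 and carries no item.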
        let rep = Some(vec![0, 1, 1, 0]);
        let def = Some(vec![0, 1, 0, 0]);
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..2);
        check(0..2, 0..3, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..1, 1..2);
        check(1..2, 1..3, 2..4);
        check(0..2, 1..3, 1..4);

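        // No definition levels at all: with a preamble but no nulls, the item
        // and level ranges always coincide.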
        let rep = Some(vec![0, 1, 0, 1]);
        let def: Option<Vec<u16>> = None;
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(0..2, 0..4, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..4, 3..4);
        check(0..2, 1..4, 1..4);
    }

    #[test]
    fn test_schedule_instructions() {
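        // Each repetition index entry describes one chunk. Judging from the
        // expectations below, the first value is the number of rows that end
        // in the chunk (a row finished by the chunk's preamble counts here),
        // and a nonzero second value means the chunk ends with a partial row
        // that spills into the next chunk.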
        let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
        let repetition_index = RepetitionIndex::decode(&repetition_index);

        let check = |user_ranges, expected_instructions| {
            let instructions =
                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
            assert_eq!(instructions, expected_instructions);
        };

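        // Taking every row touches all four chunks: chunks 0 and 2 end with
        // partial rows (trailers), so chunks 1 and 3 must take a preamble.
        // The same instructions result whether the rows are requested as one
        // big range or as one range per row.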
        let expected_take_all = vec![
            ChunkInstructions {
                chunk_idx: 0,
                preamble: PreambleAction::Absent,
                rows_to_skip: 0,
                rows_to_take: 5,
                take_trailer: true,
            },
            ChunkInstructions {
                chunk_idx: 1,
                preamble: PreambleAction::Take,
                rows_to_skip: 0,
                rows_to_take: 2,
                take_trailer: false,
            },
            ChunkInstructions {
                chunk_idx: 2,
                preamble: PreambleAction::Absent,
                rows_to_skip: 0,
                rows_to_take: 4,
                take_trailer: true,
            },
            ChunkInstructions {
                chunk_idx: 3,
                preamble: PreambleAction::Take,
                rows_to_skip: 0,
                rows_to_take: 1,
                take_trailer: false,
            },
        ];

        check(&[0..14], expected_take_all.clone());

        check(
            &[
                0..1,
                1..2,
                2..3,
                3..4,
                4..5,
                5..6,
                6..7,
                7..8,
                8..9,
                9..10,
                10..11,
                11..12,
                12..13,
                13..14,
            ],
            expected_take_all,
        );

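        // Two disjoint ranges inside chunk 0 become two separate instructions
        // against that chunk.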
        check(
            &[0..1, 3..4],
            vec![
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 0,
                    rows_to_take: 1,
                    take_trailer: false,
                },
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 3,
                    rows_to_take: 1,
                    take_trailer: false,
                },
            ],
        );

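        // Row 5 spans two chunks: it starts in chunk 0's trailer and finishes
        // in chunk 1's preamble.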
        check(
            &[5..6],
            vec![
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 5,
                    rows_to_take: 0,
                    take_trailer: true,
                },
                ChunkInstructions {
                    chunk_idx: 1,
                    preamble: PreambleAction::Take,
                    rows_to_skip: 0,
                    rows_to_take: 0,
                    take_trailer: false,
                },
            ],
        );

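        // A range that starts partway into a chunk skips that chunk's
        // preamble as well as the unwanted leading rows.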
        check(
            &[7..10],
            vec![
                ChunkInstructions {
                    chunk_idx: 1,
                    preamble: PreambleAction::Skip,
                    rows_to_skip: 1,
                    rows_to_take: 1,
                    take_trailer: false,
                },
                ChunkInstructions {
                    chunk_idx: 2,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 0,
                    rows_to_take: 2,
                    take_trailer: false,
                },
            ],
        );
    }

    #[test]
    fn test_drain_instructions() {
        fn drain_from_instructions(
            instructions: &mut VecDeque<ChunkInstructions>,
            mut rows_desired: u64,
            need_preamble: &mut bool,
            skip_in_chunk: &mut u64,
        ) -> Vec<ChunkDrainInstructions> {
            let mut drain_instructions = Vec::with_capacity(instructions.len());
            // Keep draining until we've produced the requested rows and any
            // preamble we still owe.
            while rows_desired > 0 || *need_preamble {
                let (next_instructions, consumed_chunk) = instructions
                    .front()
                    .unwrap()
                    .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
                if consumed_chunk {
                    instructions.pop_front();
                }
                drain_instructions.push(next_instructions);
            }
            drain_instructions
        }

        let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
        let repetition_index = RepetitionIndex::decode(&repetition_index);
        let user_ranges = vec![1..7, 10..14];

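        // Ranges 1..7 and 10..14 cover ten rows in total.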
        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);

        let mut to_drain = VecDeque::from(scheduled.clone());

        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

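        // Drain the first four rows; they all come from chunk 0.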
        let next_batch =
            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 4);
        assert_eq!(
            next_batch,
            vec![ChunkDrainInstructions {
                chunk_instructions: scheduled[0].clone(),
                rows_to_take: 4,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Absent,
            }]
        );

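        // Four more rows: the trailer row that spans chunks 0 and 1, one full
        // row from chunk 1, and two rows from chunk 2.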
        let next_batch =
            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 2);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[0].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 4,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 2,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Absent,
                }
            ]
        );

        // The final two rows: the trailer row spanning chunks 2 and 3, then
        // the last row.
        let next_batch =
            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 0);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 2,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[3].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

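        // Second scenario: chunks 0 and 1 both end with partial rows, so
        // chunks 1 and 2 each start with a preamble.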
        let repetition_index = vec![vec![5, 2], vec![3, 3], vec![20, 0]];
        let repetition_index = RepetitionIndex::decode(&repetition_index);
        let user_ranges = vec![0..28];

        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);

        let mut to_drain = VecDeque::from(scheduled.clone());

        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

        // Drain seven rows: rows 0-5 come from chunk 0 (row 5 finishes in
        // chunk 1's preamble) and row 6 from chunk 1.
        let next_batch =
            drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[0].clone(),
                    rows_to_take: 6,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 1);

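        // The remaining two rows: the rest of chunk 1 (skipping its preamble
        // and the row already drained) plus chunk 2's preamble, which is
        // taken with a zero-row take because the final row of chunk 1 ends
        // there.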
        let next_batch =
            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 2,
                    rows_to_skip: 1,
                    preamble_action: PreambleAction::Skip,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 0,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 0);
    }
}