use std::{collections::VecDeque, ops::Range, sync::Arc};

use arrow_array::{
    cast::AsArray,
    new_empty_array,
    types::{Int32Type, Int64Type, UInt64Type},
    Array, ArrayRef, BooleanArray, Int32Array, Int64Array, LargeListArray, ListArray, UInt64Array,
};
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer};
use arrow_schema::{DataType, Field, Fields};
use futures::{future::BoxFuture, FutureExt};
use lance_arrow::list::ListArrayExt;
use log::trace;
use snafu::location;
use tokio::task::JoinHandle;

use lance_core::{cache::FileMetadataCache, Error, Result};

use crate::{
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlock, FixedWidthDataBlock},
    decoder::{
        DecodeArrayTask, DecodeBatchScheduler, DecodedArray, FieldScheduler, FilterExpression,
        ListPriorityRange, LogicalPageDecoder, MessageType, NextDecodeTask, PageEncoding,
        PriorityRange, ScheduledScanLine, SchedulerContext, SchedulingJob,
        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
        StructuralSchedulingJob,
    },
    encoder::{
        ArrayEncoder, EncodeTask, EncodedArray, EncodedColumn, EncodedPage, FieldEncoder,
        OutOfLineBuffers,
    },
    encodings::logical::r#struct::SimpleStructScheduler,
    format::pb,
    repdef::RepDefBuilder,
    EncodingsIo,
};

use super::{primitive::AccumulationQueue, r#struct::SimpleStructDecoder};

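/// A request for a contiguous run of lists, confined to a single offsets page.
///
/// * `num_lists`: how many lists are requested
/// * `includes_extra_offset`: true if the request does not start at the
///   beginning of an offsets page, in which case one extra (preceding) offset
///   must be read to compute where the first list starts
/// * `null_offset_adjustment`: the page's null marker (see [`decode_offsets`])
/// * `items_offset`: the number of items referenced by all preceding pages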
#[derive(Debug)]
struct ListRequest {
    num_lists: u64,
    includes_extra_offset: bool,
    null_offset_adjustment: u64,
    items_offset: u64,
}

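/// Breaks requested row ranges into [`ListRequest`]s, each aligned to a single
/// offsets page, and records the offsets ranges that must be scheduled.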
#[derive(Debug)]
struct ListRequestsIter {
    list_requests: VecDeque<ListRequest>,
    offsets_requests: Vec<Range<u64>>,
}

impl ListRequestsIter {
    fn new(row_ranges: &[Range<u64>], page_infos: &[OffsetPageInfo]) -> Self {
        let mut items_offset = 0;
        let mut offsets_offset = 0;
        let mut page_infos_iter = page_infos.iter();
        let mut cur_page_info = page_infos_iter.next().unwrap();
        let mut list_requests = VecDeque::new();
        let mut offsets_requests = Vec::new();

        for range in row_ranges {
            let mut range = range.clone();

            // Skip any offsets pages that end before this range starts
            while offsets_offset + (cur_page_info.offsets_in_page) <= range.start {
                trace!("Skipping null offset adjustment chunk {:?}", offsets_offset);
                offsets_offset += cur_page_info.offsets_in_page;
                items_offset += cur_page_info.num_items_referenced_by_page;
                cur_page_info = page_infos_iter.next().unwrap();
            }

            // If the range does not start at a page boundary we need to load one
            // extra (preceding) offset to compute the first list's start
            let mut includes_extra_offset = range.start != offsets_offset;
            if includes_extra_offset {
                offsets_requests.push(range.start - 1..range.end);
            } else {
                offsets_requests.push(range.clone());
            }

            // Split the range into one ListRequest per offsets page it touches
            while !range.is_empty() {
                let end = offsets_offset + cur_page_info.offsets_in_page;
                let last = end >= range.end;
                let end = end.min(range.end);
                list_requests.push_back(ListRequest {
                    num_lists: end - range.start,
                    includes_extra_offset,
                    null_offset_adjustment: cur_page_info.null_offset_adjustment,
                    items_offset,
                });

                includes_extra_offset = false;
                range.start = end;
                if !last {
                    offsets_offset += cur_page_info.offsets_in_page;
                    items_offset += cur_page_info.num_items_referenced_by_page;
                    cur_page_info = page_infos_iter.next().unwrap();
                }
            }
        }
        Self {
            list_requests,
            offsets_requests,
        }
    }

    // Given the number of offsets that were just scheduled, return the list
    // requests satisfied by those offsets, splitting the front request if needed
    fn next(&mut self, mut num_offsets: u64) -> Vec<ListRequest> {
        let mut list_requests = Vec::new();
        while num_offsets > 0 {
            let req = self.list_requests.front_mut().unwrap();
            // An extra offset consumes one scheduled offset without yielding a list
            if req.includes_extra_offset {
                num_offsets -= 1;
                debug_assert_ne!(num_offsets, 0);
            }
            if num_offsets >= req.num_lists {
                num_offsets -= req.num_lists;
                list_requests.push(self.list_requests.pop_front().unwrap());
            } else {
                let sub_req = ListRequest {
                    num_lists: num_offsets,
                    includes_extra_offset: req.includes_extra_offset,
                    null_offset_adjustment: req.null_offset_adjustment,
                    items_offset: req.items_offset,
                };

                list_requests.push(sub_req);
                req.includes_extra_offset = false;
                req.num_lists -= num_offsets;
                num_offsets = 0;
            }
        }
        list_requests
    }
}

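/// Decodes a batch of encoded offsets into normalized offsets, a validity
/// buffer, and the item ranges that must be loaded.
///
/// Each stored value is the *end* offset of a list (the leading zero is not
/// stored). Validity is folded into the offsets themselves: a null list stores
/// its offset plus `null_offset_adjustment`, a value chosen to be larger than
/// any valid offset in the page. Taking `value % null_offset_adjustment`
/// recovers the position and comparing against `null_offset_adjustment`
/// recovers validity.
///
/// A minimal sketch of the scheme (values here are illustrative, not taken from
/// a real file): with `null_offset_adjustment = 10`, the lists
/// `[[a, b], NULL, [c]]` could be stored as `[2, 12, 3]`; `12 % 10 == 2`
/// recovers the end offset of the (empty) null list and `12 >= 10` marks it
/// as null.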
fn decode_offsets(
    offsets: &dyn Array,
    list_requests: &[ListRequest],
    null_offset_adjustment: u64,
) -> (VecDeque<Range<u64>>, Vec<u64>, BooleanBuffer) {
    let numeric_offsets = offsets.as_primitive::<UInt64Type>();
    let total_num_lists = list_requests.iter().map(|req| req.num_lists).sum::<u64>() as u32;
    let mut normalized_offsets = Vec::with_capacity(total_num_lists as usize);
    let mut validity_buffer = BooleanBufferBuilder::new(total_num_lists as usize);
    normalized_offsets.push(0);
    let mut last_normalized_offset = 0;
    let offsets_values = numeric_offsets.values();

    let mut item_ranges = VecDeque::new();
    let mut offsets_offset: u32 = 0;
    debug_assert!(list_requests.iter().all(|r| r.num_lists > 0));
    for req in list_requests {
        let num_lists = req.num_lists;

        let (items_range, offsets_to_norm_start, num_offsets_to_norm) =
            if !req.includes_extra_offset {
                // The request starts at the beginning of the page, so the first
                // list implicitly starts at item 0
                let first_offset_idx = 0_usize;
                let num_offsets = num_lists as usize;
                let items_start = 0;
                let items_end = offsets_values[num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            } else {
                // One extra offset was loaded to tell us where the first list starts
                let first_offset_idx = offsets_offset as usize;
                let num_offsets = num_lists as usize + 1;
                let items_start = offsets_values[first_offset_idx] % null_offset_adjustment;
                let items_end =
                    offsets_values[first_offset_idx + num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            };

        // Values >= null_offset_adjustment mark null lists; extract the validity
        // from each list's own (end) offset
        let validity_start = if !req.includes_extra_offset {
            0
        } else {
            offsets_to_norm_start + 1
        };
        for off in offsets_values
            .slice(validity_start, num_lists as usize)
            .iter()
        {
            validity_buffer.append(*off < null_offset_adjustment);
        }

        // The page does not store a leading zero, so without an extra offset the
        // first stored value is already the end of the first list
        if !req.includes_extra_offset {
            let first_item = offsets_values[0] % null_offset_adjustment;
            normalized_offsets.push(first_item);
            last_normalized_offset = first_item;
        }

        normalized_offsets.extend(
            offsets_values
                .slice(offsets_to_norm_start, num_offsets_to_norm)
                .windows(2)
                .map(|w| {
                    let start = w[0] % null_offset_adjustment;
                    let end = w[1] % null_offset_adjustment;
                    if end < start {
                        panic!(
                            "End is less than start in window {:?} with null_offset_adjustment={} we get start={} and end={}",
                            w, null_offset_adjustment, start, end
                        );
                    }
                    let length = end - start;
                    last_normalized_offset += length;
                    last_normalized_offset
                }),
        );
        trace!(
            "List offsets range of {} lists maps to item range {:?}",
            num_lists,
            items_range
        );
        offsets_offset += num_offsets_to_norm as u32;
        if !items_range.is_empty() {
            let items_range =
                items_range.start + req.items_offset..items_range.end + req.items_offset;
            item_ranges.push_back(items_range);
        }
    }

    let validity = validity_buffer.finish();
    (item_ranges, normalized_offsets, validity)
}

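/// Decodes the loaded offsets and then schedules the referenced item ranges.
///
/// List scheduling is indirect: we cannot know which items to read until the
/// offsets themselves have been read and decoded, so this runs as a separate
/// spawned task after the offsets I/O completes.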
#[allow(clippy::too_many_arguments)]
async fn indirect_schedule_task(
    mut offsets_decoder: Box<dyn LogicalPageDecoder>,
    list_requests: Vec<ListRequest>,
    null_offset_adjustment: u64,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_type: DataType,
    io: Arc<dyn EncodingsIo>,
    cache: Arc<FileMetadataCache>,
    priority: Box<dyn PriorityRange>,
) -> Result<IndirectlyLoaded> {
    let num_offsets = offsets_decoder.num_rows();
    offsets_decoder.wait_for_loaded(num_offsets - 1).await?;
    let decode_task = offsets_decoder.drain(num_offsets)?;
    let offsets = decode_task.task.decode()?;

    let (item_ranges, offsets, validity) =
        decode_offsets(offsets.as_ref(), &list_requests, null_offset_adjustment);

    trace!(
        "Indirectly scheduling items ranges {:?} from list items column with {} rows (and priority {:?})",
        item_ranges,
        items_scheduler.num_rows(),
        priority
    );
    let offsets: Arc<[u64]> = offsets.into();

    // All requested lists are empty (or null); there are no items to schedule
    if item_ranges.is_empty() {
        debug_assert!(item_ranges.iter().all(|r| r.start == r.end));
        return Ok(IndirectlyLoaded {
            root_decoder: None,
            offsets,
            validity,
        });
    }
    let item_ranges = item_ranges.into_iter().collect::<Vec<_>>();
    let num_items = item_ranges.iter().map(|r| r.end - r.start).sum::<u64>();

    // Wrap the items in a single-field struct so we can reuse the struct
    // scheduler / decoder machinery for the indirect read
    let root_fields = Fields::from(vec![Field::new("item", items_type, true)]);
    let indirect_root_scheduler =
        SimpleStructScheduler::new(vec![items_scheduler], root_fields.clone(), num_items);
    let mut indirect_scheduler = DecodeBatchScheduler::from_scheduler(
        Arc::new(indirect_root_scheduler),
        root_fields.clone(),
        cache,
    );
    let mut root_decoder = SimpleStructDecoder::new(root_fields, num_items);

    let priority = Box::new(ListPriorityRange::new(priority, offsets.clone()));

    let indirect_messages = indirect_scheduler.schedule_ranges_to_vec(
        &item_ranges,
        &FilterExpression::no_filter(),
        io,
        Some(priority),
    )?;

    for message in indirect_messages {
        for decoder in message.decoders {
            let decoder = decoder.into_legacy();
            if !decoder.path.is_empty() {
                root_decoder.accept_child(decoder)?;
            }
        }
    }

    Ok(IndirectlyLoaded {
        offsets,
        validity,
        root_decoder: Some(root_decoder),
    })
}

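/// A scheduling job for a list field: schedules the offsets column directly and
/// spawns an indirect task to schedule the items once the offsets are known.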
#[derive(Debug)]
struct ListFieldSchedulingJob<'a> {
    scheduler: &'a ListFieldScheduler,
    offsets: Box<dyn SchedulingJob + 'a>,
    num_rows: u64,
    list_requests_iter: ListRequestsIter,
}

impl<'a> ListFieldSchedulingJob<'a> {
    fn try_new(
        scheduler: &'a ListFieldScheduler,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Self> {
        let list_requests_iter = ListRequestsIter::new(ranges, &scheduler.offset_page_info);
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let offsets = scheduler
            .offsets_scheduler
            .schedule_ranges(&list_requests_iter.offsets_requests, filter)?;
        Ok(Self {
            scheduler,
            offsets,
            list_requests_iter,
            num_rows,
        })
    }
}

impl SchedulingJob for ListFieldSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let next_offsets = self.offsets.schedule_next(context, priority)?;
        let offsets_scheduled = next_offsets.rows_scheduled;
        let list_reqs = self.list_requests_iter.next(offsets_scheduled);
        trace!(
            "Scheduled {} offsets which maps to list requests: {:?}",
            offsets_scheduled,
            list_reqs
        );
        let null_offset_adjustment = list_reqs[0].null_offset_adjustment;
        // The null_offset_adjustment is a page-level constant and all requests in
        // a single scan line should come from the same offsets page
        debug_assert!(list_reqs
            .iter()
            .all(|req| req.null_offset_adjustment == null_offset_adjustment));
        let num_rows = list_reqs.iter().map(|req| req.num_lists).sum::<u64>();
        let next_offsets_decoder = next_offsets
            .decoders
            .into_iter()
            .next()
            .unwrap()
            .into_legacy()
            .decoder;

        let items_scheduler = self.scheduler.items_scheduler.clone();
        let items_type = self.scheduler.items_field.data_type().clone();
        let io = context.io().clone();
        let cache = context.cache().clone();

        // Spawn the indirect scheduling of the items; the decoder below awaits it
        let indirect_fut = tokio::spawn(indirect_schedule_task(
            next_offsets_decoder,
            list_reqs,
            null_offset_adjustment,
            items_scheduler,
            items_type,
            io,
            cache,
            priority.box_clone(),
        ));

        // The decoder starts empty; offsets, validity, and the item decoder are
        // filled in once the indirect future resolves
        let decoder = Box::new(ListPageDecoder {
            offsets: Arc::new([]),
            validity: BooleanBuffer::new(Buffer::from_vec(Vec::<u8>::default()), 0, 0),
            item_decoder: None,
            rows_drained: 0,
            rows_loaded: 0,
            items_field: self.scheduler.items_field.clone(),
            num_rows,
            unloaded: Some(indirect_fut),
            offset_type: self.scheduler.offset_type.clone(),
            data_type: self.scheduler.list_type.clone(),
        });
        let decoder = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder)],
            rows_scheduled: num_rows,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

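/// A field scheduler for list fields.
///
/// Lists are unusual in that they require indirect I/O: the item ranges to read
/// cannot be computed from metadata alone but only after the offsets themselves
/// have been read and decoded. The offsets column is therefore scheduled
/// directly, while a spawned task (see [`indirect_schedule_task`]) schedules
/// the items column once the offsets arrive.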
#[derive(Debug)]
pub struct ListFieldScheduler {
    offsets_scheduler: Arc<dyn FieldScheduler>,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_field: Arc<Field>,
    offset_type: DataType,
    list_type: DataType,
    offset_page_info: Vec<OffsetPageInfo>,
}

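/// Per-page metadata for the offsets column, kept in the file metadata so that
/// [`ListRequestsIter`] can map list ranges to offsets and item ranges without
/// performing any I/O.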
#[derive(Debug)]
pub struct OffsetPageInfo {
    pub offsets_in_page: u64,
    pub null_offset_adjustment: u64,
    pub num_items_referenced_by_page: u64,
}

impl ListFieldScheduler {
    pub fn new(
        offsets_scheduler: Arc<dyn FieldScheduler>,
        items_scheduler: Arc<dyn FieldScheduler>,
        items_field: Arc<Field>,
        offset_type: DataType,
        offset_page_info: Vec<OffsetPageInfo>,
    ) -> Self {
        let list_type = match &offset_type {
            DataType::Int32 => DataType::List(items_field.clone()),
            DataType::Int64 => DataType::LargeList(items_field.clone()),
            _ => panic!("Unexpected offset type {}", offset_type),
        };
        Self {
            offsets_scheduler,
            items_scheduler,
            items_field,
            offset_type,
            offset_page_info,
            list_type,
        }
    }
}

impl FieldScheduler for ListFieldScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        Ok(Box::new(ListFieldSchedulingJob::try_new(
            self, ranges, filter,
        )?))
    }

    fn num_rows(&self) -> u64 {
        self.offsets_scheduler.num_rows()
    }

    fn initialize<'a>(
        &'a self,
        _filter: &'a FilterExpression,
        _context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        // No initialization is needed
        std::future::ready(Ok(())).boxed()
    }
}

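/// A page decoder for lists.
///
/// The decoder is created before the indirect I/O has finished, so the offsets,
/// validity, and item decoder start out empty and are populated by
/// `wait_for_loaded` once the spawned task completes.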
#[derive(Debug)]
struct ListPageDecoder {
    unloaded: Option<JoinHandle<Result<IndirectlyLoaded>>>,
    // The offsets and validity will have already been decoded as part of the
    // indirect I/O
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    item_decoder: Option<SimpleStructDecoder>,
    num_rows: u64,
    rows_drained: u64,
    rows_loaded: u64,
    items_field: Arc<Field>,
    offset_type: DataType,
    data_type: DataType,
}

struct ListDecodeTask {
    offsets: Vec<u64>,
    validity: BooleanBuffer,
    items: Option<Box<dyn DecodeArrayTask>>,
    items_field: Arc<Field>,
    offset_type: DataType,
}

impl DecodeArrayTask for ListDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let items = self
            .items
            .map(|items| {
                // The items are wrapped in a single-field struct (see the indirect
                // scheduling) so we unwrap them here
                let wrapped_items = items.decode()?;
                Result::Ok(wrapped_items.as_struct().column(0).clone())
            })
            .unwrap_or_else(|| Ok(new_empty_array(self.items_field.data_type())))?;

        // Rebase the offsets to start at zero and convert them to the target
        // offset type (i32 for List, i64 for LargeList)
        let offsets = UInt64Array::from(self.offsets);
        let validity = NullBuffer::new(self.validity);
        let validity = if validity.null_count() == 0 {
            None
        } else {
            Some(validity)
        };
        let min_offset = UInt64Array::new_scalar(offsets.value(0));
        let offsets = arrow_arith::numeric::sub(&offsets, &min_offset)?;
        match &self.offset_type {
            DataType::Int32 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int32)?;
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let offsets = OffsetBuffer::new(offsets_i32.values().clone());

                Ok(Arc::new(ListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?))
            }
            DataType::Int64 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int64)?;
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let offsets = OffsetBuffer::new(offsets_i64.values().clone());

                Ok(Arc::new(LargeListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?))
            }
            _ => panic!("ListDecodeTask with data type that is not i32 or i64"),
        }
    }
}

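/// Returns the *last* index in `to_search` whose value equals `target` (or the
/// index of the largest value below it). A plain binary search may land on any
/// duplicate, and duplicates are common here because every empty list repeats
/// the previous offset. For example, in `[0, 5, 5, 5, 8]` searching for 5 must
/// return 3, meaning "three full lists are loaded once 5 items are loaded".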
fn binary_search_to_end(to_search: &[u64], target: u64) -> u64 {
    let mut result = match to_search.binary_search(&target) {
        Ok(idx) => idx,
        Err(idx) => idx - 1,
    };
    while result < (to_search.len() - 1) && to_search[result + 1] == target {
        result += 1;
    }
    result as u64
}

impl LogicalPageDecoder for ListPageDecoder {
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
        async move {
            // Wait for the indirect I/O (and the offsets decode that runs inside
            // the spawned task) to finish, then wire up the item decoder
            if self.unloaded.is_some() {
                trace!("List scheduler needs to wait for indirect I/O to complete");
                let indirectly_loaded = self.unloaded.take().unwrap().await;
                if indirectly_loaded.is_err() {
                    match indirectly_loaded.unwrap_err().try_into_panic() {
                        Ok(err) => std::panic::resume_unwind(err),
                        Err(err) => panic!("{:?}", err),
                    };
                }
                let indirectly_loaded = indirectly_loaded.unwrap()?;

                self.offsets = indirectly_loaded.offsets;
                self.validity = indirectly_loaded.validity;
                self.item_decoder = indirectly_loaded.root_decoder;
            }
            if self.rows_loaded > loaded_need {
                return Ok(());
            }

            let boundary = loaded_need as usize;
            debug_assert!(boundary < self.num_rows as usize);
            // To have loaded list `boundary` we need all items up to its end offset
            let items_needed = self.offsets[boundary + 1].saturating_sub(1);
            trace!(
                "List decoder is waiting for more than {} rows to be loaded and {}/{} are already loaded. To satisfy this we need more than {} loaded items",
                loaded_need,
                self.rows_loaded,
                self.num_rows,
                items_needed,
            );

            let items_loaded = if let Some(item_decoder) = self.item_decoder.as_mut() {
                item_decoder.wait_for_loaded(items_needed).await?;
                item_decoder.rows_loaded()
            } else {
                0
            };

            self.rows_loaded = binary_search_to_end(&self.offsets, items_loaded);
            trace!("List decoder now has {} loaded rows", self.rows_loaded);

            Ok(())
        }
        .boxed()
    }

    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        // We may have to shrink the batch if the drained lists would contain more
        // than i32::MAX items, which a 32-bit ListArray cannot represent
        let mut actual_num_rows = num_rows;
        let item_start = self.offsets[self.rows_drained as usize];
        if self.offset_type != DataType::Int64 {
            while actual_num_rows > 0 {
                let num_items =
                    self.offsets[(self.rows_drained + actual_num_rows) as usize] - item_start;
                if num_items <= i32::MAX as u64 {
                    break;
                }
                actual_num_rows -= 1;
            }
        }
        if actual_num_rows < num_rows {
            return Err(Error::NotSupported {
                source: format!(
                    "loading a batch of {} lists would require creating an array with over i32::MAX items and we don't yet support returning smaller than requested batches",
                    num_rows
                )
                .into(),
                location: location!(),
            });
        }
        let offsets = self.offsets
            [self.rows_drained as usize..(self.rows_drained + actual_num_rows + 1) as usize]
            .to_vec();
        let validity = self
            .validity
            .slice(self.rows_drained as usize, actual_num_rows as usize);
        let start = offsets[0];
        let end = offsets[offsets.len() - 1];
        let num_items_to_drain = end - start;

        let item_decode = if num_items_to_drain == 0 {
            None
        } else {
            self.item_decoder
                .as_mut()
                .map(|item_decoder| Result::Ok(item_decoder.drain(num_items_to_drain)?.task))
                .transpose()?
        };

        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(ListDecodeTask {
                offsets,
                validity,
                items_field: self.items_field.clone(),
                items: item_decode,
                offset_type: self.offset_type.clone(),
            }) as Box<dyn DecodeArrayTask>,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }

    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

struct IndirectlyLoaded {
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    root_decoder: Option<SimpleStructDecoder>,
}

impl std::fmt::Debug for IndirectlyLoaded {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IndirectlyLoaded")
            .field("offsets", &self.offsets)
            .field("validity", &self.validity)
            .finish()
    }
}

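/// An encoder for the offsets of a list field.
///
/// Incoming offsets are normalized into a single u64 array:
///   * each batch is rebased so it continues from the end of the previous batch
///     (the leading zero is implicit and not stored)
///   * validity is folded into the offsets: a null list stores its end offset
///     plus `null_offset_adjustment`, which is chosen as the total item span
///     plus one so it exceeds every valid offset in the page
///
/// A minimal sketch of the normalization (illustrative values, not from a real
/// file): the arrays `[[a], [b, c]]` and `[NULL, [d]]` have Arrow offsets
/// `[0, 1, 3]` and `[0, 0, 1]`. The total span is 4 items, so
/// `null_offset_adjustment = 5` and the encoded offsets become
/// `[1, 3, 3 + 5, 4] = [1, 3, 8, 4]`.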
#[derive(Debug)]
struct ListOffsetsEncoder {
    // An accumulation queue; offsets arrays and validity arrays are inserted
    // as alternating pairs
    accumulation_queue: AccumulationQueue,
    // The inner encoder for the normalized u64 offset values
    inner_encoder: Arc<dyn ArrayEncoder>,
    column_index: u32,
}

impl ListOffsetsEncoder {
    fn new(
        cache_bytes: u64,
        keep_original_array: bool,
        column_index: u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Self {
        Self {
            accumulation_queue: AccumulationQueue::new(
                cache_bytes,
                column_index,
                keep_original_array,
            ),
            inner_encoder,
            column_index,
        }
    }

    fn extract_offsets(list_arr: &dyn Array) -> ArrayRef {
        match list_arr.data_type() {
            DataType::List(_) => {
                let offsets = list_arr.as_list::<i32>().offsets().clone();
                Arc::new(Int32Array::new(offsets.into_inner(), None))
            }
            DataType::LargeList(_) => {
                let offsets = list_arr.as_list::<i64>().offsets().clone();
                Arc::new(Int64Array::new(offsets.into_inner(), None))
            }
            _ => panic!(),
        }
    }

    fn extract_validity(list_arr: &dyn Array) -> ArrayRef {
        if let Some(validity) = list_arr.nulls() {
            Arc::new(BooleanArray::new(validity.inner().clone(), None))
        } else {
            // A zero-length boolean array stands in for "no validity"
            new_empty_array(&DataType::Boolean)
        }
    }

    fn make_encode_task(&self, arrays: Vec<ArrayRef>) -> EncodeTask {
        let inner_encoder = self.inner_encoder.clone();
        let column_idx = self.column_index;
        // Offsets and validity arrays alternate in the accumulation queue
        let offset_arrays = arrays.iter().step_by(2).cloned().collect::<Vec<_>>();
        let validity_arrays = arrays.into_iter().skip(1).step_by(2).collect::<Vec<_>>();

        tokio::task::spawn(async move {
            // Each array of N + 1 offsets describes N lists
            let num_rows =
                offset_arrays.iter().map(|arr| arr.len()).sum::<usize>() - offset_arrays.len();
            let num_rows = num_rows as u64;
            let mut buffer_index = 0;
            let array = Self::do_encode(
                offset_arrays,
                validity_arrays,
                &mut buffer_index,
                num_rows,
                inner_encoder,
            )?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows,
                column_idx,
                // The row number is not used by the legacy encoding
                row_number: 0,
            })
        })
        .map(|res_res| res_res.unwrap())
        .boxed()
    }

    fn maybe_encode_offsets_and_validity(&mut self, list_arr: &dyn Array) -> Option<EncodeTask> {
        let offsets = Self::extract_offsets(list_arr);
        let validity = Self::extract_validity(list_arr);
        let num_rows = offsets.len() as u64;
        // The offsets and validity are inserted as a pair; if inserting the
        // offsets fills the queue then the validity is appended to the flushed
        // batch so the pair stays together
        if let Some(mut arrays) = self.accumulation_queue.insert(offsets, 0, num_rows) {
            arrays.0.push(validity);
            Some(self.make_encode_task(arrays.0))
        } else if let Some(arrays) = self.accumulation_queue.insert(validity, 0, num_rows) {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }

    fn flush(&mut self) -> Option<EncodeTask> {
        if let Some(arrays) = self.accumulation_queue.flush() {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }

    // Returns the total number of items referenced by an array of offsets
    fn get_offset_span(array: &dyn Array) -> u64 {
        match array.data_type() {
            DataType::Int32 => {
                let arr_i32 = array.as_primitive::<Int32Type>();
                (arr_i32.value(arr_i32.len() - 1) - arr_i32.value(0)) as u64
            }
            DataType::Int64 => {
                let arr_i64 = array.as_primitive::<Int64Type>();
                (arr_i64.value(arr_i64.len() - 1) - arr_i64.value(0)) as u64
            }
            _ => panic!(),
        }
    }

    // Rebases a batch of offsets onto `base` and folds validity into the values,
    // appending the result (without the leading offset) to `dest`
    fn extend_offsets_vec_u64(
        dest: &mut Vec<u64>,
        offsets: &dyn Array,
        validity: Option<&BooleanArray>,
        base: u64,
        null_offset_adjustment: u64,
    ) {
        match offsets.data_type() {
            DataType::Int32 => {
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let start = offsets_i32.value(0) as u64;
                // The offsets may not start at zero (e.g. a sliced array) so we
                // normalize them to continue from `base`
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off as i64 + modifier) as u64
                                    + (!valid as u64 * null_offset_adjustment)
                            }),
                    );
                } else {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            .map(|&v| (v as i64 + modifier) as u64),
                    );
                }
            }
            DataType::Int64 => {
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let start = offsets_i64.value(0) as u64;
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off + modifier) as u64 + (!valid as u64 * null_offset_adjustment)
                            }),
                    )
                } else {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .map(|&v| (v + modifier) as u64),
                    );
                }
            }
            _ => panic!("Invalid list offsets data type {:?}", offsets.data_type()),
        }
    }

    fn do_encode_u64(
        offset_arrays: Vec<ArrayRef>,
        validity: Vec<Option<&BooleanArray>>,
        num_offsets: u64,
        null_offset_adjustment: u64,
        buffer_index: &mut u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        let mut offsets = Vec::with_capacity(num_offsets as usize);
        for (offsets_arr, validity_arr) in offset_arrays.iter().zip(validity) {
            let last_prev_offset = offsets.last().copied().unwrap_or(0) % null_offset_adjustment;
            Self::extend_offsets_vec_u64(
                &mut offsets,
                &offsets_arr,
                validity_arr,
                last_prev_offset,
                null_offset_adjustment,
            );
        }
        let offsets_data = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 64,
            data: LanceBuffer::reinterpret_vec(offsets),
            num_values: num_offsets,
            block_info: BlockInfo::new(),
        });
        inner_encoder.encode(offsets_data, &DataType::UInt64, buffer_index)
    }

    fn do_encode(
        offset_arrays: Vec<ArrayRef>,
        validity_arrays: Vec<ArrayRef>,
        buffer_index: &mut u32,
        num_offsets: u64,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        let validity_arrays = validity_arrays
            .iter()
            .map(|v| {
                if v.is_empty() {
                    None
                } else {
                    Some(v.as_boolean())
                }
            })
            .collect::<Vec<_>>();
        debug_assert_eq!(offset_arrays.len(), validity_arrays.len());
        let total_span = offset_arrays
            .iter()
            .map(|arr| Self::get_offset_span(arr.as_ref()))
            .sum::<u64>();
        // `total_span + 1` is guaranteed to be larger than any valid offset
        let null_offset_adjustment = total_span + 1;
        let encoded_offsets = Self::do_encode_u64(
            offset_arrays,
            validity_arrays,
            num_offsets,
            null_offset_adjustment,
            buffer_index,
            inner_encoder,
        )?;
        Ok(EncodedArray {
            data: encoded_offsets.data,
            encoding: pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::List(Box::new(
                    pb::List {
                        offsets: Some(Box::new(encoded_offsets.encoding)),
                        null_offset_adjustment,
                        num_items: total_span,
                    },
                ))),
            },
        })
    }
}

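/// An encoder for list fields: encodes the offsets (plus validity) into one
/// column and delegates the items to a child encoder for the next column.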
pub struct ListFieldEncoder {
    offsets_encoder: ListOffsetsEncoder,
    items_encoder: Box<dyn FieldEncoder>,
}

impl ListFieldEncoder {
    pub fn new(
        items_encoder: Box<dyn FieldEncoder>,
        inner_offsets_encoder: Arc<dyn ArrayEncoder>,
        cache_bytes_per_column: u64,
        keep_original_array: bool,
        column_index: u32,
    ) -> Self {
        Self {
            offsets_encoder: ListOffsetsEncoder::new(
                cache_bytes_per_column,
                keep_original_array,
                column_index,
                inner_offsets_encoder,
            ),
            items_encoder,
        }
    }

    fn combine_tasks(
        offsets_tasks: Vec<EncodeTask>,
        item_tasks: Vec<EncodeTask>,
    ) -> Result<Vec<EncodeTask>> {
        let mut all_tasks = offsets_tasks;
        all_tasks.extend(item_tasks);
        Ok(all_tasks)
    }
}

impl FieldEncoder for ListFieldEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        // Slice out exactly the items referenced by this (possibly sliced) array
        let items = match array.data_type() {
            DataType::List(_) => {
                let list_arr = array.as_list::<i32>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            DataType::LargeList(_) => {
                let list_arr = array.as_list::<i64>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            _ => panic!(),
        };
        let offsets_tasks = self
            .offsets_encoder
            .maybe_encode_offsets_and_validity(array.as_ref())
            .map(|task| vec![task])
            .unwrap_or_default();
        let mut item_tasks = self.items_encoder.maybe_encode(
            items,
            external_buffers,
            repdef,
            row_number,
            num_rows,
        )?;
        if !offsets_tasks.is_empty() && item_tasks.is_empty() {
            // The offsets queue flushed but the items queue did not. An offsets
            // page must not refer into an unflushed items page, so flush the
            // items as well to keep the two columns aligned.
            item_tasks = self.items_encoder.flush(external_buffers)?;
        }
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let offsets_tasks = self
            .offsets_encoder
            .flush()
            .map(|task| vec![task])
            .unwrap_or_default();
        let item_tasks = self.items_encoder.flush(external_buffers)?;
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    fn num_columns(&self) -> u32 {
        self.items_encoder.num_columns() + 1
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
        let inner_columns = self.items_encoder.finish(external_buffers);
        async move {
            // The offsets column comes first, followed by the item columns
            let mut columns = vec![EncodedColumn::default()];
            let inner_columns = inner_columns.await?;
            columns.extend(inner_columns);
            Ok(columns)
        }
        .boxed()
    }
}

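/// An encoder for list fields in the 2.1 structural encoding.
///
/// Instead of writing a separate offsets column, the list structure is folded
/// into the repetition/definition levels (via `RepDefBuilder::add_offsets`) and
/// the trimmed items are passed on to the child encoder.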
pub struct ListStructuralEncoder {
    child: Box<dyn FieldEncoder>,
}

impl ListStructuralEncoder {
    pub fn new(child: Box<dyn FieldEncoder>) -> Self {
        Self { child }
    }
}

impl FieldEncoder for ListStructuralEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let values = if let Some(list_arr) = array.as_list_opt::<i32>() {
            let has_garbage_values =
                repdef.add_offsets(list_arr.offsets().clone(), array.nulls().cloned());
            if has_garbage_values {
                // Null lists may still reference "garbage" items; filter them out
                // so we don't encode values that can never be read
                list_arr.filter_garbage_nulls().trimmed_values()
            } else {
                list_arr.trimmed_values()
            }
        } else if let Some(list_arr) = array.as_list_opt::<i64>() {
            let has_garbage_values =
                repdef.add_offsets(list_arr.offsets().clone(), array.nulls().cloned());
            if has_garbage_values {
                list_arr.filter_garbage_nulls().trimmed_values()
            } else {
                list_arr.trimmed_values()
            }
        } else {
            panic!("List encoder used for non-list data")
        };
        self.child
            .maybe_encode(values, external_buffers, repdef, row_number, num_rows)
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        self.child.flush(external_buffers)
    }

    fn num_columns(&self) -> u32 {
        self.child.num_columns()
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        self.child.finish(external_buffers)
    }
}

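/// A scheduler for list fields in the 2.1 structural encoding.
///
/// There is no separate offsets column to schedule; the list structure travels
/// with the items as repetition levels, so scheduling simply delegates to the
/// child.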
#[derive(Debug)]
pub struct StructuralListScheduler {
    child: Box<dyn StructuralFieldScheduler>,
}

impl StructuralListScheduler {
    pub fn new(child: Box<dyn StructuralFieldScheduler>) -> Self {
        Self { child }
    }
}

impl StructuralFieldScheduler for StructuralListScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
        let child = self.child.schedule_ranges(ranges, filter)?;

        Ok(Box::new(StructuralListSchedulingJob::new(child)))
    }

    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        self.child.initialize(filter, context)
    }
}

/// A scheduling job that delegates to the child items job
#[derive(Debug)]
struct StructuralListSchedulingJob<'a> {
    child: Box<dyn StructuralSchedulingJob + 'a>,
}

impl<'a> StructuralListSchedulingJob<'a> {
    fn new(child: Box<dyn StructuralSchedulingJob + 'a>) -> Self {
        Self { child }
    }
}

impl StructuralSchedulingJob for StructuralListSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>> {
        self.child.schedule_next(context)
    }
}

#[derive(Debug)]
pub struct StructuralListDecoder {
    child: Box<dyn StructuralFieldDecoder>,
    data_type: DataType,
}

impl StructuralListDecoder {
    pub fn new(child: Box<dyn StructuralFieldDecoder>, data_type: DataType) -> Self {
        Self { child, data_type }
    }
}

impl StructuralFieldDecoder for StructuralListDecoder {
    fn accept_page(&mut self, child: crate::decoder::LoadedPage) -> Result<()> {
        self.child.accept_page(child)
    }

    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
        let child_task = self.child.drain(num_rows)?;
        Ok(Box::new(StructuralListDecodeTask::new(
            child_task,
            self.data_type.clone(),
        )))
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

#[derive(Debug)]
struct StructuralListDecodeTask {
    child_task: Box<dyn StructuralDecodeArrayTask>,
    data_type: DataType,
}

impl StructuralListDecodeTask {
    fn new(child_task: Box<dyn StructuralDecodeArrayTask>, data_type: DataType) -> Self {
        Self {
            child_task,
            data_type,
        }
    }
}

impl StructuralDecodeArrayTask for StructuralListDecodeTask {
    fn decode(self: Box<Self>) -> Result<DecodedArray> {
        let DecodedArray { array, mut repdef } = self.child_task.decode()?;
        match &self.data_type {
            DataType::List(child_field) => {
                // Rebuild the list offsets and validity from the repetition levels
                let (offsets, validity) = repdef.unravel_offsets::<i32>()?;
                let list_array = ListArray::try_new(child_field.clone(), offsets, array, validity)?;
                Ok(DecodedArray {
                    array: Arc::new(list_array),
                    repdef,
                })
            }
            DataType::LargeList(child_field) => {
                let (offsets, validity) = repdef.unravel_offsets::<i64>()?;
                let list_array =
                    LargeListArray::try_new(child_field.clone(), offsets, array, validity)?;
                Ok(DecodedArray {
                    array: Arc::new(list_array),
                    repdef,
                })
            }
            _ => panic!("List decoder did not have a list field"),
        }
    }
}

#[cfg(test)]
mod tests {

    use std::{collections::HashMap, sync::Arc};

    use arrow::array::{Int64Builder, LargeListBuilder, StringBuilder};
    use arrow_array::{
        builder::{Int32Builder, ListBuilder},
        Array, ArrayRef, BooleanArray, DictionaryArray, LargeStringArray, ListArray, StructArray,
        UInt64Array, UInt8Array,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field, Fields};
    use lance_core::datatypes::{
        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
    };
    use rstest::rstest;

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    fn make_list_type(inner_type: DataType) -> DataType {
        DataType::List(Arc::new(Field::new("item", inner_type, true)))
    }

    fn make_large_list_type(inner_type: DataType) -> DataType {
        DataType::LargeList(Arc::new(Field::new("item", inner_type, true)))
    }

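    // A sanity check (not part of the original suite) for the duplicate-offset
    // behavior of `binary_search_to_end`: empty lists repeat an offset value and
    // the search must land on the last duplicate.
    #[test]
    fn test_binary_search_to_end() {
        // Offsets (with leading zero) for lists of sizes 5, 0, 0, 3
        let offsets = [0_u64, 5, 5, 5, 8];
        // Loading 5 items finishes the first list and the two empty lists
        assert_eq!(super::binary_search_to_end(&offsets, 5), 3);
        assert_eq!(super::binary_search_to_end(&offsets, 0), 0);
        assert_eq!(super::binary_search_to_end(&offsets, 8), 4);
    }
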
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_list(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        let field =
            Field::new("", make_list_type(DataType::Int32), true).with_metadata(field_metadata);
        check_round_trip_encoding_random(field, version).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_large_list() {
        let field = Field::new("", make_large_list_type(DataType::Int32), true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_nested_strings() {
        let field = Field::new("", make_list_type(DataType::Utf8), true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_nested_list() {
        let field = Field::new("", make_list_type(make_list_type(DataType::Int32)), true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_list_struct_list() {
        let struct_type = DataType::Struct(Fields::from(vec![Field::new(
            "inner_str",
            DataType::Utf8,
            false,
        )]));

        let field = Field::new("", make_list_type(struct_type), true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_list_struct_empty() {
        let fields = Fields::from(vec![Field::new("inner", DataType::UInt64, true)]);
        let items = UInt64Array::from(Vec::<u64>::new());
        let structs = StructArray::new(fields, vec![Arc::new(items)], None);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0; 2 * 1024 * 1024 + 1]));
        let lists = ListArray::new(
            Arc::new(Field::new("item", structs.data_type().clone(), true)),
            offsets,
            Arc::new(structs),
            None,
        );

        check_round_trip_encoding_of_data(
            vec![Arc::new(lists)],
            &TestCases::default(),
            HashMap::new(),
        )
        .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_list(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        list_builder.append_value([Some(1), Some(2), Some(3)]);
        list_builder.append_value([Some(4), Some(5)]);
        list_builder.append_null();
        list_builder.append_value([Some(6), Some(7), Some(8)]);
        let list_array = list_builder.finish();

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![1, 3])
            .with_indices(vec![2])
            .with_file_version(version);
        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
            .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_string_list(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        let items_builder = StringBuilder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        list_builder.append_value([Some("a"), Some("bc"), Some("def")]);
        list_builder.append_value([Some("gh"), None]);
        list_builder.append_null();
        list_builder.append_value([Some("ijk"), Some("lmnop"), Some("qrs")]);
        let list_array = list_builder.finish();

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![1, 3])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);
        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
            .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_sliced_list(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        list_builder.append_value([Some(1), Some(2), Some(3)]);
        list_builder.append_value([Some(4), Some(5)]);
        list_builder.append_null();
        list_builder.append_value([Some(6), Some(7), Some(8)]);
        let list_array = list_builder.finish();

        let list_array = list_array.slice(1, 2);

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(1..2)
            .with_indices(vec![0])
            .with_indices(vec![1])
            .with_file_version(LanceFileVersion::V2_1);
        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
            .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_list_dict() {
        let values = LargeStringArray::from_iter_values(["a", "bb", "ccc"]);
        let indices = UInt8Array::from(vec![0, 1, 2, 0, 1, 2, 0, 1, 2]);
        let dict_array = DictionaryArray::new(indices, Arc::new(values));
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 5, 6, 9]));
        let list_array = ListArray::new(
            Arc::new(Field::new("item", dict_array.data_type().clone(), true)),
            offsets,
            Arc::new(dict_array),
            None,
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(1..3)
            .with_range(2..4)
            .with_indices(vec![1])
            .with_indices(vec![2]);
        check_round_trip_encoding_of_data(
            vec![Arc::new(list_array)],
            &test_cases,
            HashMap::default(),
        )
        .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_list_with_garbage_nulls(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        // The second list is null but its offsets still reference items 5..8,
        // i.e. "garbage" values that must not break the round trip
        let items = UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let offsets = ScalarBuffer::<i32>::from(vec![0, 5, 8, 10]);
        let offsets = OffsetBuffer::new(offsets);
        let list_validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
        let list_arr = ListArray::new(
            Arc::new(Field::new("item", DataType::UInt64, true)),
            offsets,
            Arc::new(items),
            Some(list_validity),
        );

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_range(1..2)
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);
        check_round_trip_encoding_of_data(vec![Arc::new(list_arr)], &test_cases, field_metadata)
            .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_two_page_list(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        // Write enough lists that they span two pages (the page size is forced
        // small below) and read a range that crosses the page boundary
        let items_builder = Int64Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        for i in 0..512 {
            list_builder.append_value([Some(i), Some(i * 2)]);
        }
        let list_array_1 = list_builder.finish();

        let items_builder = Int64Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        for i in 0..512 {
            let i = i + 512;
            list_builder.append_value([Some(i), Some(i * 2)]);
        }
        let list_array_2 = list_builder.finish();

        let mut metadata = HashMap::new();
        metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_file_version(LanceFileVersion::V2_1)
            .with_page_sizes(vec![100])
            .with_range(800..900);
        check_round_trip_encoding_of_data(
            vec![Arc::new(list_array_1), Arc::new(list_array_2)],
            &test_cases,
            metadata,
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_large_list() {
        let items_builder = Int32Builder::new();
        let mut list_builder = LargeListBuilder::new(items_builder);
        list_builder.append_value([Some(1), Some(2), Some(3)]);
        list_builder.append_value([Some(4), Some(5)]);
        list_builder.append_null();
        list_builder.append_value([Some(6), Some(7), Some(8)]);
        let list_array = list_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![1, 3]);
        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
            .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_empty_lists() {
        // Scenario 1: some lists are empty; try an empty list at each position
        let values = [vec![Some(1), Some(2), Some(3)], vec![], vec![None]];
        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
            let items_builder = Int32Builder::new();
            let mut list_builder = ListBuilder::new(items_builder);
            for idx in order {
                list_builder.append_value(values[idx].clone());
            }
            let list_array = Arc::new(list_builder.finish());
            let test_cases = TestCases::default()
                .with_indices(vec![1])
                .with_indices(vec![0])
                .with_indices(vec![2])
                .with_indices(vec![0, 1]);
            check_round_trip_encoding_of_data(
                vec![list_array.clone()],
                &test_cases,
                HashMap::new(),
            )
            .await;
            let test_cases = test_cases.with_batch_size(1);
            check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
        }

        // Scenario 2: all lists are empty or null, so there are no items at all
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        list_builder.append(true);
        list_builder.append_null();
        list_builder.append(true);
        let list_array = Arc::new(list_builder.finish());

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![list_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;

        // Scenario 2 again, but with a variable-width item type
        let items_builder = StringBuilder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        list_builder.append(true);
        list_builder.append_null();
        list_builder.append(true);
        let list_array = Arc::new(list_builder.finish());

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![list_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    #[ignore]
    async fn test_jumbo_list() {
        // Overflow test: 5000 lists of 1Mi items each is far more than i32::MAX
        // total items, which exercises the 32-bit offset limit handling
        let items = BooleanArray::new_null(1024 * 1024);
        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1024 * 1024]));
        let list_arr = Arc::new(ListArray::new(
            Arc::new(Field::new("item", DataType::Boolean, true)),
            offsets,
            Arc::new(items),
            None,
        )) as ArrayRef;
        let arrs = vec![list_arr; 5000];

        // Validation would materialize the whole jumbo array, so skip it
        let test_cases = TestCases::default().without_validation();
        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
    }
}