1use std::{
5 collections::{BinaryHeap, VecDeque},
6 ops::Range,
7 sync::Arc,
8};
9
10use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray};
11use arrow_schema::{DataType, Field, Fields};
12use futures::{
13 future::BoxFuture,
14 stream::{FuturesOrdered, FuturesUnordered},
15 FutureExt, StreamExt, TryStreamExt,
16};
17use itertools::Itertools;
18use lance_arrow::FieldExt;
19use log::trace;
20use snafu::location;
21
22use crate::{
23 decoder::{
24 DecodeArrayTask, DecodedArray, DecoderReady, FieldScheduler, FilterExpression, LoadedPage,
25 LogicalPageDecoder, MessageType, NextDecodeTask, PageEncoding, PriorityRange,
26 ScheduledScanLine, SchedulerContext, SchedulingJob, StructuralDecodeArrayTask,
27 StructuralFieldDecoder, StructuralFieldScheduler, StructuralSchedulingJob,
28 },
29 encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
30 format::pb,
31 repdef::RepDefBuilder,
32};
33use lance_core::{Error, Result};
34
35use super::{list::StructuralListDecoder, primitive::StructuralPrimitiveFieldDecoder};
36
37#[derive(Debug)]
38struct SchedulingJobWithStatus<'a> {
39 col_idx: u32,
40 col_name: &'a str,
41 job: Box<dyn SchedulingJob + 'a>,
42 rows_scheduled: u64,
43 rows_remaining: u64,
44}
45
46impl PartialEq for SchedulingJobWithStatus<'_> {
47 fn eq(&self, other: &Self) -> bool {
48 self.col_idx == other.col_idx
49 }
50}
51
52impl Eq for SchedulingJobWithStatus<'_> {}
53
54impl PartialOrd for SchedulingJobWithStatus<'_> {
55 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
56 Some(self.cmp(other))
57 }
58}
59
60impl Ord for SchedulingJobWithStatus<'_> {
61 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
62 other.rows_scheduled.cmp(&self.rows_scheduled)
64 }
65}
66
67#[derive(Debug)]
68struct EmptyStructDecodeTask {
69 num_rows: u64,
70}
71
72impl DecodeArrayTask for EmptyStructDecodeTask {
73 fn decode(self: Box<Self>) -> Result<ArrayRef> {
74 Ok(Arc::new(StructArray::new_empty_fields(
75 self.num_rows as usize,
76 None,
77 )))
78 }
79}
80
81#[derive(Debug)]
82struct EmptyStructDecoder {
83 num_rows: u64,
84 rows_drained: u64,
85 data_type: DataType,
86}
87
88impl EmptyStructDecoder {
89 fn new(num_rows: u64) -> Self {
90 Self {
91 num_rows,
92 rows_drained: 0,
93 data_type: DataType::Struct(Fields::from(Vec::<Field>::default())),
94 }
95 }
96}
97
98impl LogicalPageDecoder for EmptyStructDecoder {
99 fn wait_for_loaded(&mut self, _loaded_need: u64) -> BoxFuture<Result<()>> {
100 Box::pin(std::future::ready(Ok(())))
101 }
102 fn rows_loaded(&self) -> u64 {
103 self.num_rows
104 }
105 fn rows_unloaded(&self) -> u64 {
106 0
107 }
108 fn num_rows(&self) -> u64 {
109 self.num_rows
110 }
111 fn rows_drained(&self) -> u64 {
112 self.rows_drained
113 }
114 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
115 self.rows_drained += num_rows;
116 Ok(NextDecodeTask {
117 num_rows,
118 task: Box::new(EmptyStructDecodeTask { num_rows }),
119 })
120 }
121 fn data_type(&self) -> &DataType {
122 &self.data_type
123 }
124}
125
126#[derive(Debug)]
127struct EmptyStructSchedulerJob {
128 num_rows: u64,
129}
130
131impl SchedulingJob for EmptyStructSchedulerJob {
132 fn schedule_next(
133 &mut self,
134 context: &mut SchedulerContext,
135 _priority: &dyn PriorityRange,
136 ) -> Result<ScheduledScanLine> {
137 let empty_decoder = Box::new(EmptyStructDecoder::new(self.num_rows));
138 let struct_decoder = context.locate_decoder(empty_decoder);
139 Ok(ScheduledScanLine {
140 decoders: vec![MessageType::DecoderReady(struct_decoder)],
141 rows_scheduled: self.num_rows,
142 })
143 }
144
145 fn num_rows(&self) -> u64 {
146 self.num_rows
147 }
148}
149
150#[derive(Debug)]
157struct SimpleStructSchedulerJob<'a> {
158 scheduler: &'a SimpleStructScheduler,
159 children: BinaryHeap<SchedulingJobWithStatus<'a>>,
161 rows_scheduled: u64,
162 num_rows: u64,
163 initialized: bool,
164}
165
166impl<'a> SimpleStructSchedulerJob<'a> {
167 fn new(
168 scheduler: &'a SimpleStructScheduler,
169 children: Vec<Box<dyn SchedulingJob + 'a>>,
170 num_rows: u64,
171 ) -> Self {
172 let children = children
173 .into_iter()
174 .enumerate()
175 .map(|(idx, job)| SchedulingJobWithStatus {
176 col_idx: idx as u32,
177 col_name: scheduler.child_fields[idx].name(),
178 job,
179 rows_scheduled: 0,
180 rows_remaining: num_rows,
181 })
182 .collect::<BinaryHeap<_>>();
183 Self {
184 scheduler,
185 children,
186 rows_scheduled: 0,
187 num_rows,
188 initialized: false,
189 }
190 }
191}
192
193impl SchedulingJob for SimpleStructSchedulerJob<'_> {
194 fn schedule_next(
195 &mut self,
196 mut context: &mut SchedulerContext,
197 priority: &dyn PriorityRange,
198 ) -> Result<ScheduledScanLine> {
199 let mut decoders = Vec::new();
200 if !self.initialized {
201 let struct_decoder = Box::new(SimpleStructDecoder::new(
204 self.scheduler.child_fields.clone(),
205 self.num_rows,
206 ));
207 let struct_decoder = context.locate_decoder(struct_decoder);
208 decoders.push(MessageType::DecoderReady(struct_decoder));
209 self.initialized = true;
210 }
211 let old_rows_scheduled = self.rows_scheduled;
212 while old_rows_scheduled == self.rows_scheduled {
215 let mut next_child = self.children.pop().unwrap();
216 trace!("Scheduling more rows for child {}", next_child.col_idx);
217 let scoped = context.push(next_child.col_name, next_child.col_idx);
218 let child_scan = next_child.job.schedule_next(scoped.context, priority)?;
219 trace!(
220 "Scheduled {} rows for child {}",
221 child_scan.rows_scheduled,
222 next_child.col_idx
223 );
224 next_child.rows_scheduled += child_scan.rows_scheduled;
225 next_child.rows_remaining -= child_scan.rows_scheduled;
226 decoders.extend(child_scan.decoders);
227 self.children.push(next_child);
228 self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
229 context = scoped.pop();
230 }
231 let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
232 Ok(ScheduledScanLine {
233 decoders,
234 rows_scheduled: struct_rows_scheduled,
235 })
236 }
237
238 fn num_rows(&self) -> u64 {
239 self.num_rows
240 }
241}
242
243#[derive(Debug)]
254pub struct SimpleStructScheduler {
255 children: Vec<Arc<dyn FieldScheduler>>,
256 child_fields: Fields,
257 num_rows: u64,
258}
259
260impl SimpleStructScheduler {
261 pub fn new(
262 children: Vec<Arc<dyn FieldScheduler>>,
263 child_fields: Fields,
264 num_rows: u64,
265 ) -> Self {
266 let num_rows = children
267 .first()
268 .map(|child| child.num_rows())
269 .unwrap_or(num_rows);
270 debug_assert!(children.iter().all(|child| child.num_rows() == num_rows));
271 Self {
272 children,
273 child_fields,
274 num_rows,
275 }
276 }
277}
278
279impl FieldScheduler for SimpleStructScheduler {
280 fn schedule_ranges<'a>(
281 &'a self,
282 ranges: &[Range<u64>],
283 filter: &FilterExpression,
284 ) -> Result<Box<dyn SchedulingJob + 'a>> {
285 if self.children.is_empty() {
286 return Ok(Box::new(EmptyStructSchedulerJob {
287 num_rows: ranges.iter().map(|r| r.end - r.start).sum(),
288 }));
289 }
290 let child_schedulers = self
291 .children
292 .iter()
293 .map(|child| child.schedule_ranges(ranges, filter))
294 .collect::<Result<Vec<_>>>()?;
295 let num_rows = child_schedulers[0].num_rows();
296 Ok(Box::new(SimpleStructSchedulerJob::new(
297 self,
298 child_schedulers,
299 num_rows,
300 )))
301 }
302
303 fn num_rows(&self) -> u64 {
304 self.num_rows
305 }
306
307 fn initialize<'a>(
308 &'a self,
309 _filter: &'a FilterExpression,
310 _context: &'a SchedulerContext,
311 ) -> BoxFuture<'a, Result<()>> {
312 let futures = self
313 .children
314 .iter()
315 .map(|child| child.initialize(_filter, _context))
316 .collect::<FuturesUnordered<_>>();
317 async move {
318 futures
319 .map(|res| res.map(|_| ()))
320 .try_collect::<Vec<_>>()
321 .await?;
322 Ok(())
323 }
324 .boxed()
325 }
326}
327
328#[derive(Debug)]
329struct StructuralSchedulingJobWithStatus<'a> {
330 col_idx: u32,
331 col_name: &'a str,
332 job: Box<dyn StructuralSchedulingJob + 'a>,
333 rows_scheduled: u64,
334 rows_remaining: u64,
335}
336
337impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
338 fn eq(&self, other: &Self) -> bool {
339 self.col_idx == other.col_idx
340 }
341}
342
343impl Eq for StructuralSchedulingJobWithStatus<'_> {}
344
345impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
346 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
347 Some(self.cmp(other))
348 }
349}
350
351impl Ord for StructuralSchedulingJobWithStatus<'_> {
352 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
353 other.rows_scheduled.cmp(&self.rows_scheduled)
355 }
356}
357
358#[derive(Debug)]
365struct RepDefStructSchedulingJob<'a> {
366 children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
368 rows_scheduled: u64,
369}
370
371impl<'a> RepDefStructSchedulingJob<'a> {
372 fn new(
373 scheduler: &'a StructuralStructScheduler,
374 children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
375 num_rows: u64,
376 ) -> Self {
377 let children = children
378 .into_iter()
379 .enumerate()
380 .map(|(idx, job)| StructuralSchedulingJobWithStatus {
381 col_idx: idx as u32,
382 col_name: scheduler.child_fields[idx].name(),
383 job,
384 rows_scheduled: 0,
385 rows_remaining: num_rows,
386 })
387 .collect::<BinaryHeap<_>>();
388 Self {
389 children,
390 rows_scheduled: 0,
391 }
392 }
393}
394
395impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
396 fn schedule_next(
397 &mut self,
398 mut context: &mut SchedulerContext,
399 ) -> Result<Option<ScheduledScanLine>> {
400 let mut decoders = Vec::new();
401 let old_rows_scheduled = self.rows_scheduled;
402 while old_rows_scheduled == self.rows_scheduled {
405 let mut next_child = self.children.pop().unwrap();
406 let scoped = context.push(next_child.col_name, next_child.col_idx);
407 let child_scan = next_child.job.schedule_next(scoped.context)?;
408 if child_scan.is_none() {
411 return Ok(None);
412 }
413 let child_scan = child_scan.unwrap();
414
415 trace!(
416 "Scheduled {} rows for child {}",
417 child_scan.rows_scheduled,
418 next_child.col_idx
419 );
420 next_child.rows_scheduled += child_scan.rows_scheduled;
421 next_child.rows_remaining -= child_scan.rows_scheduled;
422 decoders.extend(child_scan.decoders);
423 self.children.push(next_child);
424 self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
425 context = scoped.pop();
426 }
427 let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
428 Ok(Some(ScheduledScanLine {
429 decoders,
430 rows_scheduled: struct_rows_scheduled,
431 }))
432 }
433}
434
435#[derive(Debug)]
446pub struct StructuralStructScheduler {
447 children: Vec<Box<dyn StructuralFieldScheduler>>,
448 child_fields: Fields,
449}
450
451impl StructuralStructScheduler {
452 pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
453 debug_assert!(!children.is_empty());
454 Self {
455 children,
456 child_fields,
457 }
458 }
459}
460
461impl StructuralFieldScheduler for StructuralStructScheduler {
462 fn schedule_ranges<'a>(
463 &'a self,
464 ranges: &[Range<u64>],
465 filter: &FilterExpression,
466 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
467 let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
468
469 let child_schedulers = self
470 .children
471 .iter()
472 .map(|child| child.schedule_ranges(ranges, filter))
473 .collect::<Result<Vec<_>>>()?;
474
475 Ok(Box::new(RepDefStructSchedulingJob::new(
476 self,
477 child_schedulers,
478 num_rows,
479 )))
480 }
481
482 fn initialize<'a>(
483 &'a mut self,
484 filter: &'a FilterExpression,
485 context: &'a SchedulerContext,
486 ) -> BoxFuture<'a, Result<()>> {
487 let children_initialization = self
488 .children
489 .iter_mut()
490 .map(|child| child.initialize(filter, context))
491 .collect::<FuturesUnordered<_>>();
492 async move {
493 children_initialization
494 .map(|res| res.map(|_| ()))
495 .try_collect::<Vec<_>>()
496 .await?;
497 Ok(())
498 }
499 .boxed()
500 }
501}
502
503#[derive(Debug)]
504struct ChildState {
505 scheduled: VecDeque<Box<dyn LogicalPageDecoder>>,
516 rows_loaded: u64,
518 rows_drained: u64,
520 rows_popped: u64,
522 num_rows: u64,
524 field_index: u32,
526}
527
528struct CompositeDecodeTask {
529 tasks: Vec<Box<dyn DecodeArrayTask>>,
531 num_rows: u64,
532 has_more: bool,
533}
534
535impl CompositeDecodeTask {
536 fn decode(self) -> Result<ArrayRef> {
537 let arrays = self
538 .tasks
539 .into_iter()
540 .map(|task| task.decode())
541 .collect::<Result<Vec<_>>>()?;
542 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
543 Ok(arrow_select::concat::concat(&array_refs)?)
550 }
551}
552
553impl ChildState {
554 fn new(num_rows: u64, field_index: u32) -> Self {
555 Self {
556 scheduled: VecDeque::new(),
557 rows_loaded: 0,
558 rows_drained: 0,
559 rows_popped: 0,
560 num_rows,
561 field_index,
562 }
563 }
564
565 async fn wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
570 trace!(
571 "Struct child {} waiting for more than {} rows to be loaded and {} are fully loaded already",
572 self.field_index,
573 loaded_need,
574 self.rows_loaded,
575 );
576 let mut fully_loaded = self.rows_popped;
577 for (page_idx, next_decoder) in self.scheduled.iter_mut().enumerate() {
578 if next_decoder.rows_unloaded() > 0 {
579 let mut current_need = loaded_need;
580 current_need -= fully_loaded;
581 let rows_in_page = next_decoder.num_rows();
582 let need_for_page = (rows_in_page - 1).min(current_need);
583 trace!(
584 "Struct child {} page {} will wait until more than {} rows loaded from page with {} rows",
585 self.field_index,
586 page_idx,
587 need_for_page,
588 rows_in_page,
589 );
590 next_decoder.wait_for_loaded(need_for_page).await?;
596 let now_loaded = next_decoder.rows_loaded();
597 fully_loaded += now_loaded;
598 trace!(
599 "Struct child {} page {} await and now has {} loaded rows and we have {} fully loaded",
600 self.field_index,
601 page_idx,
602 now_loaded,
603 fully_loaded
604 );
605 } else {
606 fully_loaded += next_decoder.num_rows();
607 }
608 if fully_loaded > loaded_need {
609 break;
610 }
611 }
612 self.rows_loaded = fully_loaded;
613 trace!(
614 "Struct child {} loaded {} new rows and now {} are loaded",
615 self.field_index,
616 fully_loaded,
617 self.rows_loaded
618 );
619 Ok(())
620 }
621
622 fn drain(&mut self, num_rows: u64) -> Result<CompositeDecodeTask> {
623 trace!("Struct draining {} rows", num_rows);
624
625 trace!(
626 "Draining {} rows from struct page with {} rows already drained",
627 num_rows,
628 self.rows_drained
629 );
630 let mut remaining = num_rows;
631 let mut composite = CompositeDecodeTask {
632 tasks: Vec::new(),
633 num_rows: 0,
634 has_more: true,
635 };
636 while remaining > 0 {
637 let next = self.scheduled.front_mut().unwrap();
638 let rows_to_take = remaining.min(next.rows_left());
639 let next_task = next.drain(rows_to_take)?;
640 if next.rows_left() == 0 {
641 trace!("Completely drained page");
642 self.rows_popped += next.num_rows();
643 self.scheduled.pop_front();
644 }
645 remaining -= rows_to_take;
646 composite.tasks.push(next_task.task);
647 composite.num_rows += next_task.num_rows;
648 }
649 self.rows_drained += num_rows;
650 composite.has_more = self.rows_drained != self.num_rows;
651 Ok(composite)
652 }
653}
654
655struct WaitOrder<'a>(&'a mut ChildState);
657
658impl Eq for WaitOrder<'_> {}
659impl PartialEq for WaitOrder<'_> {
660 fn eq(&self, other: &Self) -> bool {
661 self.0.rows_loaded == other.0.rows_loaded
662 }
663}
664impl Ord for WaitOrder<'_> {
665 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
666 other.0.rows_loaded.cmp(&self.0.rows_loaded)
668 }
669}
670impl PartialOrd for WaitOrder<'_> {
671 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
672 Some(self.cmp(other))
673 }
674}
675
676#[derive(Debug)]
677pub struct StructuralStructDecoder {
678 children: Vec<Box<dyn StructuralFieldDecoder>>,
679 data_type: DataType,
680 child_fields: Fields,
681 is_root: bool,
683}
684
685impl StructuralStructDecoder {
686 pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Self {
687 let children = fields
688 .iter()
689 .map(|field| Self::field_to_decoder(field, should_validate))
690 .collect();
691 let data_type = DataType::Struct(fields.clone());
692 Self {
693 data_type,
694 children,
695 child_fields: fields,
696 is_root,
697 }
698 }
699
700 fn field_to_decoder(
701 field: &Arc<arrow_schema::Field>,
702 should_validate: bool,
703 ) -> Box<dyn StructuralFieldDecoder> {
704 match field.data_type() {
705 DataType::Struct(fields) => {
706 if field.is_packed_struct() {
707 let decoder =
708 StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
709 Box::new(decoder)
710 } else {
711 Box::new(Self::new(fields.clone(), should_validate, false))
712 }
713 }
714 DataType::List(child_field) | DataType::LargeList(child_field) => {
715 let child_decoder = Self::field_to_decoder(child_field, should_validate);
716 Box::new(StructuralListDecoder::new(
717 child_decoder,
718 field.data_type().clone(),
719 ))
720 }
721 DataType::RunEndEncoded(_, _) => todo!(),
722 DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
723 DataType::Map(_, _) => todo!(),
724 DataType::Union(_, _) => todo!(),
725 _ => Box::new(StructuralPrimitiveFieldDecoder::new(field, should_validate)),
726 }
727 }
728
729 pub fn drain_batch_task(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
730 let array_drain = self.drain(num_rows)?;
731 Ok(NextDecodeTask {
732 num_rows,
733 task: Box::new(array_drain),
734 })
735 }
736}
737
738impl StructuralFieldDecoder for StructuralStructDecoder {
739 fn accept_page(&mut self, mut child: LoadedPage) -> Result<()> {
740 let child_idx = child.path.pop_front().unwrap();
742 self.children[child_idx as usize].accept_page(child)?;
744 Ok(())
745 }
746
747 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
748 let child_tasks = self
749 .children
750 .iter_mut()
751 .map(|child| child.drain(num_rows))
752 .collect::<Result<Vec<_>>>()?;
753 Ok(Box::new(RepDefStructDecodeTask {
754 children: child_tasks,
755 child_fields: self.child_fields.clone(),
756 is_root: self.is_root,
757 }))
758 }
759
760 fn data_type(&self) -> &DataType {
761 &self.data_type
762 }
763}
764
765#[derive(Debug)]
766struct RepDefStructDecodeTask {
767 children: Vec<Box<dyn StructuralDecodeArrayTask>>,
768 child_fields: Fields,
769 is_root: bool,
770}
771
772impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
773 fn decode(self: Box<Self>) -> Result<DecodedArray> {
774 let arrays = self
775 .children
776 .into_iter()
777 .map(|task| task.decode())
778 .collect::<Result<Vec<_>>>()?;
779 let mut children = Vec::with_capacity(arrays.len());
780 let mut arrays_iter = arrays.into_iter();
781 let first_array = arrays_iter.next().unwrap();
782 let length = first_array.array.len();
783
784 let mut repdef = first_array.repdef;
786 children.push(first_array.array);
787
788 for array in arrays_iter {
789 debug_assert_eq!(length, array.array.len());
790 children.push(array.array);
791 }
792
793 let validity = if self.is_root {
794 None
795 } else {
796 repdef.unravel_validity(length)
797 };
798 let array = StructArray::new(self.child_fields, children, validity);
799 Ok(DecodedArray {
800 array: Arc::new(array),
801 repdef,
802 })
803 }
804}
805
806#[derive(Debug)]
807pub struct SimpleStructDecoder {
808 children: Vec<ChildState>,
809 child_fields: Fields,
810 data_type: DataType,
811 num_rows: u64,
812}
813
814impl SimpleStructDecoder {
815 pub fn new(child_fields: Fields, num_rows: u64) -> Self {
816 let data_type = DataType::Struct(child_fields.clone());
817 Self {
818 children: child_fields
819 .iter()
820 .enumerate()
821 .map(|(idx, _)| ChildState::new(num_rows, idx as u32))
822 .collect(),
823 child_fields,
824 data_type,
825 num_rows,
826 }
827 }
828
829 async fn do_wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
830 let mut wait_orders = self
831 .children
832 .iter_mut()
833 .filter_map(|child| {
834 if child.rows_loaded <= loaded_need {
835 Some(WaitOrder(child))
836 } else {
837 None
838 }
839 })
840 .collect::<BinaryHeap<_>>();
841 while !wait_orders.is_empty() {
842 let next_waiter = wait_orders.pop().unwrap();
843 let next_highest = wait_orders
844 .peek()
845 .map(|w| w.0.rows_loaded)
846 .unwrap_or(u64::MAX);
847 let limit = loaded_need.min(next_highest);
850 next_waiter.0.wait_for_loaded(limit).await?;
851 log::trace!(
852 "Struct child {} finished await pass and now {} are loaded",
853 next_waiter.0.field_index,
854 next_waiter.0.rows_loaded
855 );
856 if next_waiter.0.rows_loaded <= loaded_need {
857 wait_orders.push(next_waiter);
858 }
859 }
860 Ok(())
861 }
862}
863
864impl LogicalPageDecoder for SimpleStructDecoder {
865 fn accept_child(&mut self, mut child: DecoderReady) -> Result<()> {
866 let child_idx = child.path.pop_front().unwrap();
868 if child.path.is_empty() {
869 self.children[child_idx as usize]
871 .scheduled
872 .push_back(child.decoder);
873 } else {
874 let intended = self.children[child_idx as usize].scheduled.back_mut().ok_or_else(|| Error::Internal { message: format!("Decoder scheduled for child at index {} but we don't have any child at that index yet", child_idx), location: location!() })?;
876 intended.accept_child(child)?;
877 }
878 Ok(())
879 }
880
881 fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
882 self.do_wait_for_loaded(loaded_need).boxed()
883 }
884
885 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
886 let child_tasks = self
887 .children
888 .iter_mut()
889 .map(|child| child.drain(num_rows))
890 .collect::<Result<Vec<_>>>()?;
891 let num_rows = child_tasks[0].num_rows;
892 debug_assert!(child_tasks.iter().all(|task| task.num_rows == num_rows));
893 Ok(NextDecodeTask {
894 task: Box::new(SimpleStructDecodeTask {
895 children: child_tasks,
896 child_fields: self.child_fields.clone(),
897 }),
898 num_rows,
899 })
900 }
901
902 fn rows_loaded(&self) -> u64 {
903 self.children.iter().map(|c| c.rows_loaded).min().unwrap()
904 }
905
906 fn rows_drained(&self) -> u64 {
907 debug_assert!(self
909 .children
910 .iter()
911 .all(|c| c.rows_drained == self.children[0].rows_drained));
912 self.children[0].rows_drained
913 }
914
915 fn num_rows(&self) -> u64 {
916 self.num_rows
917 }
918
919 fn data_type(&self) -> &DataType {
920 &self.data_type
921 }
922}
923
924struct SimpleStructDecodeTask {
925 children: Vec<CompositeDecodeTask>,
926 child_fields: Fields,
927}
928
929impl DecodeArrayTask for SimpleStructDecodeTask {
930 fn decode(self: Box<Self>) -> Result<ArrayRef> {
931 let child_arrays = self
932 .children
933 .into_iter()
934 .map(|child| child.decode())
935 .collect::<Result<Vec<_>>>()?;
936 Ok(Arc::new(StructArray::try_new(
937 self.child_fields,
938 child_arrays,
939 None,
940 )?))
941 }
942}
943
944pub struct StructStructuralEncoder {
949 children: Vec<Box<dyn FieldEncoder>>,
950}
951
952impl StructStructuralEncoder {
953 pub fn new(children: Vec<Box<dyn FieldEncoder>>) -> Self {
954 Self { children }
955 }
956}
957
958impl FieldEncoder for StructStructuralEncoder {
959 fn maybe_encode(
960 &mut self,
961 array: ArrayRef,
962 external_buffers: &mut OutOfLineBuffers,
963 mut repdef: RepDefBuilder,
964 row_number: u64,
965 num_rows: u64,
966 ) -> Result<Vec<EncodeTask>> {
967 let struct_array = array.as_struct();
968 if let Some(validity) = struct_array.nulls() {
969 repdef.add_validity_bitmap(validity.clone());
970 } else {
971 repdef.add_no_null(struct_array.len());
972 }
973 let child_tasks = self
974 .children
975 .iter_mut()
976 .zip(struct_array.columns().iter())
977 .map(|(encoder, arr)| {
978 encoder.maybe_encode(
979 arr.clone(),
980 external_buffers,
981 repdef.clone(),
982 row_number,
983 num_rows,
984 )
985 })
986 .collect::<Result<Vec<_>>>()?;
987 Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
988 }
989
990 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
991 self.children
992 .iter_mut()
993 .map(|encoder| encoder.flush(external_buffers))
994 .flatten_ok()
995 .collect::<Result<Vec<_>>>()
996 }
997
998 fn num_columns(&self) -> u32 {
999 self.children
1000 .iter()
1001 .map(|child| child.num_columns())
1002 .sum::<u32>()
1003 }
1004
1005 fn finish(
1006 &mut self,
1007 external_buffers: &mut OutOfLineBuffers,
1008 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
1009 let mut child_columns = self
1010 .children
1011 .iter_mut()
1012 .map(|child| child.finish(external_buffers))
1013 .collect::<FuturesOrdered<_>>();
1014 async move {
1015 let mut encoded_columns = Vec::with_capacity(child_columns.len());
1016 while let Some(child_cols) = child_columns.next().await {
1017 encoded_columns.extend(child_cols?);
1018 }
1019 Ok(encoded_columns)
1020 }
1021 .boxed()
1022 }
1023}
1024
1025pub struct StructFieldEncoder {
1026 children: Vec<Box<dyn FieldEncoder>>,
1027 column_index: u32,
1028 num_rows_seen: u64,
1029}
1030
1031impl StructFieldEncoder {
1032 #[allow(dead_code)]
1033 pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
1034 Self {
1035 children,
1036 column_index,
1037 num_rows_seen: 0,
1038 }
1039 }
1040}
1041
1042impl FieldEncoder for StructFieldEncoder {
1043 fn maybe_encode(
1044 &mut self,
1045 array: ArrayRef,
1046 external_buffers: &mut OutOfLineBuffers,
1047 repdef: RepDefBuilder,
1048 row_number: u64,
1049 num_rows: u64,
1050 ) -> Result<Vec<EncodeTask>> {
1051 self.num_rows_seen += array.len() as u64;
1052 let struct_array = array.as_struct();
1053 let child_tasks = self
1054 .children
1055 .iter_mut()
1056 .zip(struct_array.columns().iter())
1057 .map(|(encoder, arr)| {
1058 encoder.maybe_encode(
1059 arr.clone(),
1060 external_buffers,
1061 repdef.clone(),
1062 row_number,
1063 num_rows,
1064 )
1065 })
1066 .collect::<Result<Vec<_>>>()?;
1067 Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
1068 }
1069
1070 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
1071 let child_tasks = self
1072 .children
1073 .iter_mut()
1074 .map(|encoder| encoder.flush(external_buffers))
1075 .collect::<Result<Vec<_>>>()?;
1076 Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
1077 }
1078
1079 fn num_columns(&self) -> u32 {
1080 self.children
1081 .iter()
1082 .map(|child| child.num_columns())
1083 .sum::<u32>()
1084 + 1
1085 }
1086
1087 fn finish(
1088 &mut self,
1089 external_buffers: &mut OutOfLineBuffers,
1090 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
1091 let mut child_columns = self
1092 .children
1093 .iter_mut()
1094 .map(|child| child.finish(external_buffers))
1095 .collect::<FuturesOrdered<_>>();
1096 let num_rows_seen = self.num_rows_seen;
1097 let column_index = self.column_index;
1098 async move {
1099 let mut columns = Vec::new();
1100 let mut header = EncodedColumn::default();
1102 header.final_pages.push(EncodedPage {
1103 data: Vec::new(),
1104 description: PageEncoding::Legacy(pb::ArrayEncoding {
1105 array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
1106 pb::SimpleStruct {},
1107 )),
1108 }),
1109 num_rows: num_rows_seen,
1110 column_idx: column_index,
1111 row_number: 0, });
1113 columns.push(header);
1114 while let Some(child_cols) = child_columns.next().await {
1116 columns.extend(child_cols?);
1117 }
1118 Ok(columns)
1119 }
1120 .boxed()
1121 }
1122}
1123
#[cfg(test)]
mod tests {

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        builder::{Int32Builder, ListBuilder},
        Array, ArrayRef, Int32Array, StructArray,
    };
    use arrow_buffer::NullBuffer;
    use arrow_schema::{DataType, Field, Fields};

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    /// Builds a non-nullable struct field named `name` with the given child fields.
    fn struct_field(name: &str, children: Vec<Field>) -> Field {
        Field::new(name, DataType::Struct(Fields::from(children)), false)
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_struct() {
        let field = struct_field(
            "",
            vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Int32, false),
            ],
        );
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_nullable_struct() {
        // A nullable `location` struct nested inside a nullable top-level struct
        let location_fields = Fields::from(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, true),
        ]);
        let row_fields = Fields::from(vec![
            Field::new("score", DataType::Int32, true),
            Field::new("location", DataType::Struct(location_fields.clone()), true),
        ]);

        let xs: ArrayRef = Arc::new(Int32Array::from(vec![
            Some(1),
            Some(2),
            Some(3),
            Some(4),
            Some(5),
        ]));
        let ys: ArrayRef = Arc::new(Int32Array::from(vec![
            Some(6),
            None,
            Some(8),
            Some(9),
            Some(10),
        ]));
        let scores: ArrayRef = Arc::new(Int32Array::from(vec![
            None,
            Some(12),
            Some(13),
            Some(14),
            Some(15),
        ]));

        let locations: ArrayRef = Arc::new(StructArray::new(
            location_fields,
            vec![xs, ys],
            Some(NullBuffer::from(vec![true, true, true, false, true])),
        ));
        let rows = StructArray::new(
            row_fields,
            vec![scores, locations],
            Some(NullBuffer::from(vec![true, true, true, true, false])),
        );

        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![Arc::new(rows)], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_struct_list() {
        let item = Arc::new(Field::new("item", DataType::Int32, true));
        let field = struct_field(
            "row",
            vec![
                Field::new("inner_list", DataType::List(item), true),
                Field::new("outer_int", DataType::Int32, true),
            ],
        );
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_empty_struct() {
        let field = struct_field("row", Vec::new());
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_complicated_struct() {
        let inner = DataType::Struct(Fields::from(vec![
            Field::new("inner_int", DataType::Int32, true),
            Field::new(
                "inner_list",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
        ]));
        let field = struct_field(
            "row",
            vec![
                Field::new("int", DataType::Int32, true),
                Field::new("inner", inner, true),
                Field::new("outer_binary", DataType::Binary, true),
            ],
        );
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_ragged_scheduling() {
        // An all-null list column next to an int column, written in odd-sized slices
        const NUM_ROWS: usize = 10000;
        const SLICE_LEN: usize = 437;

        let mut list_builder = ListBuilder::new(Int32Builder::new());
        for _ in 0..NUM_ROWS {
            list_builder.append_null();
        }
        let list_array = Arc::new(list_builder.finish());
        let int_array = Arc::new(Int32Array::from_iter_values(0..NUM_ROWS as i32));
        let fields = Fields::from(vec![
            Field::new("", list_array.data_type().clone(), true),
            Field::new("", int_array.data_type().clone(), true),
        ]);
        let struct_array = Arc::new(StructArray::new(
            fields,
            vec![list_array, int_array],
            None,
        )) as ArrayRef;
        let slices = (0..NUM_ROWS)
            .step_by(SLICE_LEN)
            .map(|offset| struct_array.slice(offset, SLICE_LEN.min(NUM_ROWS - offset)))
            .collect::<Vec<_>>();
        check_round_trip_encoding_of_data(slices, &TestCases::default(), HashMap::new()).await;
    }
}