1use std::collections::VecDeque;
216use std::sync::Once;
217use std::{ops::Range, sync::Arc};
218
219use arrow_array::cast::AsArray;
220use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
221use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
222use bytes::Bytes;
223use futures::future::{maybe_done, BoxFuture, MaybeDone};
224use futures::stream::{self, BoxStream};
225use futures::{FutureExt, StreamExt};
226use lance_arrow::DataTypeExt;
227use lance_core::cache::{CapacityMode, FileMetadataCache};
228use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
229use log::{debug, trace, warn};
230use snafu::location;
231use tokio::sync::mpsc::error::SendError;
232use tokio::sync::mpsc::{self, unbounded_channel};
233
234use lance_core::{ArrowResult, Error, Result};
235use tracing::instrument;
236
237use crate::buffer::LanceBuffer;
238use crate::data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock};
239use crate::encoder::{values_column_encoding, EncodedBatch};
240use crate::encodings::logical::binary::BinaryFieldScheduler;
241use crate::encodings::logical::blob::BlobFieldScheduler;
242use crate::encodings::logical::list::{
243 ListFieldScheduler, OffsetPageInfo, StructuralListScheduler,
244};
245use crate::encodings::logical::primitive::{
246 PrimitiveFieldScheduler, StructuralPrimitiveFieldScheduler,
247};
248use crate::encodings::logical::r#struct::{
249 SimpleStructDecoder, SimpleStructScheduler, StructuralStructDecoder, StructuralStructScheduler,
250};
251use crate::encodings::physical::binary::{
252 BinaryBlockDecompressor, BinaryMiniBlockDecompressor, VariableDecoder,
253};
254use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor;
255use crate::encodings::physical::fsst::{FsstMiniBlockDecompressor, FsstPerValueDecompressor};
256use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockDecompressor;
257use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor};
258use crate::encodings::physical::{ColumnBuffers, FileBuffers};
259use crate::format::pb::{self, column_encoding};
260use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
261use crate::version::LanceFileVersion;
262use crate::{BufferScheduler, EncodingsIo};
263
/// Size threshold (10 MiB, i.e. 10 * 2^20 bytes) named as a batch-size warning level.
const BATCH_SIZE_BYTES_WARNING: u64 = 10 << 20;
266
267#[derive(Debug)]
274pub enum PageEncoding {
275 Legacy(pb::ArrayEncoding),
276 Structural(pb::PageLayout),
277}
278
279impl PageEncoding {
280 pub fn as_legacy(&self) -> &pb::ArrayEncoding {
281 match self {
282 Self::Legacy(enc) => enc,
283 Self::Structural(_) => panic!("Expected a legacy encoding"),
284 }
285 }
286
287 pub fn as_structural(&self) -> &pb::PageLayout {
288 match self {
289 Self::Structural(enc) => enc,
290 Self::Legacy(_) => panic!("Expected a structural encoding"),
291 }
292 }
293
294 pub fn is_structural(&self) -> bool {
295 matches!(self, Self::Structural(_))
296 }
297}
298
/// Metadata describing a single page of a column.
#[derive(Debug)]
pub struct PageInfo {
    /// The number of rows stored in the page.
    pub num_rows: u64,
    /// NOTE(review): presumably a scheduling-order hint; pages synthesized in
    /// this module always set it to 0 — confirm semantics.
    pub priority: u64,
    /// How the page's data was encoded (legacy or structural).
    pub encoding: PageEncoding,
    /// The page's buffers as `(offset, size)` pairs, going by the field name
    /// (NOTE(review): presumably byte positions within the file — confirm).
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}
315
316#[derive(Debug, Clone)]
320pub struct ColumnInfo {
321 pub index: u32,
323 pub page_infos: Arc<[PageInfo]>,
325 pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
327 pub encoding: pb::ColumnEncoding,
328}
329
330impl ColumnInfo {
331 pub fn new(
333 index: u32,
334 page_infos: Arc<[PageInfo]>,
335 buffer_offsets_and_sizes: Vec<(u64, u64)>,
336 encoding: pb::ColumnEncoding,
337 ) -> Self {
338 Self {
339 index,
340 page_infos,
341 buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
342 encoding,
343 }
344 }
345
346 pub fn is_structural(&self) -> bool {
347 self.page_infos
348 .first()
350 .map(|page| page.encoding.is_structural())
351 .unwrap_or(false)
352 }
353}
354
355enum RootScheduler {
356 Structural(Box<dyn StructuralFieldScheduler>),
357 Legacy(Arc<dyn FieldScheduler>),
358}
359
360impl RootScheduler {
361 fn as_legacy(&self) -> &Arc<dyn FieldScheduler> {
362 match self {
363 Self::Structural(_) => panic!("Expected a legacy scheduler"),
364 Self::Legacy(s) => s,
365 }
366 }
367
368 fn as_structural(&self) -> &dyn StructuralFieldScheduler {
369 match self {
370 Self::Structural(s) => s.as_ref(),
371 Self::Legacy(_) => panic!("Expected a structural scheduler"),
372 }
373 }
374}
375
/// Schedules the I/O and decoders needed to read ranges of rows from a file.
pub struct DecodeBatchScheduler {
    /// Scheduler for the root field; legacy or structural depending on how the
    /// file's columns are encoded.
    root_scheduler: RootScheduler,
    /// The top-level arrow fields produced by this scheduler.
    pub root_fields: Fields,
    /// Metadata cache handed to every scheduling context this scheduler creates.
    cache: Arc<FileMetadataCache>,
}
402
403pub struct ColumnInfoIter<'a> {
404 column_infos: Vec<Arc<ColumnInfo>>,
405 column_indices: &'a [u32],
406 column_info_pos: usize,
407 column_indices_pos: usize,
408}
409
410impl<'a> ColumnInfoIter<'a> {
411 pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
412 let initial_pos = column_indices[0] as usize;
413 Self {
414 column_infos,
415 column_indices,
416 column_info_pos: initial_pos,
417 column_indices_pos: 0,
418 }
419 }
420
421 pub fn peek(&self) -> &Arc<ColumnInfo> {
422 &self.column_infos[self.column_info_pos]
423 }
424
425 pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
426 let column_info = self.column_infos[self.column_info_pos].clone();
427 let transformed = transform(column_info);
428 self.column_infos[self.column_info_pos] = transformed;
429 }
430
431 pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
432 self.next().ok_or_else(|| {
433 Error::invalid_input(
434 "there were more fields in the schema than provided column indices / infos",
435 location!(),
436 )
437 })
438 }
439
440 fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
441 if self.column_info_pos < self.column_infos.len() {
442 let info = &self.column_infos[self.column_info_pos];
443 self.column_info_pos += 1;
444 Some(info)
445 } else {
446 None
447 }
448 }
449
450 pub(crate) fn next_top_level(&mut self) {
451 self.column_indices_pos += 1;
452 if self.column_indices_pos < self.column_indices.len() {
453 self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
454 } else {
455 self.column_info_pos = self.column_infos.len();
456 }
457 }
458}
459
/// Decompresses the data of a mini-block encoded page into a [`DataBlock`].
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompresses `data` into a [`DataBlock`].
    ///
    /// `num_values` is the number of values the caller expects `data` to hold
    /// (NOTE(review): confirm how implementations use it).
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}
463
/// Decompresses fixed-width, per-value encoded data into a [`DataBlock`].
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompresses a block of fixed-width encoded values.
    fn decompress(&self, data: FixedWidthDataBlock) -> Result<DataBlock>;
    /// The width of each encoded value, in bits (going by the name —
    /// NOTE(review): confirm whether this is the encoded or decoded width).
    fn bits_per_value(&self) -> u64;
}
472
/// Decompresses variable-width, per-value encoded data into a [`DataBlock`].
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompresses a block of variable-width encoded values.
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}
477
/// Decompresses a single opaque buffer into a [`DataBlock`].
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompresses `data` into a [`DataBlock`].
    fn decompress(&self, data: LanceBuffer) -> Result<DataBlock>;
}
481
/// Chooses a decompressor implementation for each kind of page data, given the
/// protobuf description of how that data was encoded.
pub trait DecompressorStrategy: std::fmt::Debug + Send + Sync {
    /// Creates a decompressor for mini-block data described by `description`.
    fn create_miniblock_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    /// Creates a decompressor for fixed-width per-value data described by
    /// `description`.
    fn create_fixed_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    /// Creates a decompressor for variable-width per-value data described by
    /// `description`.
    fn create_variable_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    /// Creates a decompressor for a whole buffer described by `description`.
    fn create_block_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}
503
504#[derive(Debug, Default)]
505pub struct CoreDecompressorStrategy {}
506
507impl DecompressorStrategy for CoreDecompressorStrategy {
508 fn create_miniblock_decompressor(
509 &self,
510 description: &pb::ArrayEncoding,
511 ) -> Result<Box<dyn MiniBlockDecompressor>> {
512 match description.array_encoding.as_ref().unwrap() {
513 pb::array_encoding::ArrayEncoding::Flat(flat) => {
514 Ok(Box::new(ValueDecompressor::new(flat)))
515 }
516 pb::array_encoding::ArrayEncoding::Bitpack2(description) => {
517 Ok(Box::new(BitpackMiniBlockDecompressor::new(description)))
518 }
519 pb::array_encoding::ArrayEncoding::Variable(_) => {
520 Ok(Box::new(BinaryMiniBlockDecompressor::default()))
521 }
522 pb::array_encoding::ArrayEncoding::Fsst(description) => {
523 Ok(Box::new(FsstMiniBlockDecompressor::new(description)))
524 }
525 pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => {
526 Ok(Box::new(PackedStructFixedWidthMiniBlockDecompressor::new(
527 description,
528 )))
529 }
530 _ => todo!(),
531 }
532 }
533
534 fn create_fixed_per_value_decompressor(
535 &self,
536 description: &pb::ArrayEncoding,
537 ) -> Result<Box<dyn FixedPerValueDecompressor>> {
538 match description.array_encoding.as_ref().unwrap() {
539 pb::array_encoding::ArrayEncoding::Flat(flat) => {
540 Ok(Box::new(ValueDecompressor::new(flat)))
541 }
542 _ => todo!("fixed-per-value decompressor for {:?}", description),
543 }
544 }
545
546 fn create_variable_per_value_decompressor(
547 &self,
548 description: &pb::ArrayEncoding,
549 ) -> Result<Box<dyn VariablePerValueDecompressor>> {
550 match *description.array_encoding.as_ref().unwrap() {
551 pb::array_encoding::ArrayEncoding::Variable(variable) => {
552 assert!(variable.bits_per_offset < u8::MAX as u32);
553 Ok(Box::new(VariableDecoder::default()))
554 }
555 pb::array_encoding::ArrayEncoding::Fsst(ref fsst) => {
556 Ok(Box::new(FsstPerValueDecompressor::new(
557 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
558 Box::new(VariableDecoder::default()),
559 )))
560 }
561 _ => todo!("variable-per-value decompressor for {:?}", description),
562 }
563 }
564
565 fn create_block_decompressor(
566 &self,
567 description: &pb::ArrayEncoding,
568 ) -> Result<Box<dyn BlockDecompressor>> {
569 match description.array_encoding.as_ref().unwrap() {
570 pb::array_encoding::ArrayEncoding::Flat(flat) => {
571 Ok(Box::new(ValueDecompressor::new(flat)))
572 }
573 pb::array_encoding::ArrayEncoding::Constant(constant) => {
574 let scalar = LanceBuffer::from_bytes(constant.value.clone(), 1);
575 Ok(Box::new(ConstantDecompressor::new(
576 scalar,
577 constant.num_values,
578 )))
579 }
580 pb::array_encoding::ArrayEncoding::Variable(_) => {
581 Ok(Box::new(BinaryBlockDecompressor::default()))
582 }
583 _ => todo!(),
584 }
585 }
586}
587
588#[derive(Debug)]
590pub struct CoreFieldDecoderStrategy {
591 pub validate_data: bool,
592 pub decompressor_strategy: Arc<dyn DecompressorStrategy>,
593}
594
595impl Default for CoreFieldDecoderStrategy {
596 fn default() -> Self {
597 Self {
598 validate_data: false,
599 decompressor_strategy: Arc::new(CoreDecompressorStrategy {}),
600 }
601 }
602}
603
604impl CoreFieldDecoderStrategy {
605 fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
608 let column_encoding = column_info
609 .encoding
610 .column_encoding
611 .as_ref()
612 .ok_or_else(|| {
613 Error::invalid_input(
614 format!(
615 "the column at index {} was missing a ColumnEncoding",
616 column_info.index
617 ),
618 location!(),
619 )
620 })?;
621 if matches!(
622 column_encoding,
623 pb::column_encoding::ColumnEncoding::Values(_)
624 ) {
625 Ok(())
626 } else {
627 Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
628 }
629 }
630
631 fn is_primitive(data_type: &DataType) -> bool {
632 if data_type.is_primitive() {
633 true
634 } else {
635 match data_type {
636 DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
638 DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
639 _ => false,
640 }
641 }
642 }
643
644 fn create_primitive_scheduler(
645 &self,
646 field: &Field,
647 column: &ColumnInfo,
648 buffers: FileBuffers,
649 ) -> Result<Box<dyn FieldScheduler>> {
650 Self::ensure_values_encoded(column, &field.name)?;
651 let column_buffers = ColumnBuffers {
653 file_buffers: buffers,
654 positions_and_sizes: &column.buffer_offsets_and_sizes,
655 };
656 Ok(Box::new(PrimitiveFieldScheduler::new(
657 column.index,
658 field.data_type(),
659 column.page_infos.clone(),
660 column_buffers,
661 self.validate_data,
662 )))
663 }
664
665 fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
667 Self::ensure_values_encoded(column_info, field_name)?;
668 if column_info.page_infos.len() != 1 {
669 return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
670 }
671 let encoding = &column_info.page_infos[0].encoding;
672 match encoding.as_legacy().array_encoding.as_ref().unwrap() {
673 pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
674 _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
675 }
676 }
677
678 fn check_packed_struct(column_info: &ColumnInfo) -> bool {
679 let encoding = &column_info.page_infos[0].encoding;
680 matches!(
681 encoding.as_legacy().array_encoding.as_ref().unwrap(),
682 pb::array_encoding::ArrayEncoding::PackedStruct(_)
683 )
684 }
685
686 fn create_list_scheduler(
687 &self,
688 list_field: &Field,
689 column_infos: &mut ColumnInfoIter,
690 buffers: FileBuffers,
691 offsets_column: &ColumnInfo,
692 ) -> Result<Box<dyn FieldScheduler>> {
693 Self::ensure_values_encoded(offsets_column, &list_field.name)?;
694 let offsets_column_buffers = ColumnBuffers {
695 file_buffers: buffers,
696 positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
697 };
698 let items_scheduler =
699 self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;
700
701 let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
702 .page_infos
703 .iter()
704 .filter(|offsets_page| offsets_page.num_rows > 0)
705 .map(|offsets_page| {
706 if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
707 &offsets_page.encoding.as_legacy().array_encoding
708 {
709 let inner = PageInfo {
710 buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
711 encoding: PageEncoding::Legacy(
712 list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
713 ),
714 num_rows: offsets_page.num_rows,
715 priority: 0,
716 };
717 (
718 inner,
719 OffsetPageInfo {
720 offsets_in_page: offsets_page.num_rows,
721 null_offset_adjustment: list_encoding.null_offset_adjustment,
722 num_items_referenced_by_page: list_encoding.num_items,
723 },
724 )
725 } else {
726 panic!("Expected a list column");
728 }
729 })
730 .unzip();
731 let inner = Arc::new(PrimitiveFieldScheduler::new(
732 offsets_column.index,
733 DataType::UInt64,
734 Arc::from(inner_infos.into_boxed_slice()),
735 offsets_column_buffers,
736 self.validate_data,
737 )) as Arc<dyn FieldScheduler>;
738 let items_field = match list_field.data_type() {
739 DataType::List(inner) => inner,
740 DataType::LargeList(inner) => inner,
741 _ => unreachable!(),
742 };
743 let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
744 DataType::Int32
745 } else {
746 DataType::Int64
747 };
748 Ok(Box::new(ListFieldScheduler::new(
749 inner,
750 items_scheduler.into(),
751 items_field,
752 offset_type,
753 null_offset_adjustments,
754 )))
755 }
756
757 fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
758 if let column_encoding::ColumnEncoding::Blob(blob) =
759 column_info.encoding.column_encoding.as_ref().unwrap()
760 {
761 let mut column_info = column_info.clone();
762 column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
763 Some(column_info)
764 } else {
765 None
766 }
767 }
768
769 fn items_per_row(data_type: &DataType) -> u64 {
770 match data_type {
771 DataType::FixedSizeList(inner, dimension) => {
772 Self::items_per_row(inner.data_type()) * *dimension as u64
773 }
774 _ => 1,
775 }
776 }
777
778 fn create_structural_field_scheduler(
779 &self,
780 field: &Field,
781 column_infos: &mut ColumnInfoIter,
782 ) -> Result<Box<dyn StructuralFieldScheduler>> {
783 let data_type = field.data_type();
784 if Self::is_primitive(&data_type) {
785 let column_info = column_infos.expect_next()?;
786 let items_per_row = Self::items_per_row(&data_type);
787 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
788 column_info.as_ref(),
789 items_per_row,
790 self.decompressor_strategy.as_ref(),
791 )?);
792
793 column_infos.next_top_level();
795
796 return Ok(scheduler);
797 }
798 match &data_type {
799 DataType::Struct(fields) => {
800 if field.is_packed_struct() {
801 let column_info = column_infos.expect_next()?;
802 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
803 column_info.as_ref(),
804 1, self.decompressor_strategy.as_ref(),
806 )?);
807
808 column_infos.next_top_level();
810
811 return Ok(scheduler);
812 }
813 let mut child_schedulers = Vec::with_capacity(field.children.len());
814 for field in field.children.iter() {
815 let field_scheduler =
816 self.create_structural_field_scheduler(field, column_infos)?;
817 child_schedulers.push(field_scheduler);
818 }
819
820 let fields = fields.clone();
821 Ok(
822 Box::new(StructuralStructScheduler::new(child_schedulers, fields))
823 as Box<dyn StructuralFieldScheduler>,
824 )
825 }
826 DataType::Binary | DataType::Utf8 => {
827 let column_info = column_infos.expect_next()?;
828 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
829 column_info.as_ref(),
830 1,
831 self.decompressor_strategy.as_ref(),
832 )?);
833 column_infos.next_top_level();
834 Ok(scheduler)
835 }
836 DataType::List(_) | DataType::LargeList(_) => {
837 let child = field
838 .children
839 .first()
840 .expect("List field must have a child");
841 let child_scheduler =
842 self.create_structural_field_scheduler(child, column_infos)?;
843 Ok(Box::new(StructuralListScheduler::new(child_scheduler))
844 as Box<dyn StructuralFieldScheduler>)
845 }
846 _ => todo!(),
847 }
848 }
849
850 fn create_legacy_field_scheduler(
851 &self,
852 field: &Field,
853 column_infos: &mut ColumnInfoIter,
854 buffers: FileBuffers,
855 ) -> Result<Box<dyn FieldScheduler>> {
856 let data_type = field.data_type();
857 if Self::is_primitive(&data_type) {
858 let column_info = column_infos.expect_next()?;
859 let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
860 return Ok(scheduler);
861 } else if data_type.is_binary_like() {
862 let column_info = column_infos.next().unwrap().clone();
863 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
865 let desc_scheduler =
866 self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
867 let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
868 return Ok(blob_scheduler);
869 }
870 if let Some(page_info) = column_info.page_infos.first() {
871 if matches!(
872 page_info.encoding.as_legacy(),
873 pb::ArrayEncoding {
874 array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
875 }
876 ) {
877 let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
878 DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
879 } else {
880 DataType::LargeList(Arc::new(ArrowField::new(
881 "item",
882 DataType::UInt8,
883 false,
884 )))
885 };
886 let list_field = Field::try_from(ArrowField::new(
887 field.name.clone(),
888 list_type,
889 field.nullable,
890 ))
891 .unwrap();
892 let list_scheduler = self.create_list_scheduler(
893 &list_field,
894 column_infos,
895 buffers,
896 &column_info,
897 )?;
898 let binary_scheduler = Box::new(BinaryFieldScheduler::new(
899 list_scheduler.into(),
900 field.data_type(),
901 ));
902 return Ok(binary_scheduler);
903 } else {
904 let scheduler =
905 self.create_primitive_scheduler(field, &column_info, buffers)?;
906 return Ok(scheduler);
907 }
908 } else {
909 return self.create_primitive_scheduler(field, &column_info, buffers);
910 }
911 }
912 match &data_type {
913 DataType::FixedSizeList(inner, _dimension) => {
914 if Self::is_primitive(inner.data_type()) {
917 let primitive_col = column_infos.expect_next()?;
918 let scheduler =
919 self.create_primitive_scheduler(field, primitive_col, buffers)?;
920 Ok(scheduler)
921 } else {
922 todo!()
923 }
924 }
925 DataType::Dictionary(_key_type, value_type) => {
926 if Self::is_primitive(value_type) || value_type.is_binary_like() {
927 let primitive_col = column_infos.expect_next()?;
928 let scheduler =
929 self.create_primitive_scheduler(field, primitive_col, buffers)?;
930 Ok(scheduler)
931 } else {
932 Err(Error::NotSupported {
933 source: format!(
934 "No way to decode into a dictionary field of type {}",
935 value_type
936 )
937 .into(),
938 location: location!(),
939 })
940 }
941 }
942 DataType::List(_) | DataType::LargeList(_) => {
943 let offsets_column = column_infos.expect_next()?.clone();
944 column_infos.next_top_level();
945 self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
946 }
947 DataType::Struct(fields) => {
948 let column_info = column_infos.expect_next()?;
949
950 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
952 return self.create_primitive_scheduler(field, &blob_col, buffers);
954 }
955
956 if Self::check_packed_struct(column_info) {
957 self.create_primitive_scheduler(field, column_info, buffers)
959 } else {
960 Self::check_simple_struct(column_info, &field.name).unwrap();
962 let num_rows = column_info
963 .page_infos
964 .iter()
965 .map(|page| page.num_rows)
966 .sum();
967 let mut child_schedulers = Vec::with_capacity(field.children.len());
968 for field in &field.children {
969 column_infos.next_top_level();
970 let field_scheduler =
971 self.create_legacy_field_scheduler(field, column_infos, buffers)?;
972 child_schedulers.push(Arc::from(field_scheduler));
973 }
974
975 let fields = fields.clone();
976 Ok(Box::new(SimpleStructScheduler::new(
977 child_schedulers,
978 fields,
979 num_rows,
980 )))
981 }
982 }
983 _ => todo!(),
985 }
986 }
987}
988
989fn root_column(num_rows: u64) -> ColumnInfo {
991 let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
992 let final_page_num_rows = num_rows % (u32::MAX as u64);
993 let root_pages = (0..num_root_pages)
994 .map(|i| PageInfo {
995 num_rows: if i == num_root_pages - 1 {
996 final_page_num_rows
997 } else {
998 u64::MAX
999 },
1000 encoding: PageEncoding::Legacy(pb::ArrayEncoding {
1001 array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
1002 pb::SimpleStruct {},
1003 )),
1004 }),
1005 priority: 0, buffer_offsets_and_sizes: Arc::new([]),
1007 })
1008 .collect::<Vec<_>>();
1009 ColumnInfo {
1010 buffer_offsets_and_sizes: Arc::new([]),
1011 encoding: values_column_encoding(),
1012 index: u32::MAX,
1013 page_infos: Arc::from(root_pages),
1014 }
1015}
1016
1017pub enum RootDecoder {
1018 Structural(StructuralStructDecoder),
1019 Legacy(SimpleStructDecoder),
1020}
1021
1022impl RootDecoder {
1023 pub fn into_structural(self) -> StructuralStructDecoder {
1024 match self {
1025 Self::Structural(decoder) => decoder,
1026 Self::Legacy(_) => panic!("Expected a structural decoder"),
1027 }
1028 }
1029
1030 pub fn into_legacy(self) -> SimpleStructDecoder {
1031 match self {
1032 Self::Legacy(decoder) => decoder,
1033 Self::Structural(_) => panic!("Expected a legacy decoder"),
1034 }
1035 }
1036}
1037
1038impl DecodeBatchScheduler {
1039 #[allow(clippy::too_many_arguments)]
1042 pub async fn try_new<'a>(
1043 schema: &'a Schema,
1044 column_indices: &[u32],
1045 column_infos: &[Arc<ColumnInfo>],
1046 file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
1047 num_rows: u64,
1048 _decoder_plugins: Arc<DecoderPlugins>,
1049 io: Arc<dyn EncodingsIo>,
1050 cache: Arc<FileMetadataCache>,
1051 filter: &FilterExpression,
1052 ) -> Result<Self> {
1053 assert!(num_rows > 0);
1054 let buffers = FileBuffers {
1055 positions_and_sizes: file_buffer_positions_and_sizes,
1056 };
1057 let arrow_schema = ArrowSchema::from(schema);
1058 let root_fields = arrow_schema.fields().clone();
1059 let root_type = DataType::Struct(root_fields.clone());
1060 let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
1061 root_field.children.clone_from(&schema.fields);
1065 root_field
1066 .metadata
1067 .insert("__lance_decoder_root".to_string(), "true".to_string());
1068
1069 if column_infos[0].is_structural() {
1070 let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);
1071
1072 let mut root_scheduler = CoreFieldDecoderStrategy::default()
1073 .create_structural_field_scheduler(&root_field, &mut column_iter)?;
1074
1075 let context = SchedulerContext::new(io, cache.clone());
1076 root_scheduler.initialize(filter, &context).await?;
1077
1078 Ok(Self {
1079 root_scheduler: RootScheduler::Structural(root_scheduler),
1080 root_fields,
1081 cache,
1082 })
1083 } else {
1084 let mut columns = Vec::with_capacity(column_infos.len() + 1);
1087 columns.push(Arc::new(root_column(num_rows)));
1088 columns.extend(column_infos.iter().cloned());
1089
1090 let adjusted_column_indices = [0_u32]
1091 .into_iter()
1092 .chain(column_indices.iter().map(|i| i.saturating_add(1)))
1093 .collect::<Vec<_>>();
1094 let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
1095 let root_scheduler = CoreFieldDecoderStrategy::default()
1096 .create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;
1097
1098 let context = SchedulerContext::new(io, cache.clone());
1099 root_scheduler.initialize(filter, &context).await?;
1100
1101 Ok(Self {
1102 root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
1103 root_fields,
1104 cache,
1105 })
1106 }
1107 }
1108
1109 pub fn from_scheduler(
1110 root_scheduler: Arc<dyn FieldScheduler>,
1111 root_fields: Fields,
1112 cache: Arc<FileMetadataCache>,
1113 ) -> Self {
1114 Self {
1115 root_scheduler: RootScheduler::Legacy(root_scheduler),
1116 root_fields,
1117 cache,
1118 }
1119 }
1120
1121 fn do_schedule_ranges_structural(
1122 &mut self,
1123 ranges: &[Range<u64>],
1124 filter: &FilterExpression,
1125 io: Arc<dyn EncodingsIo>,
1126 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1127 ) {
1128 let root_scheduler = self.root_scheduler.as_structural();
1129 let mut context = SchedulerContext::new(io, self.cache.clone());
1130 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1131 if let Err(schedule_ranges_err) = maybe_root_job {
1132 schedule_action(Err(schedule_ranges_err));
1133 return;
1134 }
1135 let mut root_job = maybe_root_job.unwrap();
1136 let mut num_rows_scheduled = 0;
1137 loop {
1138 let maybe_next_scan_line = root_job.schedule_next(&mut context);
1139 if let Err(err) = maybe_next_scan_line {
1140 schedule_action(Err(err));
1141 return;
1142 }
1143 let next_scan_line = maybe_next_scan_line.unwrap();
1144 match next_scan_line {
1145 Some(next_scan_line) => {
1146 trace!(
1147 "Scheduled scan line of {} rows and {} decoders",
1148 next_scan_line.rows_scheduled,
1149 next_scan_line.decoders.len()
1150 );
1151 num_rows_scheduled += next_scan_line.rows_scheduled;
1152 if !schedule_action(Ok(DecoderMessage {
1153 scheduled_so_far: num_rows_scheduled,
1154 decoders: next_scan_line.decoders,
1155 })) {
1156 return;
1158 }
1159 }
1160 None => return,
1161 }
1162 }
1163 }
1164
1165 fn do_schedule_ranges_legacy(
1166 &mut self,
1167 ranges: &[Range<u64>],
1168 filter: &FilterExpression,
1169 io: Arc<dyn EncodingsIo>,
1170 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1171 priority: Option<Box<dyn PriorityRange>>,
1175 ) {
1176 let root_scheduler = self.root_scheduler.as_legacy();
1177 let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1178 trace!(
1179 "Scheduling {} ranges across {}..{} ({} rows){}",
1180 ranges.len(),
1181 ranges.first().unwrap().start,
1182 ranges.last().unwrap().end,
1183 rows_requested,
1184 priority
1185 .as_ref()
1186 .map(|p| format!(" (priority={:?})", p))
1187 .unwrap_or_default()
1188 );
1189
1190 let mut context = SchedulerContext::new(io, self.cache.clone());
1191 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1192 if let Err(schedule_ranges_err) = maybe_root_job {
1193 schedule_action(Err(schedule_ranges_err));
1194 return;
1195 }
1196 let mut root_job = maybe_root_job.unwrap();
1197 let mut num_rows_scheduled = 0;
1198 let mut rows_to_schedule = root_job.num_rows();
1199 let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
1200 trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
1201 while rows_to_schedule > 0 {
1202 let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
1203 if let Err(schedule_next_err) = maybe_next_scan_line {
1204 schedule_action(Err(schedule_next_err));
1205 return;
1206 }
1207 let next_scan_line = maybe_next_scan_line.unwrap();
1208 priority.advance(next_scan_line.rows_scheduled);
1209 num_rows_scheduled += next_scan_line.rows_scheduled;
1210 rows_to_schedule -= next_scan_line.rows_scheduled;
1211 trace!(
1212 "Scheduled scan line of {} rows and {} decoders",
1213 next_scan_line.rows_scheduled,
1214 next_scan_line.decoders.len()
1215 );
1216 if !schedule_action(Ok(DecoderMessage {
1217 scheduled_so_far: num_rows_scheduled,
1218 decoders: next_scan_line.decoders,
1219 })) {
1220 return;
1222 }
1223
1224 trace!("Finished scheduling {} ranges", ranges.len());
1225 }
1226 }
1227
1228 fn do_schedule_ranges(
1229 &mut self,
1230 ranges: &[Range<u64>],
1231 filter: &FilterExpression,
1232 io: Arc<dyn EncodingsIo>,
1233 schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1234 priority: Option<Box<dyn PriorityRange>>,
1238 ) {
1239 match &self.root_scheduler {
1240 RootScheduler::Legacy(_) => {
1241 self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
1242 }
1243 RootScheduler::Structural(_) => {
1244 self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
1245 }
1246 }
1247 }
1248
1249 pub fn schedule_ranges_to_vec(
1252 &mut self,
1253 ranges: &[Range<u64>],
1254 filter: &FilterExpression,
1255 io: Arc<dyn EncodingsIo>,
1256 priority: Option<Box<dyn PriorityRange>>,
1257 ) -> Result<Vec<DecoderMessage>> {
1258 let mut decode_messages = Vec::new();
1259 self.do_schedule_ranges(
1260 ranges,
1261 filter,
1262 io,
1263 |msg| {
1264 decode_messages.push(msg);
1265 true
1266 },
1267 priority,
1268 );
1269 decode_messages.into_iter().collect::<Result<Vec<_>>>()
1270 }
1271
1272 #[instrument(skip_all)]
1282 pub fn schedule_ranges(
1283 &mut self,
1284 ranges: &[Range<u64>],
1285 filter: &FilterExpression,
1286 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1287 scheduler: Arc<dyn EncodingsIo>,
1288 ) {
1289 self.do_schedule_ranges(
1290 ranges,
1291 filter,
1292 scheduler,
1293 |msg| {
1294 match sink.send(msg) {
1295 Ok(_) => true,
1296 Err(SendError { .. }) => {
1297 debug!(
1300 "schedule_ranges aborting early since decoder appears to have been dropped"
1301 );
1302 false
1303 }
1304 }
1305 },
1306 None,
1307 )
1308 }
1309
1310 #[instrument(skip_all)]
1318 pub fn schedule_range(
1319 &mut self,
1320 range: Range<u64>,
1321 filter: &FilterExpression,
1322 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1323 scheduler: Arc<dyn EncodingsIo>,
1324 ) {
1325 self.schedule_ranges(&[range], filter, sink, scheduler)
1326 }
1327
1328 pub fn schedule_take(
1336 &mut self,
1337 indices: &[u64],
1338 filter: &FilterExpression,
1339 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1340 scheduler: Arc<dyn EncodingsIo>,
1341 ) {
1342 debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
1343 if indices.is_empty() {
1344 return;
1345 }
1346 trace!("Scheduling take of {} rows", indices.len());
1347 let ranges = indices
1348 .iter()
1349 .map(|&idx| idx..(idx + 1))
1350 .collect::<Vec<_>>();
1351 self.schedule_ranges(&ranges, filter, sink, scheduler)
1352 }
1353}
1354
/// A batch that is being decoded, paired with the number of rows it will contain.
pub struct ReadBatchTask {
    /// Future that resolves to the decoded batch, or to the error that prevented decoding.
    pub task: BoxFuture<'static, Result<RecordBatch>>,
    /// Row count of the batch, known before `task` completes.
    /// The error-only task built by `schedule_and_decode` reports 0.
    pub num_rows: u32,
}
1359
/// Async decoder for the legacy (non-structural) format.
///
/// It consumes scheduler messages from a channel and produces a stream of batch decode
/// tasks.
pub struct BatchDecodeStream {
    /// Receiving side of the scheduler -> decoder channel.
    context: DecoderContext,
    /// Root struct decoder that scheduled child decoders are attached to.
    root_decoder: SimpleStructDecoder,
    /// Rows that have not yet been assigned to a batch.
    rows_remaining: u64,
    /// Maximum number of rows per emitted batch.
    rows_per_batch: u32,
    /// Running total of rows the scheduler has reported as scheduled.
    rows_scheduled: u64,
    /// Rows already drained from the root decoder into decode tasks.
    rows_drained: u64,
    /// Set once the scheduler channel has closed.
    scheduler_exhausted: bool,
    /// Ensures the large-batch message is logged at most once for this stream.
    emitted_batch_size_warning: Arc<Once>,
}
1371
impl BatchDecodeStream {
    /// Creates a decode stream for the legacy (non-structural) format.
    ///
    /// * `scheduled` - receiving end of the channel the scheduler sends messages into
    /// * `rows_per_batch` - maximum number of rows in each emitted batch
    /// * `num_rows` - total number of rows the stream will emit
    /// * `root_decoder` - root struct decoder that scheduled child decoders attach to
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: SimpleStructDecoder,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
        }
    }

    /// Attaches a scheduled decoder to the root decoder.
    ///
    /// Decoders with an empty path are ignored.
    /// NOTE(review): these presumably describe the root itself, which already exists as
    /// `self.root_decoder`.
    fn accept_decoder(&mut self, decoder: DecoderReady) -> Result<()> {
        if decoder.path.is_empty() {
            Ok(())
        } else {
            self.root_decoder.accept_child(decoder)
        }
    }

    /// Pulls scheduler messages until at least `scheduled_need` rows are scheduled.
    ///
    /// Every received decoder is attached to the root decoder.
    ///
    /// Returns `scheduled_need` once that target is reached. If the channel closes first,
    /// marks the scheduler as exhausted and returns the (smaller) number of rows scheduled
    /// so far. After that, later calls return immediately.
    ///
    /// # Errors
    ///
    /// Propagates an `Err` message sent by the scheduler, or a failure to attach a decoder.
    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    // `scheduled_so_far` is a running total, so it replaces the count.
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        // Legacy streams must only carry `DecoderReady` messages;
                        // `into_legacy` panics on anything else.
                        self.accept_decoder(message.into_legacy())?;
                    }
                }
                None => {
                    // The channel is closed and empty: no more rows will be scheduled.
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    /// Produces the decode task for the next batch, or `None` when nothing is left.
    ///
    /// Reserves up to `rows_per_batch` rows, waits until they are scheduled and loaded,
    /// then drains them from the root decoder. If the scheduler ends before enough rows
    /// were scheduled, the batch is shrunk. `None` is returned if that leaves no rows.
    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining,
            self.rows_drained,
            self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        // NOTE(review): `rows_remaining` is not credited back if `to_take` shrinks below.
        // That only happens once the scheduler is exhausted, when no further rows can
        // arrive anyway.
        self.rows_remaining -= to_take;

        // How many more rows the scheduler must deliver before this batch can be drained.
        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            if actually_scheduled < desired_scheduled {
                // The scheduler ended early: shrink the batch to what is available.
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        // Index of the last row this batch needs (`to_take > 0`, so no underflow).
        let loaded_need = self.rows_drained + to_take - 1;
        trace!(
            "Waiting for I/O (desire at least {} fully loaded rows)",
            loaded_need
        );
        self.root_decoder.wait_for_loaded(loaded_need).await?;

        let next_task = self.root_decoder.drain(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    /// Converts this decoder into a stream of [`ReadBatchTask`]s.
    ///
    /// Scheduling and draining happen as the stream is polled. The work of turning each
    /// drained task into a `RecordBatch` is started with `tokio::spawn`, so the stream
    /// must be polled inside a tokio runtime. A panic in that spawned task is re-raised
    /// when the batch future is awaited, because the join result is `unwrap`ped.
    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                // Errors are moved into the task so they surface when the batch is awaited.
                let task = tokio::spawn(async move {
                    let next_task = next_task?;
                    next_task.into_batch(emitted_batch_size_warning)
                });
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
                // Batches never exceed `rows_per_batch`, which is a u32.
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task,
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}
1509
/// A unit of work handed to a root decoder by the blocking [`BatchDecodeIterator`].
enum RootDecoderMessage {
    /// An already-loaded page, accepted by the structural root decoder.
    LoadedPage(LoadedPage),
    /// A ready child decoder, accepted by the legacy root decoder.
    LegacyPage(DecoderReady),
}
/// Common interface over the legacy and structural root decoders.
///
/// It lets [`BatchDecodeIterator`] drive either decoder synchronously.
trait RootDecoderType {
    /// Feeds one scheduled page or decoder into the root decoder.
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
    /// Drains the next `num_rows` rows into a decode task.
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    /// Blocks on `runtime` until rows up to index `loaded_need` are loaded.
    /// The implementation may be a no-op.
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
}
1521impl RootDecoderType for StructuralStructDecoder {
1522 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1523 let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1524 unreachable!()
1525 };
1526 self.accept_page(loaded_page)
1527 }
1528 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1529 self.drain_batch_task(num_rows)
1530 }
1531 fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1532 Ok(())
1534 }
1535}
1536impl RootDecoderType for SimpleStructDecoder {
1537 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1538 let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1539 unreachable!()
1540 };
1541 self.accept_child(legacy_page)
1542 }
1543 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1544 self.drain(num_rows)
1545 }
1546 fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1547 runtime.block_on(self.wait_for_loaded(loaded_need))
1548 }
1549}
1550
/// Blocking (iterator-based) decoder that works over a pre-collected queue of scheduler
/// messages.
///
/// It is generic over the legacy and structural root decoders.
struct BatchDecodeIterator<T: RootDecoderType> {
    /// Scheduler messages not yet fed to the root decoder.
    messages: VecDeque<Result<DecoderMessage>>,
    /// Root decoder that receives pages/decoders and produces batch tasks.
    root_decoder: T,
    /// Rows not yet assigned to a batch.
    rows_remaining: u64,
    /// Maximum number of rows per emitted batch.
    rows_per_batch: u32,
    /// Running total of rows reported as scheduled by consumed messages.
    rows_scheduled: u64,
    /// Rows already drained from the root decoder.
    rows_drained: u64,
    /// Ensures the large-batch message is logged at most once for this iterator.
    emitted_batch_size_warning: Arc<Once>,
    /// Private current-thread runtime used to block on outstanding I/O futures.
    wait_for_io_runtime: tokio::runtime::Runtime,
    /// Arrow schema reported through `RecordBatchReader::schema`.
    schema: Arc<ArrowSchema>,
}
1566
1567impl<T: RootDecoderType> BatchDecodeIterator<T> {
1568 pub fn new(
1570 messages: VecDeque<Result<DecoderMessage>>,
1571 rows_per_batch: u32,
1572 num_rows: u64,
1573 root_decoder: T,
1574 schema: Arc<ArrowSchema>,
1575 ) -> Self {
1576 Self {
1577 messages,
1578 root_decoder,
1579 rows_remaining: num_rows,
1580 rows_per_batch,
1581 rows_scheduled: 0,
1582 rows_drained: 0,
1583 wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1584 .build()
1585 .unwrap(),
1586 emitted_batch_size_warning: Arc::new(Once::new()),
1587 schema,
1588 }
1589 }
1590
1591 fn wait_for_page(&self, unloaded_page: UnloadedPage) -> Result<LoadedPage> {
1596 match maybe_done(unloaded_page.0) {
1597 MaybeDone::Done(loaded_page) => loaded_page,
1599 MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1601 MaybeDone::Gone => unreachable!(),
1602 }
1603 }
1604
1605 #[instrument(skip_all)]
1610 fn wait_for_io(&mut self, scheduled_need: u64) -> Result<u64> {
1611 while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1612 let message = self.messages.pop_front().unwrap()?;
1613 self.rows_scheduled = message.scheduled_so_far;
1614 for decoder_message in message.decoders {
1615 match decoder_message {
1616 MessageType::UnloadedPage(unloaded_page) => {
1617 let loaded_page = self.wait_for_page(unloaded_page)?;
1618 self.root_decoder
1619 .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1620 }
1621 MessageType::DecoderReady(decoder_ready) => {
1622 if !decoder_ready.path.is_empty() {
1624 self.root_decoder
1625 .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1626 }
1627 }
1628 }
1629 }
1630 }
1631
1632 let loaded_need = self.rows_drained + self.rows_per_batch as u64 - 1;
1633
1634 self.root_decoder
1635 .wait(loaded_need, &self.wait_for_io_runtime)?;
1636 Ok(self.rows_scheduled)
1637 }
1638
1639 #[instrument(level = "debug", skip_all)]
1640 fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1641 trace!(
1642 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1643 self.rows_remaining,
1644 self.rows_drained,
1645 self.rows_scheduled,
1646 );
1647 if self.rows_remaining == 0 {
1648 return Ok(None);
1649 }
1650
1651 let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1652 self.rows_remaining -= to_take;
1653
1654 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1655 trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1656 if scheduled_need > 0 {
1657 let desired_scheduled = scheduled_need + self.rows_scheduled;
1658 trace!(
1659 "Draining from scheduler (desire at least {} scheduled rows)",
1660 desired_scheduled
1661 );
1662 let actually_scheduled = self.wait_for_io(desired_scheduled)?;
1663 if actually_scheduled < desired_scheduled {
1664 let under_scheduled = desired_scheduled - actually_scheduled;
1665 to_take -= under_scheduled;
1666 }
1667 }
1668
1669 if to_take == 0 {
1670 return Ok(None);
1671 }
1672
1673 let next_task = self.root_decoder.drain_batch(to_take)?;
1674
1675 self.rows_drained += to_take;
1676
1677 let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1678
1679 Ok(Some(batch))
1680 }
1681}
1682
1683impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1684 type Item = ArrowResult<RecordBatch>;
1685
1686 fn next(&mut self) -> Option<Self::Item> {
1687 self.next_batch_task()
1688 .transpose()
1689 .map(|r| r.map_err(ArrowError::from))
1690 }
1691}
1692
1693impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1694 fn schema(&self) -> Arc<ArrowSchema> {
1695 self.schema.clone()
1696 }
1697}
1698
/// Async decoder for the structural format that turns scheduler messages into a stream
/// of batch decode tasks.
pub struct StructuralBatchDecodeStream {
    /// Receiving side of the scheduler -> decoder channel.
    context: DecoderContext,
    /// Root struct decoder that loaded pages are handed to.
    root_decoder: StructuralStructDecoder,
    /// Rows that have not yet been assigned to a batch.
    rows_remaining: u64,
    /// Maximum number of rows per emitted batch.
    rows_per_batch: u32,
    /// Running total of rows the scheduler has reported as scheduled.
    rows_scheduled: u64,
    /// Rows already drained from the root decoder into decode tasks.
    rows_drained: u64,
    /// Set once the scheduler channel has closed.
    scheduler_exhausted: bool,
    /// Ensures the large-batch message is logged at most once for this stream.
    emitted_batch_size_warning: Arc<Once>,
}
1710
impl StructuralBatchDecodeStream {
    /// Creates a decode stream for the structural format.
    ///
    /// * `scheduled` - receiving end of the channel the scheduler sends messages into
    /// * `rows_per_batch` - maximum number of rows in each emitted batch
    /// * `num_rows` - total number of rows the stream will emit
    /// * `root_decoder` - root struct decoder that loaded pages are handed to
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: StructuralStructDecoder,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
        }
    }

    /// Pulls scheduler messages until at least `scheduled_need` rows are scheduled.
    ///
    /// Each received page is awaited until loaded and then handed to the root decoder.
    ///
    /// Returns `scheduled_need` once that target is reached. If the channel closes first,
    /// marks the scheduler as exhausted and returns the (smaller) number of rows scheduled
    /// so far. After that, later calls return immediately.
    ///
    /// # Errors
    ///
    /// Propagates an `Err` message from the scheduler, a page-load error, or a failure to
    /// accept a page.
    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    // `scheduled_so_far` is a running total, so it replaces the count.
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        // Structural streams must only carry `UnloadedPage` messages;
                        // `into_structural` panics on anything else.
                        let unloaded_page = message.into_structural();
                        // Pages are awaited one at a time, in message order.
                        let loaded_page = unloaded_page.0.await?;
                        self.root_decoder.accept_page(loaded_page)?;
                    }
                }
                None => {
                    // The channel is closed and empty: no more rows will be scheduled.
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    /// Produces the decode task for the next batch, or `None` when nothing is left.
    ///
    /// Reserves up to `rows_per_batch` rows, waits until they are scheduled (which also
    /// loads their pages), then drains them from the root decoder. If the scheduler ends
    /// early, the batch is shrunk. `None` is returned if that leaves no rows.
    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining,
            self.rows_drained,
            self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        // NOTE(review): not credited back if `to_take` shrinks below. That only happens
        // once the scheduler is exhausted.
        self.rows_remaining -= to_take;

        // How many more rows the scheduler must deliver before this batch can be drained.
        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            if actually_scheduled < desired_scheduled {
                // The scheduler ended early: shrink the batch to what is available.
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let next_task = self.root_decoder.drain_batch_task(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    /// Converts this decoder into a stream of [`ReadBatchTask`]s.
    ///
    /// The work of turning each drained task into a `RecordBatch` is started with
    /// `tokio::spawn`, so the stream must be polled inside a tokio runtime. A panic in
    /// that spawned task is re-raised when the batch future is awaited, because the join
    /// result is `unwrap`ped.
    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                // Errors are moved into the task so they surface when the batch is awaited.
                let task = tokio::spawn(async move {
                    let next_task = next_task?;
                    next_task.into_batch(emitted_batch_size_warning)
                });
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
                // Batches never exceed `rows_per_batch`, which is a u32.
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task,
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}
1833
/// The rows a read should produce.
#[derive(Debug)]
pub enum RequestedRows {
    /// Contiguous row ranges.
    Ranges(Vec<Range<u64>>),
    /// Individual row indices. These are passed to `schedule_take`, which expects them
    /// sorted in non-decreasing order.
    Indices(Vec<u64>),
}
1839
1840impl RequestedRows {
1841 pub fn num_rows(&self) -> u64 {
1842 match self {
1843 Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
1844 Self::Indices(indices) => indices.len() as u64,
1845 }
1846 }
1847}
1848
/// Settings shared by `schedule_and_decode` and `schedule_and_decode_blocking`.
#[derive(Debug, Clone)]
pub struct SchedulerDecoderConfig {
    /// Decoder extension points handed to the batch scheduler.
    pub decoder_plugins: Arc<DecoderPlugins>,
    /// Maximum number of rows per emitted batch.
    pub batch_size: u32,
    /// I/O interface used both to initialize the scheduler and to schedule reads.
    pub io: Arc<dyn EncodingsIo>,
    /// File metadata cache handed to the batch scheduler.
    pub cache: Arc<FileMetadataCache>,
    /// Forwarded to the structural root decoder.
    /// NOTE(review): presumably enables extra validation of decoded data; confirm.
    pub should_validate: bool,
}
1857
1858fn check_scheduler_on_drop(
1859 stream: BoxStream<'static, ReadBatchTask>,
1860 scheduler_handle: tokio::task::JoinHandle<()>,
1861) -> BoxStream<'static, ReadBatchTask> {
1862 let mut scheduler_handle = Some(scheduler_handle);
1866 let check_scheduler = stream::unfold((), move |_| {
1867 let handle = scheduler_handle.take();
1868 async move {
1869 if let Some(handle) = handle {
1870 handle.await.unwrap();
1871 }
1872 None
1873 }
1874 });
1875 stream.chain(check_scheduler).boxed()
1876}
1877
1878pub fn create_decode_stream(
1879 schema: &Schema,
1880 num_rows: u64,
1881 batch_size: u32,
1882 is_structural: bool,
1883 should_validate: bool,
1884 rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1885) -> BoxStream<'static, ReadBatchTask> {
1886 if is_structural {
1887 let arrow_schema = ArrowSchema::from(schema);
1888 let structural_decoder = StructuralStructDecoder::new(
1889 arrow_schema.fields,
1890 should_validate,
1891 true,
1892 );
1893 StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder).into_stream()
1894 } else {
1895 let arrow_schema = ArrowSchema::from(schema);
1896 let root_fields = arrow_schema.fields;
1897
1898 let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1899 BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream()
1900 }
1901}
1902
1903pub fn create_decode_iterator(
1907 schema: &Schema,
1908 num_rows: u64,
1909 batch_size: u32,
1910 should_validate: bool,
1911 is_structural: bool,
1912 messages: VecDeque<Result<DecoderMessage>>,
1913) -> Box<dyn RecordBatchReader> {
1914 let arrow_schema = Arc::new(ArrowSchema::from(schema));
1915 let root_fields = arrow_schema.fields.clone();
1916 if is_structural {
1917 let simple_struct_decoder =
1918 StructuralStructDecoder::new(root_fields, should_validate, true);
1919 Box::new(BatchDecodeIterator::new(
1920 messages,
1921 batch_size,
1922 num_rows,
1923 simple_struct_decoder,
1924 arrow_schema,
1925 ))
1926 } else {
1927 let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1928 Box::new(BatchDecodeIterator::new(
1929 messages,
1930 batch_size,
1931 num_rows,
1932 root_decoder,
1933 arrow_schema,
1934 ))
1935 }
1936}
1937
1938fn create_scheduler_decoder(
1939 column_infos: Vec<Arc<ColumnInfo>>,
1940 requested_rows: RequestedRows,
1941 filter: FilterExpression,
1942 column_indices: Vec<u32>,
1943 target_schema: Arc<Schema>,
1944 config: SchedulerDecoderConfig,
1945) -> Result<BoxStream<'static, ReadBatchTask>> {
1946 let num_rows = requested_rows.num_rows();
1947
1948 let is_structural = column_infos[0].is_structural();
1949
1950 let (tx, rx) = mpsc::unbounded_channel();
1951
1952 let decode_stream = create_decode_stream(
1953 &target_schema,
1954 num_rows,
1955 config.batch_size,
1956 is_structural,
1957 config.should_validate,
1958 rx,
1959 );
1960
1961 let scheduler_handle = tokio::task::spawn(async move {
1962 let mut decode_scheduler = match DecodeBatchScheduler::try_new(
1963 target_schema.as_ref(),
1964 &column_indices,
1965 &column_infos,
1966 &vec![],
1967 num_rows,
1968 config.decoder_plugins,
1969 config.io.clone(),
1970 config.cache,
1971 &filter,
1972 )
1973 .await
1974 {
1975 Ok(scheduler) => scheduler,
1976 Err(e) => {
1977 let _ = tx.send(Err(e));
1978 return;
1979 }
1980 };
1981
1982 match requested_rows {
1983 RequestedRows::Ranges(ranges) => {
1984 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1985 }
1986 RequestedRows::Indices(indices) => {
1987 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
1988 }
1989 }
1990 });
1991
1992 Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
1993}
1994
1995pub fn schedule_and_decode(
2001 column_infos: Vec<Arc<ColumnInfo>>,
2002 requested_rows: RequestedRows,
2003 filter: FilterExpression,
2004 column_indices: Vec<u32>,
2005 target_schema: Arc<Schema>,
2006 config: SchedulerDecoderConfig,
2007) -> BoxStream<'static, ReadBatchTask> {
2008 if requested_rows.num_rows() == 0 {
2009 return stream::empty().boxed();
2010 }
2011 match create_scheduler_decoder(
2015 column_infos,
2016 requested_rows,
2017 filter,
2018 column_indices,
2019 target_schema,
2020 config,
2021 ) {
2022 Ok(stream) => stream,
2024 Err(e) => stream::once(std::future::ready(ReadBatchTask {
2025 num_rows: 0,
2026 task: std::future::ready(Err(e)).boxed(),
2027 }))
2028 .boxed(),
2029 }
2030}
2031
lazy_static::lazy_static! {
    /// Process-wide current-thread tokio runtime that `schedule_and_decode_blocking` uses
    /// to block on scheduler initialization.
    ///
    /// NOTE: it is built without enabling the I/O or time drivers.
    pub static ref WAITER_RT: tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap();
}
2037
2038pub fn schedule_and_decode_blocking(
2053 column_infos: Vec<Arc<ColumnInfo>>,
2054 requested_rows: RequestedRows,
2055 filter: FilterExpression,
2056 column_indices: Vec<u32>,
2057 target_schema: Arc<Schema>,
2058 config: SchedulerDecoderConfig,
2059) -> Result<Box<dyn RecordBatchReader>> {
2060 if requested_rows.num_rows() == 0 {
2061 let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
2062 return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
2063 }
2064
2065 let num_rows = requested_rows.num_rows();
2066 let is_structural = column_infos[0].is_structural();
2067
2068 let (tx, mut rx) = mpsc::unbounded_channel();
2069
2070 let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
2073 target_schema.as_ref(),
2074 &column_indices,
2075 &column_infos,
2076 &vec![],
2077 num_rows,
2078 config.decoder_plugins,
2079 config.io.clone(),
2080 config.cache,
2081 &filter,
2082 ))?;
2083
2084 match requested_rows {
2086 RequestedRows::Ranges(ranges) => {
2087 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2088 }
2089 RequestedRows::Indices(indices) => {
2090 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2091 }
2092 }
2093
2094 let mut messages = Vec::new();
2096 while rx
2097 .recv_many(&mut messages, usize::MAX)
2098 .now_or_never()
2099 .unwrap()
2100 != 0
2101 {}
2102
2103 let decode_iterator = create_decode_iterator(
2105 &target_schema,
2106 num_rows,
2107 config.batch_size,
2108 config.should_validate,
2109 is_structural,
2110 messages.into(),
2111 );
2112
2113 Ok(decode_iterator)
2114}
2115
/// Decodes values out of a single page whose data has already been loaded.
pub trait PrimitivePageDecoder: Send + Sync {
    /// Decodes `num_rows` rows into a [`DataBlock`], skipping the first `rows_to_skip`
    /// rows.
    /// NOTE(review): this is inferred from the parameter names; confirm against the
    /// implementations.
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
}
2161
/// Schedules the reads for a page and produces a [`PrimitivePageDecoder`] for it.
pub trait PageScheduler: Send + Sync + std::fmt::Debug {
    /// Schedules reads for `ranges` using `scheduler`.
    ///
    /// Returns a `'static` future that resolves to a decoder for those ranges.
    /// NOTE(review): `top_level_row` is presumably the top-level row at which these
    /// ranges start (e.g. for I/O priority); confirm.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
}
2189
/// A priority value that moves forward as scheduling advances through rows.
///
/// In this file, `SimplePriorityRange` adds one per row, and `ListPriorityRange` advances
/// its base once per completed list.
/// NOTE(review): presumably used to order I/O requests; confirm.
pub trait PriorityRange: std::fmt::Debug + Send + Sync {
    /// Records that `num_rows` more rows have been scheduled.
    fn advance(&mut self, num_rows: u64);
    /// The current priority value.
    fn current_priority(&self) -> u64;
    /// Clones this range into a new boxed trait object.
    fn box_clone(&self) -> Box<dyn PriorityRange>;
}
2196
/// A [`PriorityRange`] whose priority is its starting value plus the number of rows
/// advanced so far.
#[derive(Debug)]
pub struct SimplePriorityRange {
    /// Current priority value.
    priority: u64,
}
2203
2204impl SimplePriorityRange {
2205 fn new(priority: u64) -> Self {
2206 Self { priority }
2207 }
2208}
2209
2210impl PriorityRange for SimplePriorityRange {
2211 fn advance(&mut self, num_rows: u64) {
2212 self.priority += num_rows;
2213 }
2214
2215 fn current_priority(&self) -> u64 {
2216 self.priority
2217 }
2218
2219 fn box_clone(&self) -> Box<dyn PriorityRange> {
2220 Box::new(Self {
2221 priority: self.priority,
2222 })
2223 }
2224}
2225
/// A [`PriorityRange`] for the items of a list column.
///
/// It advances in item rows but moves its `base` (which counts list rows) forward only
/// when whole lists are completed, as determined by `offsets`.
pub struct ListPriorityRange {
    /// Priority range counting list (parent) rows.
    base: Box<dyn PriorityRange>,
    /// List offsets: list `i` covers items `offsets[i]..offsets[i + 1]`.
    offsets: Arc<[u64]>,
    /// Index into `offsets` of the list currently being advanced through.
    cur_index_into_offsets: usize,
    /// Number of items advanced so far.
    cur_position: u64,
}
2244
2245impl ListPriorityRange {
2246 pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
2247 Self {
2248 base,
2249 offsets,
2250 cur_index_into_offsets: 0,
2251 cur_position: 0,
2252 }
2253 }
2254}
2255
2256impl std::fmt::Debug for ListPriorityRange {
2257 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2258 f.debug_struct("ListPriorityRange")
2259 .field("base", &self.base)
2260 .field("offsets.len()", &self.offsets.len())
2261 .field("cur_index_into_offsets", &self.cur_index_into_offsets)
2262 .field("cur_position", &self.cur_position)
2263 .finish()
2264 }
2265}
2266
2267impl PriorityRange for ListPriorityRange {
2268 fn advance(&mut self, num_rows: u64) {
2269 self.cur_position += num_rows;
2272 let mut idx_into_offsets = self.cur_index_into_offsets;
2273 while idx_into_offsets + 1 < self.offsets.len()
2274 && self.offsets[idx_into_offsets + 1] <= self.cur_position
2275 {
2276 idx_into_offsets += 1;
2277 }
2278 let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
2279 self.cur_index_into_offsets = idx_into_offsets;
2280 self.base.advance(base_rows_advanced as u64);
2281 }
2282
2283 fn current_priority(&self) -> u64 {
2284 self.base.current_priority()
2285 }
2286
2287 fn box_clone(&self) -> Box<dyn PriorityRange> {
2288 Box::new(Self {
2289 base: self.base.box_clone(),
2290 offsets: self.offsets.clone(),
2291 cur_index_into_offsets: self.cur_index_into_offsets,
2292 cur_position: self.cur_position,
2293 })
2294 }
2295}
2296
/// State threaded through field schedulers while they schedule I/O.
///
/// It tracks the path from the root to the field currently being scheduled.
pub struct SchedulerContext {
    /// Optional receiver. When present, `path_name` reports a `TEMP(..)` context instead
    /// of `ROOT`. It is always `None` for contexts made by `new`.
    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
    /// I/O handle held by this context.
    io: Arc<dyn EncodingsIo>,
    /// File metadata cache held by this context.
    cache: Arc<FileMetadataCache>,
    /// Name shown inside `TEMP(..)` by `path_name`.
    name: String,
    /// Child indices from the root to the current field.
    path: Vec<u32>,
    /// Child names from the root to the current field (parallel to `path`).
    path_names: Vec<String>,
}
2306
/// Handle returned by `SchedulerContext::push`.
///
/// Calling `pop` removes the pushed path segment and returns the context.
pub struct ScopedSchedulerContext<'a> {
    /// The context whose path was extended.
    pub context: &'a mut SchedulerContext,
}
2310
2311impl<'a> ScopedSchedulerContext<'a> {
2312 pub fn pop(self) -> &'a mut SchedulerContext {
2313 self.context.pop();
2314 self.context
2315 }
2316}
2317
2318impl SchedulerContext {
2319 pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<FileMetadataCache>) -> Self {
2320 Self {
2321 io,
2322 cache,
2323 recv: None,
2324 name: "".to_string(),
2325 path: Vec::new(),
2326 path_names: Vec::new(),
2327 }
2328 }
2329
2330 pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2331 &self.io
2332 }
2333
2334 pub fn cache(&self) -> &Arc<FileMetadataCache> {
2335 &self.cache
2336 }
2337
2338 pub fn push(&mut self, name: &str, index: u32) -> ScopedSchedulerContext {
2339 self.path.push(index);
2340 self.path_names.push(name.to_string());
2341 ScopedSchedulerContext { context: self }
2342 }
2343
2344 pub fn pop(&mut self) {
2345 self.path.pop();
2346 self.path_names.pop();
2347 }
2348
2349 pub fn path_name(&self) -> String {
2350 let path = self.path_names.join("/");
2351 if self.recv.is_some() {
2352 format!("TEMP({}){}", self.name, path)
2353 } else {
2354 format!("ROOT{}", path)
2355 }
2356 }
2357
2358 pub fn current_path(&self) -> VecDeque<u32> {
2359 VecDeque::from_iter(self.path.iter().copied())
2360 }
2361
2362 pub fn locate_decoder(&mut self, decoder: Box<dyn LogicalPageDecoder>) -> DecoderReady {
2363 trace!(
2364 "Scheduling decoder of type {:?} for {:?}",
2365 decoder.data_type(),
2366 self.path,
2367 );
2368 DecoderReady {
2369 decoder,
2370 path: self.current_path(),
2371 }
2372 }
2373}
2374
/// A structural page whose data is still loading. The future resolves to the loaded page.
pub struct UnloadedPage(pub BoxFuture<'static, Result<LoadedPage>>);
2376
2377impl std::fmt::Debug for UnloadedPage {
2378 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2379 f.debug_struct("UnloadedPage").finish()
2380 }
2381}
2382
/// The result of one scheduling step: decoders/pages plus a row count.
#[derive(Debug)]
pub struct ScheduledScanLine {
    /// NOTE(review): presumably the number of rows newly scheduled by this step (a delta,
    /// unlike `DecoderMessage::scheduled_so_far`); confirm.
    pub rows_scheduled: u64,
    /// Messages produced by this step.
    pub decoders: Vec<MessageType>,
}
2388
/// An in-progress legacy scheduling operation, advanced one step at a time.
pub trait SchedulingJob: std::fmt::Debug {
    /// Performs the next scheduling step using `context` for I/O and `priority` for the
    /// current I/O priority.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine>;

    /// Total number of rows this job will schedule.
    /// NOTE(review): inferred from the name; confirm.
    fn num_rows(&self) -> u64;
}
2398
/// An in-progress structural scheduling operation, advanced one step at a time.
pub trait StructuralSchedulingJob: std::fmt::Debug {
    /// Performs the next scheduling step.
    /// NOTE(review): `None` presumably means the job has nothing left to schedule;
    /// confirm.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>>;
}
2405
/// An opaque, serialized filter. An empty buffer is the no-op filter.
pub struct FilterExpression(pub Bytes);
2414
2415impl FilterExpression {
2416 pub fn no_filter() -> Self {
2421 Self(Bytes::new())
2422 }
2423
2424 pub fn is_noop(&self) -> bool {
2426 self.0.is_empty()
2427 }
2428}
2429
/// Schedules reads for one field in the legacy format.
pub trait FieldScheduler: Send + Sync + std::fmt::Debug {
    /// Asynchronously prepares the scheduler before any ranges are scheduled.
    fn initialize<'a>(
        &'a self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    /// Starts a [`SchedulingJob`] that will schedule `ranges` of this field.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>>;
    /// Number of rows in this field.
    fn num_rows(&self) -> u64;
}
2473
/// Schedules reads for one field in the structural format.
pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
    /// Asynchronously prepares the scheduler. Takes `&mut self`, so implementations may
    /// store state.
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    /// Starts a [`StructuralSchedulingJob`] that will schedule `ranges` of this field.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
}
2486
/// A unit of decode work that produces one Arrow array.
pub trait DecodeArrayTask: Send {
    /// Runs the decode, consuming the task.
    fn decode(self: Box<Self>) -> Result<ArrayRef>;
}
2492
2493impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2494 fn decode(self: Box<Self>) -> Result<ArrayRef> {
2495 StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2496 }
2497}
2498
/// Decode work for one batch, drained from a root decoder.
pub struct NextDecodeTask {
    /// Task producing the batch's root (struct) array.
    pub task: Box<dyn DecodeArrayTask>,
    /// Number of rows the batch will contain.
    pub num_rows: u64,
}
2509
2510impl NextDecodeTask {
2511 #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2516 fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2517 let struct_arr = self.task.decode();
2518 match struct_arr {
2519 Ok(struct_arr) => {
2520 let batch = RecordBatch::from(struct_arr.as_struct());
2521 let size_bytes = batch.get_array_memory_size() as u64;
2522 if size_bytes > BATCH_SIZE_BYTES_WARNING {
2523 emitted_batch_size_warning.call_once(|| {
2524 let size_mb = size_bytes / 1024 / 1024;
2525 debug!("Lance read in a single batch that contained more than {}MiB of data. You may want to consider reducing the batch size.", size_mb);
2526 });
2527 }
2528 Ok(batch)
2529 }
2530 Err(e) => {
2531 let e = Error::Internal {
2532 message: format!("Error decoding batch: {}", e),
2533 location: location!(),
2534 };
2535 Err(e)
2536 }
2537 }
2538 }
2539}
2540
/// A scheduled legacy decoder, tagged with where it belongs in the field tree.
#[derive(Debug)]
pub struct DecoderReady {
    /// The decoder itself.
    pub decoder: Box<dyn LogicalPageDecoder>,
    /// Child indices from the root to this decoder (built by
    /// `SchedulerContext::locate_decoder`). Consumers ignore decoders with an empty path.
    pub path: VecDeque<u32>,
}
2565
/// One item sent from the scheduler to the decoder.
#[derive(Debug)]
pub enum MessageType {
    /// A legacy decoder (see `into_legacy`).
    DecoderReady(DecoderReady),
    /// A structural page still being loaded (see `into_structural`).
    UnloadedPage(UnloadedPage),
}
2581
2582impl MessageType {
2583 pub fn into_legacy(self) -> DecoderReady {
2584 match self {
2585 Self::DecoderReady(decoder) => decoder,
2586 Self::UnloadedPage(_) => {
2587 panic!("Expected DecoderReady but got UnloadedPage")
2588 }
2589 }
2590 }
2591
2592 pub fn into_structural(self) -> UnloadedPage {
2593 match self {
2594 Self::UnloadedPage(unloaded) => unloaded,
2595 Self::DecoderReady(_) => {
2596 panic!("Expected UnloadedPage but got DecoderReady")
2597 }
2598 }
2599 }
2600}
2601
/// A batch of scheduler output sent over the scheduler -> decoder channel.
pub struct DecoderMessage {
    /// Running total of rows scheduled, including this message. Consumers assign it
    /// directly to their scheduled-row count.
    pub scheduled_so_far: u64,
    /// Decoders/pages produced for those rows.
    pub decoders: Vec<MessageType>,
}
2606
/// Decode-side state: the receiving end of the scheduler -> decoder channel.
pub struct DecoderContext {
    /// Messages (or scheduling errors) sent by the scheduler.
    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
}
2610
2611impl DecoderContext {
2612 pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
2613 Self { source }
2614 }
2615}
2616
/// A legacy decoder that tracks loaded and drained rows and produces decode tasks via
/// `drain`.
pub trait LogicalPageDecoder: std::fmt::Debug + Send {
    /// Adds a child decoder, routed by [`DecoderReady::path`].
    ///
    /// The default rejects children with an `Error::Internal`; decoders that have
    /// children must override it.
    fn accept_child(&mut self, _child: DecoderReady) -> Result<()> {
        Err(Error::Internal {
            message: format!(
                "The decoder {:?} does not expect children but received a child",
                self
            ),
            location: location!(),
        })
    }
    /// Waits until rows up to index `loaded_need` are loaded.
    ///
    /// Callers in this file pass `rows_drained + to_take - 1`, the index of the last row
    /// they are about to drain.
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>>;
    /// Number of rows whose data is loaded.
    fn rows_loaded(&self) -> u64;
    /// Rows not yet loaded: `num_rows() - rows_loaded()`.
    fn rows_unloaded(&self) -> u64 {
        self.num_rows() - self.rows_loaded()
    }
    /// Total number of rows this decoder covers.
    fn num_rows(&self) -> u64;
    /// Number of rows already drained.
    fn rows_drained(&self) -> u64;
    /// Rows not yet drained: `num_rows() - rows_drained()`.
    fn rows_left(&self) -> u64 {
        self.num_rows() - self.rows_drained()
    }
    /// Drains the next `num_rows` rows into a decode task.
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    /// Arrow type of the values this decoder produces.
    fn data_type(&self) -> &DataType;
}
2660
/// The output of decoding a structural page.
pub struct DecodedPage {
    /// Decoded values.
    pub data: DataBlock,
    /// Repetition/definition information for the page.
    pub repdef: RepDefUnraveler,
}
2665
/// A unit of work that decodes part of a structural page.
pub trait DecodePageTask: Send + std::fmt::Debug {
    /// Runs the decode, consuming the task.
    fn decode(self: Box<Self>) -> Result<DecodedPage>;
}
2670
/// Decoder for a single loaded structural page.
pub trait StructuralPageDecoder: std::fmt::Debug + Send {
    /// Drains the next `num_rows` rows of the page into a decode task.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
    /// Number of rows in the page.
    fn num_rows(&self) -> u64;
}
2675
/// A structural page whose data has finished loading.
#[derive(Debug)]
pub struct LoadedPage {
    /// Decoder for the page's rows.
    pub decoder: Box<dyn StructuralPageDecoder>,
    /// Child indices from the root to the field this page belongs to.
    pub path: VecDeque<u32>,
    /// NOTE(review): presumably the index of this page within its column; confirm.
    pub page_index: usize,
}
2701
/// The output of a structural decode task: an array plus its repetition/definition
/// information.
pub struct DecodedArray {
    /// The decoded Arrow array.
    pub array: ArrayRef,
    /// Repetition/definition information accompanying `array`.
    pub repdef: CompositeRepDefUnraveler,
}
2706
/// A unit of structural decode work that produces a [`DecodedArray`].
pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
    /// Runs the decode, consuming the task.
    fn decode(self: Box<Self>) -> Result<DecodedArray>;
}
2710
/// Decoder for one field in the structural format.
pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
    /// Accepts a loaded page for this field (or one of its children).
    fn accept_page(&mut self, _child: LoadedPage) -> Result<()>;
    /// Drains the next `num_rows` rows into a decode task.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
    /// Arrow type of the values this decoder produces.
    fn data_type(&self) -> &DataType;
}
2722
/// Extension points for decoding. Currently this type has no fields.
#[derive(Debug, Default)]
pub struct DecoderPlugins {}
2725
2726pub async fn decode_batch(
2728 batch: &EncodedBatch,
2729 filter: &FilterExpression,
2730 decoder_plugins: Arc<DecoderPlugins>,
2731 should_validate: bool,
2732 version: LanceFileVersion,
2733 cache: Option<Arc<FileMetadataCache>>,
2734) -> Result<RecordBatch> {
2735 let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2740 let cache = cache.unwrap_or_else(|| {
2741 Arc::new(FileMetadataCache::with_capacity(
2742 128 * 1024 * 1024,
2743 CapacityMode::Bytes,
2744 ))
2745 });
2746 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2747 batch.schema.as_ref(),
2748 &batch.top_level_columns,
2749 &batch.page_table,
2750 &vec![],
2751 batch.num_rows,
2752 decoder_plugins,
2753 io_scheduler.clone(),
2754 cache,
2755 filter,
2756 )
2757 .await?;
2758 let (tx, rx) = unbounded_channel();
2759 decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2760 let is_structural = version >= LanceFileVersion::V2_1;
2761 let mut decode_stream = create_decode_stream(
2762 &batch.schema,
2763 batch.num_rows,
2764 batch.num_rows as u32,
2765 is_structural,
2766 should_validate,
2767 rx,
2768 );
2769 decode_stream.next().await.unwrap().task.await
2770}