use std::{
    collections::{BTreeMap, BTreeSet},
    io::Cursor,
    ops::Range,
    pin::Pin,
    sync::Arc,
};

use arrow_array::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use deepsize::{Context, DeepSizeOf};
use futures::{stream::BoxStream, Stream, StreamExt};
use lance_encoding::{
    decoder::{
        schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderPlugins,
        FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
        SchedulerDecoderConfig,
    },
    encoder::EncodedBatch,
    version::LanceFileVersion,
    EncodingsIo,
};
use log::debug;
use object_store::path::Path;
use prost::{Message, Name};
use snafu::location;

use lance_core::{
    cache::FileMetadataCache,
    datatypes::{Field, Schema},
    Error, Result,
};
use lance_encoding::format::pb as pbenc;
use lance_io::{
    scheduler::FileScheduler,
    stream::{RecordBatchStream, RecordBatchStreamAdapter},
    ReadBatchParams,
};

use crate::{
    datatypes::{Fields, FieldsWithMeta},
    format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
    v2::writer::PAGE_BUFFER_ALIGNMENT,
};

use super::io::LanceEncodingsIo;

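/// Describes a buffer within the file: its byte position and size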
#[derive(Debug, DeepSizeOf)]
pub struct BufferDescriptor {
    pub position: u64,
    pub size: u64,
}

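/// Statistics for an entire file, one [`ColumnStatistics`] entry per column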
#[derive(Debug)]
pub struct FileStatistics {
    pub columns: Vec<ColumnStatistics>,
}

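/// Statistics for a single column: the number of pages and the total size of the
/// column's page buffers in bytes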
#[derive(Debug)]
pub struct ColumnStatistics {
    pub num_pages: usize,
    pub size_bytes: u64,
}

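/// Metadata that is read once when a file is opened and then cached: the schema,
/// per-column metadata and page layout, row count, global buffer locations, section
/// sizes, and the file format version.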
#[derive(Debug)]
pub struct CachedFileMetadata {
    pub file_schema: Arc<Schema>,
    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
    pub column_infos: Vec<Arc<ColumnInfo>>,
    pub num_rows: u64,
    pub file_buffers: Vec<BufferDescriptor>,
    pub num_data_bytes: u64,
    pub num_column_metadata_bytes: u64,
    pub num_global_buffer_bytes: u64,
    pub num_footer_bytes: u64,
    pub major_version: u16,
    pub minor_version: u16,
}

impl DeepSizeOf for CachedFileMetadata {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.file_schema.deep_size_of_children(context)
            + self
                .file_buffers
                .iter()
                .map(|file_buffer| file_buffer.deep_size_of_children(context))
                .sum::<usize>()
    }
}

impl CachedFileMetadata {
    pub fn version(&self) -> LanceFileVersion {
        match (self.major_version, self.minor_version) {
            (0, 3) => LanceFileVersion::V2_0,
            (2, 1) => LanceFileVersion::V2_1,
            _ => panic!(
                "Unsupported version: {}.{}",
                self.major_version, self.minor_version
            ),
        }
    }
}

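/// A projection to apply when reading: the projected schema to decode into, plus the
/// indices of the file columns that back the projected fields.  The constructors below
/// build projections from field ids, from the whole schema, or from column names.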
#[derive(Debug, Clone)]
pub struct ReaderProjection {
    pub schema: Arc<Schema>,
    pub column_indices: Vec<u32>,
}

impl ReaderProjection {
    fn from_field_ids_helper<'a>(
        reader: &FileReader,
        fields: impl Iterator<Item = &'a Field>,
        field_id_to_column_index: &BTreeMap<u32, u32>,
        column_indices: &mut Vec<u32>,
    ) -> Result<()> {
        for field in fields {
            let is_structural = reader.metadata.version() >= LanceFileVersion::V2_1;
            if !is_structural || field.children.is_empty() {
                if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
                {
                    column_indices.push(column_idx);
                }
            }
            Self::from_field_ids_helper(
                reader,
                field.children.iter(),
                field_id_to_column_index,
                column_indices,
            )?;
        }
        Ok(())
    }

    pub fn from_field_ids(
        reader: &FileReader,
        schema: &Schema,
        field_id_to_column_index: &BTreeMap<u32, u32>,
    ) -> Result<Self> {
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            reader,
            schema.fields.iter(),
            field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(schema.clone()),
            column_indices,
        })
    }

    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
        let schema = Arc::new(schema.clone());
        let is_structural = version >= LanceFileVersion::V2_1;
        let mut column_indices = vec![];
        let mut curr_column_idx = 0;
        let mut packed_struct_fields_num = 0;
        for field in schema.fields_pre_order() {
            if packed_struct_fields_num > 0 {
                packed_struct_fields_num -= 1;
                continue;
            }
            if field.is_packed_struct() {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
                packed_struct_fields_num = field.children.len();
            } else if field.children.is_empty() || !is_structural {
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
            }
        }
        Self {
            schema,
            column_indices,
        }
    }

    pub fn from_column_names(schema: &Schema, column_names: &[&str]) -> Result<Self> {
        let field_id_to_column_index = schema
            .fields_pre_order()
            .enumerate()
            .map(|(idx, field)| (field.id as u32, idx as u32))
            .collect::<BTreeMap<_, _>>();
        let projected = schema.project(column_names)?;
        let column_indices = projected
            .fields_pre_order()
            .map(|f| field_id_to_column_index[&(f.id as u32)])
            .collect::<Vec<_>>();
        Ok(Self {
            schema: Arc::new(projected),
            column_indices,
        })
    }
}

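/// Options controlling how a [`FileReader`] decodes data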
#[derive(Debug, Default)]
pub struct FileReaderOptions {
    validate_on_decode: bool,
}

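/// A reader for Lance file versions 2.0 and above.  Holds the cached file metadata,
/// the I/O scheduler, and the base projection used when a read does not supply one.
///
/// A rough usage sketch (not compiled; assumes a `FileScheduler` named `file_scheduler`
/// and a metadata cache named `metadata_cache` already exist, following the usage in
/// the tests below):
///
/// ```ignore
/// let reader = FileReader::try_open(
///     file_scheduler,
///     None, // no projection: read every column
///     Arc::<DecoderPlugins>::default(),
///     &metadata_cache,
///     FileReaderOptions::default(),
/// )
/// .await?;
/// let mut stream = reader.read_stream(
///     ReadBatchParams::RangeFull,
///     1024, // batch size
///     16,   // batch readahead
///     FilterExpression::no_filter(),
/// )?;
/// while let Some(batch) = stream.next().await {
///     // process each RecordBatch
/// }
/// ```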
#[derive(Debug)]
pub struct FileReader {
    scheduler: Arc<dyn EncodingsIo>,
    base_projection: ReaderProjection,
    num_rows: u64,
    metadata: Arc<CachedFileMetadata>,
    decoder_plugins: Arc<DecoderPlugins>,
    cache: Arc<FileMetadataCache>,
    options: FileReaderOptions,
}

#[derive(Debug)]
struct Footer {
    #[allow(dead_code)]
    column_meta_start: u64,
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    global_buff_offsets_start: u64,
    num_global_buffers: u32,
    num_columns: u32,
    major_version: u16,
    minor_version: u16,
}

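// Size of the fixed-length footer: three u64 offsets, two u32 counts, two u16 version
// fields, and the 4-byte magic number (3 * 8 + 2 * 4 + 2 * 2 + 4 = 40).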
const FOOTER_LEN: usize = 40;

impl FileReader {
    pub fn num_rows(&self) -> u64 {
        self.num_rows
    }

    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
        &self.metadata
    }

    pub fn file_statistics(&self) -> FileStatistics {
        let column_metadatas = &self.metadata().column_metadatas;

        let column_stats = column_metadatas
            .iter()
            .map(|col_metadata| {
                let num_pages = col_metadata.pages.len();
                let size_bytes = col_metadata
                    .pages
                    .iter()
                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
                    .sum::<u64>();
                ColumnStatistics {
                    num_pages,
                    size_bytes,
                }
            })
            .collect();

        FileStatistics {
            columns: column_stats,
        }
    }

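    /// Read one of the file's global buffers into memory by its index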
    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
        let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(|| Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len()), location!()))?;
        self.scheduler
            .submit_single(
                buffer_desc.position..buffer_desc.position + buffer_desc.size,
                0,
            )
            .await
    }

    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
        let file_size = scheduler.reader().size().await? as u64;
        let begin = if file_size < scheduler.reader().block_size() as u64 {
            0
        } else {
            file_size - scheduler.reader().block_size() as u64
        };
        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
        Ok((tail_bytes, file_size))
    }

    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
        let len = footer_bytes.len();
        if len < FOOTER_LEN {
            return Err(Error::io(
                format!(
                    "does not have sufficient data, len: {}, bytes: {:?}",
                    len, footer_bytes
                ),
                location!(),
            ));
        }
        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));

        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
        let num_columns = cursor.read_u32::<LittleEndian>()?;
        let major_version = cursor.read_u16::<LittleEndian>()?;
        let minor_version = cursor.read_u16::<LittleEndian>()?;

        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
            return Err(Error::version_conflict(
                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
                major_version,
                minor_version,
                location!(),
            ));
        }

        let magic_bytes = footer_bytes.slice(len - 4..);
        if magic_bytes.as_ref() != MAGIC {
            return Err(Error::io(
                format!(
                    "file does not appear to be a Lance file (invalid magic: {:?})",
                    MAGIC
                ),
                location!(),
            ));
        }
        Ok(Footer {
            column_meta_start,
            column_meta_offsets_start,
            global_buff_offsets_start,
            num_global_buffers,
            num_columns,
            major_version,
            minor_version,
        })
    }

    fn read_all_column_metadata(
        column_metadata_bytes: Bytes,
        footer: &Footer,
    ) -> Result<Vec<pbfile::ColumnMetadata>> {
        let column_metadata_start = footer.column_meta_start;
        let cmo_table_size = 16 * footer.num_columns as usize;
        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);

        (0..footer.num_columns)
            .map(|col_idx| {
                let offset = (col_idx * 16) as usize;
                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
                let normalized_position = (position - column_metadata_start) as usize;
                let normalized_end = normalized_position + (length as usize);
                Ok(pbfile::ColumnMetadata::decode(
                    &column_metadata_bytes[normalized_position..normalized_end],
                )?)
            })
            .collect::<Result<Vec<_>>>()
    }

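    /// Slice `start_pos..file_len` out of the already-fetched tail bytes when possible;
    /// otherwise issue one extra read for the missing prefix and stitch the two together.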
    async fn optimistic_tail_read(
        data: &Bytes,
        start_pos: u64,
        scheduler: &FileScheduler,
        file_len: u64,
    ) -> Result<Bytes> {
        let num_bytes_needed = (file_len - start_pos) as usize;
        if data.len() >= num_bytes_needed {
            Ok(data.slice((data.len() - num_bytes_needed)..))
        } else {
            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
            let start = file_len - num_bytes_needed as u64;
            let missing_bytes = scheduler
                .submit_single(start..start + num_bytes_missing, 0)
                .await?;
            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
            combined.extend(missing_bytes);
            combined.extend(data);
            Ok(combined.freeze())
        }
    }

    fn do_decode_gbo_table(
        gbo_bytes: &Bytes,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        let mut global_bufs_cursor = Cursor::new(gbo_bytes);

        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
        for _ in 0..footer.num_global_buffers {
            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
            assert!(
                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
            );
            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
            global_buffers.push(BufferDescriptor {
                position: buf_pos,
                size: buf_size,
            });
        }

        Ok(global_buffers)
    }

    async fn decode_gbo_table(
        tail_bytes: &Bytes,
        file_len: u64,
        scheduler: &FileScheduler,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        let gbo_bytes = Self::optimistic_tail_read(
            tail_bytes,
            footer.global_buff_offsets_start,
            scheduler,
            file_len,
        )
        .await?;
        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
    }

    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
        let pb_schema = file_descriptor.schema.unwrap();
        let num_rows = file_descriptor.length;
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(pb_schema.fields),
            metadata: pb_schema.metadata,
        };
        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
        Ok((num_rows, schema))
    }

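    /// Read all of the file metadata (footer, global buffer table, schema, and per-column
    /// metadata) in as few reads as possible.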
    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
        let footer = Self::decode_footer(&tail_bytes)?;

        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let gbo_table =
            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
        if gbo_table.is_empty() {
            return Err(Error::Internal {
                message: "File did not contain any global buffers, schema expected".to_string(),
                location: location!(),
            });
        }
        let schema_start = gbo_table[0].position;
        let schema_size = gbo_table[0].size;

        let num_footer_bytes = file_len - schema_start;

        let all_metadata_bytes =
            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;

        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;

        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
        let column_metadata_bytes =
            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;

        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);

        Ok(CachedFileMetadata {
            file_schema: Arc::new(schema),
            column_metadatas,
            column_infos,
            num_rows,
            num_data_bytes,
            num_column_metadata_bytes,
            num_global_buffer_bytes,
            num_footer_bytes,
            file_buffers: gbo_table,
            major_version: footer.major_version,
            minor_version: footer.minor_version,
        })
    }

    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
        match &encoding.location {
            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
            Some(pbfile::encoding::Location::Direct(encoding)) => {
                let encoding_buf = Bytes::from(encoding.encoding.clone());
                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
                encoding_any.to_msg::<M>().unwrap()
            }
            Some(pbfile::encoding::Location::None(_)) => panic!(),
            None => panic!(),
        }
    }

    fn meta_to_col_infos(
        column_metadatas: &[pbfile::ColumnMetadata],
        file_version: LanceFileVersion,
    ) -> Vec<Arc<ColumnInfo>> {
        column_metadatas
            .iter()
            .enumerate()
            .map(|(col_idx, col_meta)| {
                let page_infos = col_meta
                    .pages
                    .iter()
                    .map(|page| {
                        let num_rows = page.length;
                        let encoding = match file_version {
                            LanceFileVersion::V2_0 => {
                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
                                    page.encoding.as_ref().unwrap(),
                                ))
                            }
                            _ => {
                                PageEncoding::Structural(Self::fetch_encoding::<pbenc::PageLayout>(
                                    page.encoding.as_ref().unwrap(),
                                ))
                            }
                        };
                        let buffer_offsets_and_sizes = Arc::from(
                            page.buffer_offsets
                                .iter()
                                .zip(page.buffer_sizes.iter())
                                .map(|(offset, size)| {
                                    assert!(
                                        file_version < LanceFileVersion::V2_1
                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
                                    );
                                    (*offset, *size)
                                })
                                .collect::<Vec<_>>(),
                        );
                        PageInfo {
                            buffer_offsets_and_sizes,
                            encoding,
                            num_rows,
                            priority: page.priority,
                        }
                    })
                    .collect::<Vec<_>>();
                let buffer_offsets_and_sizes = Arc::from(
                    col_meta
                        .buffer_offsets
                        .iter()
                        .zip(col_meta.buffer_sizes.iter())
                        .map(|(offset, size)| (*offset, *size))
                        .collect::<Vec<_>>(),
                );
                Arc::new(ColumnInfo {
                    index: col_idx as u32,
                    page_infos: Arc::from(page_infos),
                    buffer_offsets_and_sizes,
                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
                })
            })
            .collect::<Vec<_>>()
    }

    fn validate_projection(
        projection: &ReaderProjection,
        metadata: &CachedFileMetadata,
    ) -> Result<()> {
        if projection.schema.fields.is_empty() {
            return Err(Error::invalid_input(
                "Attempt to read zero columns from the file, at least one column must be specified"
                    .to_string(),
                location!(),
            ));
        }
        let mut column_indices_seen = BTreeSet::new();
        for column_index in &projection.column_indices {
            if !column_indices_seen.insert(*column_index) {
                return Err(Error::invalid_input(
                    format!(
                        "The projection specified the column index {} more than once",
                        column_index
                    ),
                    location!(),
                ));
            }
            if *column_index >= metadata.column_infos.len() as u32 {
                return Err(Error::invalid_input(format!("The projection specified the column index {} but there are only {} columns in the file", column_index, metadata.column_infos.len()), location!()));
            }
        }
        Ok(())
    }

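    /// Open a file reader from a [`FileScheduler`], reading and caching all of the
    /// file's metadata up front.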
    pub async fn try_open(
        scheduler: FileScheduler,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        cache: &FileMetadataCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
        let path = scheduler.reader().path().clone();
        Self::try_open_with_file_metadata(
            Arc::new(LanceEncodingsIo(scheduler)),
            path,
            base_projection,
            decoder_plugins,
            file_metadata,
            cache,
            options,
        )
        .await
    }

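    /// Open a file reader when the file metadata has already been loaded (e.g. from a
    /// metadata cache), so the metadata does not need to be read again.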
    pub async fn try_open_with_file_metadata(
        scheduler: Arc<dyn EncodingsIo>,
        path: Path,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        file_metadata: Arc<CachedFileMetadata>,
        cache: &FileMetadataCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let cache = Arc::new(cache.with_base_path(path));

        if let Some(base_projection) = base_projection.as_ref() {
            Self::validate_projection(base_projection, &file_metadata)?;
        }
        let num_rows = file_metadata.num_rows;
        Ok(Self {
            scheduler,
            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
                file_metadata.file_schema.as_ref(),
                file_metadata.version(),
            )),
            num_rows,
            metadata: file_metadata,
            decoder_plugins,
            cache,
            options,
        })
    }

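    // Note: this currently returns the metadata for every column in the file; the
    // projection's column indices are used later to select which columns to schedule.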
    fn collect_columns_from_projection(
        &self,
        _projection: &ReaderProjection,
    ) -> Result<Vec<Arc<ColumnInfo>>> {
        Ok(self.metadata.column_infos.to_vec())
    }

    #[allow(clippy::too_many_arguments)]
    fn do_read_range(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<FileMetadataCache>,
        num_rows: u64,
        decoder_plugins: Arc<DecoderPlugins>,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        should_validate: bool,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            should_validate,
        };

        let requested_rows = RequestedRows::Ranges(vec![range]);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn read_range(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_read_range(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.num_rows,
            self.decoder_plugins.clone(),
            range,
            batch_size,
            projection,
            filter,
            self.options.validate_on_decode,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn do_take_rows(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<FileMetadataCache>,
        decoder_plugins: Arc<DecoderPlugins>,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        should_validate: bool,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            should_validate,
        };

        let requested_rows = RequestedRows::Indices(indices);

        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }

    fn take_rows(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_take_rows(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            indices,
            batch_size,
            projection,
            FilterExpression::no_filter(),
            self.options.validate_on_decode,
        )
    }

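    /// Create a stream of read tasks for the requested rows.  Each task decodes one
    /// batch of up to `batch_size` rows and the tasks can be awaited in parallel.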
    pub fn read_tasks(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || bound == self.num_rows && inclusive {
                Err(Error::invalid_input(
                    format!(
                        "cannot read {:?} from file with {} rows",
                        params, self.num_rows
                    ),
                    location!(),
                ))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input(
                                "Null value in indices array",
                                location!(),
                            ));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows(indices, batch_size, projection)
            }
            ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(
                    range.start as u64..range.end as u64,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
                self.read_range(
                    range.start as u64..self.num_rows,
                    batch_size,
                    projection,
                    filter,
                )
            }
            ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
                self.read_range(0..range.end as u64, batch_size, projection, filter)
            }
            ReadBatchParams::RangeFull => {
                self.read_range(0..self.num_rows, batch_size, projection, filter)
            }
        }
    }

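    /// Read the requested rows as a [`RecordBatchStream`], decoding only the columns in
    /// `projection` and keeping up to `batch_readahead` batches in flight.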
    pub fn read_stream_projected(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        batch_readahead: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
        let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
        let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
        let batch_stream = tasks_stream
            .map(|task| task.task)
            .buffered(batch_readahead as usize)
            .boxed();
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            arrow_schema,
            batch_stream,
        )))
    }

    fn take_rows_blocking(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        debug!(
            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
            indices.len(),
            indices[0],
            indices[indices.len() - 1],
            batch_size,
            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
        );

        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            should_validate: self.options.validate_on_decode,
        };

        let requested_rows = RequestedRows::Indices(indices);

        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }

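    /// Blocking variant of the projected read, returning a [`RecordBatchReader`].
    /// Currently only `ReadBatchParams::Indices` is supported.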
    pub fn read_stream_projected_blocking(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        projection: Option<ReaderProjection>,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader>> {
        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
        Self::validate_projection(&projection, &self.metadata)?;
        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
            if bound > self.num_rows || bound == self.num_rows && inclusive {
                Err(Error::invalid_input(
                    format!(
                        "cannot read {:?} from file with {} rows",
                        params, self.num_rows
                    ),
                    location!(),
                ))
            } else {
                Ok(())
            }
        };
        match &params {
            ReadBatchParams::Indices(indices) => {
                for idx in indices {
                    match idx {
                        None => {
                            return Err(Error::invalid_input(
                                "Null value in indices array",
                                location!(),
                            ));
                        }
                        Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
                        }
                    }
                }
                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
                self.take_rows_blocking(indices, batch_size, projection, filter)
            }
            _ => todo!(),
        }
    }

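    /// Read the requested rows using the reader's base projection (all columns unless a
    /// projection was supplied when the file was opened).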
    pub fn read_stream(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        batch_readahead: u32,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
        self.read_stream_projected(
            params,
            batch_size,
            batch_readahead,
            self.base_projection.clone(),
            filter,
        )
    }

    pub fn schema(&self) -> &Arc<Schema> {
        &self.metadata.file_schema
    }
}

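/// Return a human-readable description of a page's encoding, primarily useful for
/// debugging and file-inspection tools.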
pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
    if let Some(encoding) = &page.encoding {
        if let Some(style) = &encoding.location {
            match style {
                pbfile::encoding::Location::Indirect(indirect) => {
                    format!(
                        "IndirectEncoding(pos={},size={})",
                        indirect.buffer_location, indirect.buffer_length
                    )
                }
                pbfile::encoding::Location::Direct(direct) => {
                    let encoding_any =
                        prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
                            .expect("failed to deserialize encoding as protobuf");
                    if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
                        let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
                        match encoding {
                            Ok(encoding) => {
                                format!("{:#?}", encoding)
                            }
                            Err(err) => {
                                format!("Unsupported(decode_err={})", err)
                            }
                        }
                    } else if encoding_any.type_url == "/lance.encodings.PageLayout" {
                        let encoding = encoding_any.to_msg::<pbenc::PageLayout>();
                        match encoding {
                            Ok(encoding) => {
                                format!("{:#?}", encoding)
                            }
                            Err(err) => {
                                format!("Unsupported(decode_err={})", err)
                            }
                        }
                    } else {
                        format!("Unrecognized(type_url={})", encoding_any.type_url)
                    }
                }
                pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
            }
        } else {
            "MISSING STYLE".to_string()
        }
    } else {
        "MISSING".to_string()
    }
}

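/// Extension trait for parsing an [`EncodedBatch`] back out of serialized Lance bytes,
/// either "mini" (no embedded schema, the caller supplies it) or self-described (the
/// schema is stored in the file's first global buffer).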
pub trait EncodedBatchReaderExt {
    fn try_from_mini_lance(
        bytes: Bytes,
        schema: &Schema,
        version: LanceFileVersion,
    ) -> Result<Self>
    where
        Self: Sized;
    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
    where
        Self: Sized;
}

impl EncodedBatchReaderExt for EncodedBatch {
    fn try_from_mini_lance(
        bytes: Bytes,
        schema: &Schema,
        file_version: LanceFileVersion,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let projection = ReaderProjection::from_whole_schema(schema, file_version);
        let footer = FileReader::decode_footer(&bytes)?;

        let column_metadata_start = footer.column_meta_start as usize;
        let column_metadata_end = footer.global_buff_offsets_start as usize;
        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas =
            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);

        Ok(Self {
            data: bytes,
            num_rows: page_table
                .first()
                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
                .unwrap_or(0),
            page_table,
            top_level_columns: projection.column_indices,
            schema: Arc::new(schema.clone()),
        })
    }

    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
    where
        Self: Sized,
    {
        let footer = FileReader::decode_footer(&bytes)?;
        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;

        let gbo_table = FileReader::do_decode_gbo_table(
            &bytes.slice(footer.global_buff_offsets_start as usize..),
            &footer,
            file_version,
        )?;
        if gbo_table.is_empty() {
            return Err(Error::Internal {
                message: "File did not contain any global buffers, schema expected".to_string(),
                location: location!(),
            });
        }
        let schema_start = gbo_table[0].position as usize;
        let schema_size = gbo_table[0].size as usize;

        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
        let projection = ReaderProjection::from_whole_schema(&schema, file_version);

        let column_metadata_start = footer.column_meta_start as usize;
        let column_metadata_end = footer.global_buff_offsets_start as usize;
        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas =
            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;

        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);

        Ok(Self {
            data: bytes,
            num_rows: page_table
                .first()
                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
                .unwrap_or(0),
            page_table,
            top_level_columns: projection.column_indices,
            schema: Arc::new(schema),
        })
    }
}

#[cfg(test)]
pub mod tests {
    use std::{collections::BTreeMap, pin::Pin, sync::Arc};

    use arrow_array::{
        types::{Float64Type, Int32Type},
        RecordBatch, UInt32Array,
    };
    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
    use bytes::Bytes;
    use futures::{prelude::stream::TryStreamExt, StreamExt};
    use lance_arrow::RecordBatchExt;
    use lance_core::{datatypes::Schema, ArrowResult};
    use lance_datagen::{array, gen, BatchCount, ByteCount, RowCount};
    use lance_encoding::{
        decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
        encoder::{encode_batch, CoreFieldEncodingStrategy, EncodedBatch, EncodingOptions},
        version::LanceFileVersion,
    };
    use lance_io::stream::RecordBatchStream;
    use log::debug;
    use tokio::sync::mpsc;

    use crate::v2::{
        reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection},
        testing::{test_cache, write_lance_file, FsFixture, WrittenFile},
        writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions},
    };

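    // Writes a small multi-column test file (score, location, categories, binary, and
    // large_bin for 2.0 files) using lance_datagen and returns the written batches.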
    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
        let location_type = DataType::Struct(Fields::from(vec![
            Field::new("x", DataType::Float64, true),
            Field::new("y", DataType::Float64, true),
        ]));
        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));

        let mut reader = gen()
            .col("score", array::rand::<Float64Type>())
            .col("location", array::rand_type(&location_type))
            .col("categories", array::rand_type(&categories_type))
            .col("binary", array::rand_type(&DataType::Binary));
        if version <= LanceFileVersion::V2_0 {
            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
        }
        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));

        write_lance_file(
            reader,
            fs,
            FileWriterOptions {
                format_version: Some(version),
                ..Default::default()
            },
        )
        .await
    }

    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;

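    // Drains the stream and checks that the concatenation of the decoded batches matches
    // the expected batches, slicing as needed so that read_size need not align with the
    // original batch boundaries.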
    async fn verify_expected(
        expected: &[RecordBatch],
        mut actual: Pin<Box<dyn RecordBatchStream>>,
        read_size: u32,
        transform: Option<Transformer>,
    ) {
        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
        let mut expected_iter = expected.iter().map(|batch| {
            if let Some(transform) = &transform {
                transform(batch)
            } else {
                batch.clone()
            }
        });
        let mut next_expected = expected_iter.next().unwrap().clone();
        while let Some(actual) = actual.next().await {
            let mut actual = actual.unwrap();
            let mut rows_to_verify = actual.num_rows() as u32;
            let expected_length = remaining.min(read_size);
            assert_eq!(expected_length, rows_to_verify);

            while rows_to_verify > 0 {
                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
                assert_eq!(
                    next_expected.slice(0, next_slice_len as usize),
                    actual.slice(0, next_slice_len as usize)
                );
                remaining -= next_slice_len;
                rows_to_verify -= next_slice_len;
                if remaining > 0 {
                    if next_slice_len == next_expected.num_rows() as u32 {
                        next_expected = expected_iter.next().unwrap().clone();
                    } else {
                        next_expected = next_expected.slice(
                            next_slice_len as usize,
                            next_expected.num_rows() - next_slice_len as usize,
                        );
                    }
                }
                if rows_to_verify > 0 {
                    actual = actual.slice(
                        next_slice_len as usize,
                        actual.num_rows() - next_slice_len as usize,
                    );
                }
            }
        }
        assert_eq!(remaining, 0);
    }

    #[tokio::test]
    async fn test_round_trip() {
        let fs = FsFixture::default();

        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;

        for read_size in [32, 1024, 1024 * 1024] {
            let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
            let file_reader = FileReader::try_open(
                file_scheduler,
                None,
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .unwrap();

            let schema = file_reader.schema();
            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");

            let batch_stream = file_reader
                .read_stream(
                    lance_io::ReadBatchParams::RangeFull,
                    read_size,
                    16,
                    FilterExpression::no_filter(),
                )
                .unwrap();

            verify_expected(&data, batch_stream, read_size, None).await;
        }
    }

    #[test_log::test(tokio::test)]
    async fn test_encoded_batch_round_trip() {
        let data = gen()
            .col("x", array::rand::<Int32Type>())
            .col("y", array::rand_utf8(ByteCount::from(16), false))
            .into_batch_rows(RowCount::from(10000))
            .unwrap();

        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());

        let encoding_options = EncodingOptions {
            cache_bytes_per_column: 4096,
            max_page_bytes: 32 * 1024 * 1024,
            keep_original_array: true,
            buffer_alignment: 64,
        };
        let encoded_batch = encode_batch(
            &data,
            lance_schema.clone(),
            &CoreFieldEncodingStrategy::default(),
            &encoding_options,
        )
        .await
        .unwrap();

        let bytes = encoded_batch.try_to_self_described_lance().unwrap();

        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();

        let decoded = decode_batch(
            &decoded_batch,
            &FilterExpression::no_filter(),
            Arc::<DecoderPlugins>::default(),
            false,
            LanceFileVersion::default(),
            None,
        )
        .await
        .unwrap();

        assert_eq!(data, decoded);

        let bytes = encoded_batch.try_to_mini_lance().unwrap();
        let decoded_batch =
            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
                .unwrap();
        let decoded = decode_batch(
            &decoded_batch,
            &FilterExpression::no_filter(),
            Arc::<DecoderPlugins>::default(),
            false,
            LanceFileVersion::default(),
            None,
        )
        .await
        .unwrap();

        assert_eq!(data, decoded);
    }

    #[test_log::test(tokio::test)]
    async fn test_projection() {
        let fs = FsFixture::default();

        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();

        let field_id_mapping = written_file
            .field_id_mapping
            .iter()
            .copied()
            .collect::<BTreeMap<_, _>>();

        let empty_projection = ReaderProjection {
            column_indices: Vec::default(),
            schema: Arc::new(Schema::default()),
        };

        for columns in [
            vec!["score"],
            vec!["location"],
            vec!["categories"],
            vec!["score.x"],
            vec!["score", "categories"],
            vec!["score", "location"],
            vec!["location", "categories"],
            vec!["score.y", "location", "categories"],
        ] {
            debug!("Testing round trip with projection {:?}", columns);
            let file_reader = FileReader::try_open(
                file_scheduler.clone(),
                None,
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .unwrap();

            let projected_schema = written_file.schema.project(&columns).unwrap();
            let projection = ReaderProjection::from_field_ids(
                &file_reader,
                &projected_schema,
                &field_id_mapping,
            )
            .unwrap();

            let batch_stream = file_reader
                .read_stream_projected(
                    lance_io::ReadBatchParams::RangeFull,
                    1024,
                    16,
                    projection.clone(),
                    FilterExpression::no_filter(),
                )
                .unwrap();

            let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
            verify_expected(
                &written_file.data,
                batch_stream,
                1024,
                Some(Box::new(move |batch: &RecordBatch| {
                    batch.project_by_schema(&projection_arrow).unwrap()
                })),
            )
            .await;

            let file_reader = FileReader::try_open(
                file_scheduler.clone(),
                Some(projection.clone()),
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .unwrap();

            let batch_stream = file_reader
                .read_stream(
                    lance_io::ReadBatchParams::RangeFull,
                    1024,
                    16,
                    FilterExpression::no_filter(),
                )
                .unwrap();

            let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
            verify_expected(
                &written_file.data,
                batch_stream,
                1024,
                Some(Box::new(move |batch: &RecordBatch| {
                    batch.project_by_schema(&projection_arrow).unwrap()
                })),
            )
            .await;

            assert!(file_reader
                .read_stream_projected(
                    lance_io::ReadBatchParams::RangeFull,
                    1024,
                    16,
                    empty_projection.clone(),
                    FilterExpression::no_filter(),
                )
                .is_err());
        }

        assert!(FileReader::try_open(
            file_scheduler.clone(),
            Some(empty_projection),
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .is_err());

        let arrow_schema = ArrowSchema::new(vec![
            Field::new("x", DataType::Int32, true),
            Field::new("y", DataType::Int32, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let projection_with_dupes = ReaderProjection {
            column_indices: vec![0, 0],
            schema: Arc::new(schema),
        };

        assert!(FileReader::try_open(
            file_scheduler.clone(),
            Some(projection_with_dupes),
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .is_err());
    }

    #[test_log::test(tokio::test)]
    async fn test_compressing_buffer() {
        let fs = FsFixture::default();

        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();

        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let mut projection = written_file.schema.project(&["score"]).unwrap();
        for field in projection.fields.iter_mut() {
            field
                .metadata
                .insert("lance:compression".to_string(), "zstd".to_string());
        }
        let projection = ReaderProjection {
            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
            schema: Arc::new(projection),
        };

        let batch_stream = file_reader
            .read_stream_projected(
                lance_io::ReadBatchParams::RangeFull,
                1024,
                16,
                projection.clone(),
                FilterExpression::no_filter(),
            )
            .unwrap();

        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
        verify_expected(
            &written_file.data,
            batch_stream,
            1024,
            Some(Box::new(move |batch: &RecordBatch| {
                batch.project_by_schema(&projection_arrow).unwrap()
            })),
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_all() {
        let fs = FsFixture::default();
        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();

        let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::RangeFull,
                total_rows as u32,
                16,
                FilterExpression::no_filter(),
            )
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), total_rows);
    }

    #[tokio::test]
    async fn test_blocking_take() {
        let fs = FsFixture::default();
        let WrittenFile { data, schema, .. } = create_some_file(&fs, LanceFileVersion::V2_1).await;
        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();

        let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            Some(ReaderProjection::from_column_names(&schema, &["score"]).unwrap()),
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let batches = tokio::task::spawn_blocking(move || {
            file_reader
                .read_stream_projected_blocking(
                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
                    total_rows as u32,
                    None,
                    FilterExpression::no_filter(),
                )
                .unwrap()
                .collect::<ArrowResult<Vec<_>>>()
                .unwrap()
        })
        .await
        .unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 5);
        assert_eq!(batches[0].num_columns(), 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_drop_in_progress() {
        let fs = FsFixture::default();
        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();

        let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let mut batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::RangeFull,
                (total_rows / 10) as u32,
                16,
                FilterExpression::no_filter(),
            )
            .unwrap();

        drop(file_reader);

        let batch = batches.next().await.unwrap().unwrap();
        assert!(batch.num_rows() > 0);

        drop(batches);
    }

    #[tokio::test]
    async fn drop_while_scheduling() {
        let fs = FsFixture::default();
        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = written_file
            .data
            .iter()
            .map(|batch| batch.num_rows())
            .sum::<usize>();

        let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let projection =
            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
        let column_infos = file_reader
            .collect_columns_from_projection(&projection)
            .unwrap();
        let mut decode_scheduler = DecodeBatchScheduler::try_new(
            &projection.schema,
            &projection.column_indices,
            &column_infos,
            &vec![],
            total_rows as u64,
            Arc::<DecoderPlugins>::default(),
            file_reader.scheduler.clone(),
            test_cache(),
            &FilterExpression::no_filter(),
        )
        .await
        .unwrap();

        let range = 0..total_rows as u64;

        let (tx, rx) = mpsc::unbounded_channel();

        drop(rx);

        decode_scheduler.schedule_range(
            range,
            &FilterExpression::no_filter(),
            tx,
            file_reader.scheduler.clone(),
        )
    }

    #[tokio::test]
    async fn test_global_buffers() {
        let fs = FsFixture::default();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
                "foo",
                DataType::Int32,
                true,
            )]))
            .unwrap();

        let mut file_writer = FileWriter::try_new(
            fs.object_store.create(&fs.tmp_path).await.unwrap(),
            lance_schema.clone(),
            FileWriterOptions::default(),
        )
        .unwrap();

        let test_bytes = Bytes::from_static(b"hello");

        let buf_index = file_writer
            .add_global_buffer(test_bytes.clone())
            .await
            .unwrap();

        assert_eq!(buf_index, 1);

        file_writer.finish().await.unwrap();

        let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let buf = file_reader.read_global_buffer(1).await.unwrap();
        assert_eq!(buf, test_bytes);
    }
}