lance_file/v2/reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    io::Cursor,
7    ops::Range,
8    pin::Pin,
9    sync::Arc,
10};
11
12use arrow_array::RecordBatchReader;
13use arrow_schema::Schema as ArrowSchema;
14use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
15use bytes::{Bytes, BytesMut};
16use deepsize::{Context, DeepSizeOf};
17use futures::{stream::BoxStream, Stream, StreamExt};
18use lance_encoding::{
19    decoder::{
20        schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderPlugins,
21        FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows,
22        SchedulerDecoderConfig,
23    },
24    encoder::EncodedBatch,
25    version::LanceFileVersion,
26    EncodingsIo,
27};
28use log::debug;
29use object_store::path::Path;
30use prost::{Message, Name};
31use snafu::location;
32
33use lance_core::{
34    cache::FileMetadataCache,
35    datatypes::{Field, Schema},
36    Error, Result,
37};
38use lance_encoding::format::pb as pbenc;
39use lance_io::{
40    scheduler::FileScheduler,
41    stream::{RecordBatchStream, RecordBatchStreamAdapter},
42    ReadBatchParams,
43};
44
45use crate::{
46    datatypes::{Fields, FieldsWithMeta},
47    format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION},
48    v2::writer::PAGE_BUFFER_ALIGNMENT,
49};
50
51use super::io::LanceEncodingsIo;
52
53// For now, we don't use global buffers for anything other than schema.  If we
54// use these later we should make them lazily loaded and then cached once loaded.
55//
56// We store their position / length for debugging purposes
57#[derive(Debug, DeepSizeOf)]
58pub struct BufferDescriptor {
59    pub position: u64,
60    pub size: u64,
61}
62
63/// Statistics summarize some of the file metadata for quick summary info
64#[derive(Debug)]
65pub struct FileStatistics {
66    /// Statistics about each of the columns in the file
67    pub columns: Vec<ColumnStatistics>,
68}
69
70/// Summary information describing a column
71#[derive(Debug)]
72pub struct ColumnStatistics {
73    /// The number of pages in the column
74    pub num_pages: usize,
75    /// The total number of data & metadata bytes in the column
76    ///
77    /// This is the compressed on-disk size
78    pub size_bytes: u64,
79}
80
81// TODO: Caching
82#[derive(Debug)]
83pub struct CachedFileMetadata {
84    /// The schema of the file
85    pub file_schema: Arc<Schema>,
86    /// The column metadatas
87    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
88    pub column_infos: Vec<Arc<ColumnInfo>>,
89    /// The number of rows in the file
90    pub num_rows: u64,
91    pub file_buffers: Vec<BufferDescriptor>,
92    /// The number of bytes contained in the data page section of the file
93    pub num_data_bytes: u64,
94    /// The number of bytes contained in the column metadata (not including buffers
95    /// referenced by the metadata)
96    pub num_column_metadata_bytes: u64,
97    /// The number of bytes contained in global buffers
98    pub num_global_buffer_bytes: u64,
99    /// The number of bytes contained in the CMO (column metadata offsets) and GBO (global buffer offsets) tables
100    pub num_footer_bytes: u64,
101    pub major_version: u16,
102    pub minor_version: u16,
103}
104
105impl DeepSizeOf for CachedFileMetadata {
106    // TODO: include size for `column_metadatas` and `column_infos`.
107    fn deep_size_of_children(&self, context: &mut Context) -> usize {
108        self.file_schema.deep_size_of_children(context)
109            + self
110                .file_buffers
111                .iter()
112                .map(|file_buffer| file_buffer.deep_size_of_children(context))
113                .sum::<usize>()
114    }
115}
116
117impl CachedFileMetadata {
118    pub fn version(&self) -> LanceFileVersion {
119        match (self.major_version, self.minor_version) {
120            (0, 3) => LanceFileVersion::V2_0,
121            (2, 1) => LanceFileVersion::V2_1,
122            _ => panic!(
123                "Unsupported version: {}.{}",
124                self.major_version, self.minor_version
125            ),
126        }
127    }
128}
129
130/// Selecting columns from a lance file requires specifying both the
131/// index of the column and the data type of the column
132///
133/// In part, this is because a column is not required to be read back as
134/// the exact type it was written with.  For example, a string column may be
135/// read as a string, large_string or string_view type.
136///
137/// A read will only succeed if the decoder for a column is capable
138/// of decoding into the requested type.
139///
140/// Note that this should generally be limited to different in-memory
141/// representations of the same semantic type.  An encoding could
142/// theoretically support "casting" (e.g. int to string,  etc.) but
143/// there is little advantage in doing so here.
144///
145/// Note: in order to specify a projection the user will need some way
146/// to figure out the column indices.  In the table format we do this
147/// using field IDs and keeping track of the field id->column index mapping.
148///
149/// If users are not using the table format then they will need to figure
150/// out some way to do this themselves.
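///
/// # Example
///
/// A minimal illustrative sketch (assuming `desired_schema` is in scope and the
/// column indices were computed out-of-band, as in the example documented on
/// `column_indices` below):
///
/// ```ignore
/// let projection = ReaderProjection {
///     schema: Arc::new(desired_schema),
///     column_indices: vec![1, 3, 4, 6, 7, 8],
/// };
/// ```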
151#[derive(Debug, Clone)]
152pub struct ReaderProjection {
153    /// The data types (schema) of the selected columns.  The names
154    /// of the schema are arbitrary and ignored.
155    pub schema: Arc<Schema>,
156    /// The indices of the columns to load.
157    ///
158    /// The mapping should be as follows:
159    ///
160    /// - Primitive: the index of the column in the schema
161    /// - List: the index of the list column in the schema
162    ///         followed by the column indices of the children
163    /// - FixedSizeList (of primitive): the index of the column in the schema
164    ///         (this case is not nested)
165    /// - FixedSizeList (of non-primitive): not yet implemented
166    /// - Dictionary: same as primitive
167    /// - Struct: the index of the struct column in the schema
168    ///          followed by the column indices of the children
169    ///
170    /// In other words, this should be a DFS listing of the desired schema.
171    ///
172    /// For example, if the goal is to load:
173    ///
174    ///   x: int32
175    ///   y: struct<z: int32, w: string>
176    ///   z: list<int32>
177    ///
178    /// and the schema originally used to store the data was:
179    ///
180    ///   a: struct<x: int32>
181    ///   b: int64
182    ///   y: struct<z: int32, c: int64, w: string>
183    ///   z: list<int32>
184    ///
185    /// Then the column_indices should be [1, 3, 4, 6, 7, 8]
186    pub column_indices: Vec<u32>,
187}
188
189impl ReaderProjection {
190    fn from_field_ids_helper<'a>(
191        reader: &FileReader,
192        fields: impl Iterator<Item = &'a Field>,
193        field_id_to_column_index: &BTreeMap<u32, u32>,
194        column_indices: &mut Vec<u32>,
195    ) -> Result<()> {
196        for field in fields {
197            let is_structural = reader.metadata.version() >= LanceFileVersion::V2_1;
198            // In the 2.0 system we needed ids for intermediate fields.  In 2.1+
199            // we only need ids for leaf fields.
200            if !is_structural || field.children.is_empty() {
201                if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
202                {
203                    column_indices.push(column_idx);
204                }
205            }
206            Self::from_field_ids_helper(
207                reader,
208                field.children.iter(),
209                field_id_to_column_index,
210                column_indices,
211            )?;
212        }
213        Ok(())
214    }
215
216    /// Creates a projection using a mapping from field IDs to column indices
217    ///
218    /// You can obtain such a mapping when the file is written using the
219    /// [`crate::v2::writer::FileWriter::field_id_to_column_indices`] method.
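    ///
    /// # Example
    ///
    /// A minimal sketch (assumes `reader`, `schema`, and a field id -> column index
    /// `mapping` captured at write time are in scope):
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_field_ids(&reader, &schema, &mapping)?;
    /// ```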
220    pub fn from_field_ids(
221        reader: &FileReader,
222        schema: &Schema,
223        field_id_to_column_index: &BTreeMap<u32, u32>,
224    ) -> Result<Self> {
225        let mut column_indices = Vec::new();
226        Self::from_field_ids_helper(
227            reader,
228            schema.fields.iter(),
229            field_id_to_column_index,
230            &mut column_indices,
231        )?;
232        Ok(Self {
233            schema: Arc::new(schema.clone()),
234            column_indices,
235        })
236    }
237
238    /// Creates a projection that reads the entire file
239    ///
240    /// If the schema provided is not the schema of the entire file then
241    /// the projection will be invalid and the read will fail.
242    /// If a field is a struct datatype with `packed` set to true in the field metadata,
243    /// the whole struct occupies a single column index.
244    /// To support nested `packed-struct` encoding, this method will need further adjustment.
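    ///
    /// # Example
    ///
    /// A minimal sketch, mirroring how the reader builds its default projection when
    /// none is supplied:
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_whole_schema(
    ///     file_metadata.file_schema.as_ref(),
    ///     file_metadata.version(),
    /// );
    /// ```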
245    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
246        let schema = Arc::new(schema.clone());
247        let is_structural = version >= LanceFileVersion::V2_1;
248        let mut column_indices = vec![];
249        let mut curr_column_idx = 0;
250        let mut packed_struct_fields_num = 0;
251        for field in schema.fields_pre_order() {
252            if packed_struct_fields_num > 0 {
253                packed_struct_fields_num -= 1;
254                continue;
255            }
256            if field.is_packed_struct() {
257                column_indices.push(curr_column_idx);
258                curr_column_idx += 1;
259                packed_struct_fields_num = field.children.len();
260            } else if field.children.is_empty() || !is_structural {
261                column_indices.push(curr_column_idx);
262                curr_column_idx += 1;
263            }
264        }
265        Self {
266            schema,
267            column_indices,
268        }
269    }
270
271    /// Creates a projection that reads the specified columns provided by name
272    ///
273    /// The syntax for column names is the same as [`lance_core::datatypes::Schema::project`]
274    ///
275    /// If the schema provided is not the schema of the entire file then
276    /// the projection will be invalid and the read will fail.
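    ///
    /// # Example
    ///
    /// A minimal sketch; the column names are illustrative:
    ///
    /// ```ignore
    /// let projection = ReaderProjection::from_column_names(&file_schema, &["score", "categories"])?;
    /// ```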
277    pub fn from_column_names(schema: &Schema, column_names: &[&str]) -> Result<Self> {
278        let field_id_to_column_index = schema
279            .fields_pre_order()
280            .enumerate()
281            .map(|(idx, field)| (field.id as u32, idx as u32))
282            .collect::<BTreeMap<_, _>>();
283        let projected = schema.project(column_names)?;
284        let column_indices = projected
285            .fields_pre_order()
286            .map(|f| field_id_to_column_index[&(f.id as u32)])
287            .collect::<Vec<_>>();
288        Ok(Self {
289            schema: Arc::new(projected),
290            column_indices,
291        })
292    }
293}
294
295#[derive(Debug, Default)]
296pub struct FileReaderOptions {
297    validate_on_decode: bool,
298}
299
300#[derive(Debug)]
301pub struct FileReader {
302    scheduler: Arc<dyn EncodingsIo>,
303    // The default projection to be applied to all reads
304    base_projection: ReaderProjection,
305    num_rows: u64,
306    metadata: Arc<CachedFileMetadata>,
307    decoder_plugins: Arc<DecoderPlugins>,
308    cache: Arc<FileMetadataCache>,
309    options: FileReaderOptions,
310}
311#[derive(Debug)]
312struct Footer {
313    #[allow(dead_code)]
314    column_meta_start: u64,
315    // We don't use this today because we always load metadata for every column
316    // and don't yet support "metadata projection"
317    #[allow(dead_code)]
318    column_meta_offsets_start: u64,
319    global_buff_offsets_start: u64,
320    num_global_buffers: u32,
321    num_columns: u32,
322    major_version: u16,
323    minor_version: u16,
324}
325
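// The footer occupies the last FOOTER_LEN bytes of the file.  Its layout, as read by
// `decode_footer` below, is:
//
//   u64: column_meta_start
//   u64: column_meta_offsets_start
//   u64: global_buff_offsets_start
//   u32: num_global_buffers
//   u32: num_columns
//   u16: major_version
//   u16: minor_version
//   4 bytes: magic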
326const FOOTER_LEN: usize = 40;
327
328impl FileReader {
329    pub fn num_rows(&self) -> u64 {
330        self.num_rows
331    }
332
333    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
334        &self.metadata
335    }
336
337    pub fn file_statistics(&self) -> FileStatistics {
338        let column_metadatas = &self.metadata().column_metadatas;
339
340        let column_stats = column_metadatas
341            .iter()
342            .map(|col_metadata| {
343                let num_pages = col_metadata.pages.len();
344                let size_bytes = col_metadata
345                    .pages
346                    .iter()
347                    .map(|page| page.buffer_sizes.iter().sum::<u64>())
348                    .sum::<u64>();
349                ColumnStatistics {
350                    num_pages,
351                    size_bytes,
352                }
353            })
354            .collect();
355
356        FileStatistics {
357            columns: column_stats,
358        }
359    }
360
361    pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
362        let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len()), location!()))?;
363        self.scheduler
364            .submit_single(
365                buffer_desc.position..buffer_desc.position + buffer_desc.size,
366                0,
367            )
368            .await
369    }
370
371    async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
372        let file_size = scheduler.reader().size().await? as u64;
373        let begin = if file_size < scheduler.reader().block_size() as u64 {
374            0
375        } else {
376            file_size - scheduler.reader().block_size() as u64
377        };
378        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
379        Ok((tail_bytes, file_size))
380    }
381
382    // Checks that the footer is well-formed and decodes it.  The offsets it contains
383    // are later used to locate the column metadata and global buffers.
384    fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
385        let len = footer_bytes.len();
386        if len < FOOTER_LEN {
387            return Err(Error::io(
388                format!(
389                    "does not have sufficient data, len: {}, bytes: {:?}",
390                    len, footer_bytes
391                ),
392                location!(),
393            ));
394        }
395        let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));
396
397        let column_meta_start = cursor.read_u64::<LittleEndian>()?;
398        let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
399        let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
400        let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
401        let num_columns = cursor.read_u32::<LittleEndian>()?;
402        let major_version = cursor.read_u16::<LittleEndian>()?;
403        let minor_version = cursor.read_u16::<LittleEndian>()?;
404
405        if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
406            return Err(Error::version_conflict(
407                "Attempt to use the lance v2 reader to read a legacy file".to_string(),
408                major_version,
409                minor_version,
410                location!(),
411            ));
412        }
413
414        let magic_bytes = footer_bytes.slice(len - 4..);
415        if magic_bytes.as_ref() != MAGIC {
416            return Err(Error::io(
417                format!(
418                    "file does not appear to be a Lance file (found magic: {:?}, expected: {:?})",
419                    magic_bytes, MAGIC
420                ),
421                location!(),
422            ));
423        }
424        Ok(Footer {
425            column_meta_start,
426            column_meta_offsets_start,
427            global_buff_offsets_start,
428            num_global_buffers,
429            num_columns,
430            major_version,
431            minor_version,
432        })
433    }
434
435    // TODO: Once we have coalesced I/O we should only read the column metadatas that we need
436    fn read_all_column_metadata(
437        column_metadata_bytes: Bytes,
438        footer: &Footer,
439    ) -> Result<Vec<pbfile::ColumnMetadata>> {
440        let column_metadata_start = footer.column_meta_start;
441        // cmo == column_metadata_offsets
442        let cmo_table_size = 16 * footer.num_columns as usize;
443        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
444
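        // Each entry in the CMO table is 16 bytes: a u64 absolute file position of the
        // column's metadata followed by a u64 length.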
445        (0..footer.num_columns)
446            .map(|col_idx| {
447                let offset = (col_idx * 16) as usize;
448                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
449                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
450                let normalized_position = (position - column_metadata_start) as usize;
451                let normalized_end = normalized_position + (length as usize);
452                Ok(pbfile::ColumnMetadata::decode(
453                    &column_metadata_bytes[normalized_position..normalized_end],
454                )?)
455            })
456            .collect::<Result<Vec<_>>>()
457    }
458
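    // Returns the last `file_len - start_pos` bytes of the file, reusing whatever is
    // already present in `data` (the previously fetched tail) and issuing at most one
    // additional read for any bytes that are missing.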
459    async fn optimistic_tail_read(
460        data: &Bytes,
461        start_pos: u64,
462        scheduler: &FileScheduler,
463        file_len: u64,
464    ) -> Result<Bytes> {
465        let num_bytes_needed = (file_len - start_pos) as usize;
466        if data.len() >= num_bytes_needed {
467            Ok(data.slice((data.len() - num_bytes_needed)..))
468        } else {
469            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
470            let start = file_len - num_bytes_needed as u64;
471            let missing_bytes = scheduler
472                .submit_single(start..start + num_bytes_missing, 0)
473                .await?;
474            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
475            combined.extend(missing_bytes);
476            combined.extend(data);
477            Ok(combined.freeze())
478        }
479    }
480
481    fn do_decode_gbo_table(
482        gbo_bytes: &Bytes,
483        footer: &Footer,
484        version: LanceFileVersion,
485    ) -> Result<Vec<BufferDescriptor>> {
486        let mut global_bufs_cursor = Cursor::new(gbo_bytes);
487
488        let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
489        for _ in 0..footer.num_global_buffers {
490            let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
491            assert!(
492                version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
493            );
494            let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
495            global_buffers.push(BufferDescriptor {
496                position: buf_pos,
497                size: buf_size,
498            });
499        }
500
501        Ok(global_buffers)
502    }
503
504    async fn decode_gbo_table(
505        tail_bytes: &Bytes,
506        file_len: u64,
507        scheduler: &FileScheduler,
508        footer: &Footer,
509        version: LanceFileVersion,
510    ) -> Result<Vec<BufferDescriptor>> {
511        // This could, in theory, trigger another IOP but the GBO table should never be large
512        // enough for that to happen
513        let gbo_bytes = Self::optimistic_tail_read(
514            tail_bytes,
515            footer.global_buff_offsets_start,
516            scheduler,
517            file_len,
518        )
519        .await?;
520        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
521    }
522
523    fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
524        let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
525        let pb_schema = file_descriptor.schema.unwrap();
526        let num_rows = file_descriptor.length;
527        let fields_with_meta = FieldsWithMeta {
528            fields: Fields(pb_schema.fields),
529            metadata: pb_schema.metadata,
530        };
531        let schema = lance_core::datatypes::Schema::from(fields_with_meta);
532        Ok((num_rows, schema))
533    }
534
535    // TODO: Support late projection.  Currently, if we want to perform a
536    // projected read of a file, we load all of the column metadata, and then
537    // only read the column data that is requested.  This is fine for most cases.
538    //
539    // However, if there are many columns then loading all of the column metadata
540    // may be expensive.  We should support a mode where we only load the column
541    // metadata for the columns that are requested (the file format supports this).
542    //
543    // The main challenge is that we either need to ignore the column metadata cache
544    // or have a more sophisticated cache that can cache per-column metadata.
545    //
546    // Also, if the number of columns is fairly small, it's faster to read them as a
547    // single IOP, but we can fix this through coalescing.
548    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
549        // 1. read the footer
550        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
551        let footer = Self::decode_footer(&tail_bytes)?;
552
553        let file_version = LanceFileVersion::try_from_major_minor(
554            footer.major_version as u32,
555            footer.minor_version as u32,
556        )?;
557
558        let gbo_table =
559            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
560        if gbo_table.is_empty() {
561            return Err(Error::Internal {
562                message: "File did not contain any global buffers, schema expected".to_string(),
563                location: location!(),
564            });
565        }
566        let schema_start = gbo_table[0].position;
567        let schema_size = gbo_table[0].size;
568
569        let num_footer_bytes = file_len - schema_start;
570
571        // By default we read all column metadatas.  We do NOT read the column metadata buffers
572        // at this point.  Those are only read for the columns we are actually loading.
573        let all_metadata_bytes =
574            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;
575
576        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
577        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;
578
579        // Next, read the metadata for the columns
580        // This is both the column metadata and the CMO table
581        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
582        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
583        let column_metadata_bytes =
584            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
585        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;
586
587        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
588        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
589        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;
590
591        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);
592
593        Ok(CachedFileMetadata {
594            file_schema: Arc::new(schema),
595            column_metadatas,
596            column_infos,
597            num_rows,
598            num_data_bytes,
599            num_column_metadata_bytes,
600            num_global_buffer_bytes,
601            num_footer_bytes,
602            file_buffers: gbo_table,
603            major_version: footer.major_version,
604            minor_version: footer.minor_version,
605        })
606    }
607
608    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
609        match &encoding.location {
610            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
611            Some(pbfile::encoding::Location::Direct(encoding)) => {
612                let encoding_buf = Bytes::from(encoding.encoding.clone());
613                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
614                encoding_any.to_msg::<M>().unwrap()
615            }
616            Some(pbfile::encoding::Location::None(_)) => panic!(),
617            None => panic!(),
618        }
619    }
620
621    fn meta_to_col_infos(
622        column_metadatas: &[pbfile::ColumnMetadata],
623        file_version: LanceFileVersion,
624    ) -> Vec<Arc<ColumnInfo>> {
625        column_metadatas
626            .iter()
627            .enumerate()
628            .map(|(col_idx, col_meta)| {
629                let page_infos = col_meta
630                    .pages
631                    .iter()
632                    .map(|page| {
633                        let num_rows = page.length;
634                        let encoding = match file_version {
635                            LanceFileVersion::V2_0 => {
636                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
637                                    page.encoding.as_ref().unwrap(),
638                                ))
639                            }
640                            _ => {
641                                PageEncoding::Structural(Self::fetch_encoding::<pbenc::PageLayout>(
642                                    page.encoding.as_ref().unwrap(),
643                                ))
644                            }
645                        };
646                        let buffer_offsets_and_sizes = Arc::from(
647                            page.buffer_offsets
648                                .iter()
649                                .zip(page.buffer_sizes.iter())
650                                .map(|(offset, size)| {
651                                    // Starting with version 2.1 we can assert that page buffers are aligned
652                                    assert!(
653                                        file_version < LanceFileVersion::V2_1
654                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
655                                    );
656                                    (*offset, *size)
657                                })
658                                .collect::<Vec<_>>(),
659                        );
660                        PageInfo {
661                            buffer_offsets_and_sizes,
662                            encoding,
663                            num_rows,
664                            priority: page.priority,
665                        }
666                    })
667                    .collect::<Vec<_>>();
668                let buffer_offsets_and_sizes = Arc::from(
669                    col_meta
670                        .buffer_offsets
671                        .iter()
672                        .zip(col_meta.buffer_sizes.iter())
673                        .map(|(offset, size)| (*offset, *size))
674                        .collect::<Vec<_>>(),
675                );
676                Arc::new(ColumnInfo {
677                    index: col_idx as u32,
678                    page_infos: Arc::from(page_infos),
679                    buffer_offsets_and_sizes,
680                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
681                })
682            })
683            .collect::<Vec<_>>()
684    }
685
686    fn validate_projection(
687        projection: &ReaderProjection,
688        metadata: &CachedFileMetadata,
689    ) -> Result<()> {
690        if projection.schema.fields.is_empty() {
691            return Err(Error::invalid_input(
692                "Attempt to read zero columns from the file, at least one column must be specified"
693                    .to_string(),
694                location!(),
695            ));
696        }
697        let mut column_indices_seen = BTreeSet::new();
698        for column_index in &projection.column_indices {
699            if !column_indices_seen.insert(*column_index) {
700                return Err(Error::invalid_input(
701                    format!(
702                        "The projection specified the column index {} more than once",
703                        column_index
704                    ),
705                    location!(),
706                ));
707            }
708            if *column_index >= metadata.column_infos.len() as u32 {
709                return Err(Error::invalid_input(format!("The projection specified the column index {} but there are only {} columns in the file", column_index, metadata.column_infos.len()), location!()));
710            }
711        }
712        Ok(())
713    }
714
715    /// Opens a new file reader without any pre-existing knowledge
716    ///
717    /// This will read the file schema from the file itself and thus requires a bit more I/O
718    ///
719    /// A `base_projection` can also be provided.  If provided, then the projection will apply
720    /// to all reads from the file that do not specify their own projection.
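    ///
    /// # Example
    ///
    /// A minimal sketch (assumes a `FileScheduler` and a metadata `cache` are in scope):
    ///
    /// ```ignore
    /// let reader = FileReader::try_open(
    ///     file_scheduler,
    ///     None,
    ///     Arc::<DecoderPlugins>::default(),
    ///     &cache,
    ///     FileReaderOptions::default(),
    /// )
    /// .await?;
    /// ```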
721    pub async fn try_open(
722        scheduler: FileScheduler,
723        base_projection: Option<ReaderProjection>,
724        decoder_plugins: Arc<DecoderPlugins>,
725        cache: &FileMetadataCache,
726        options: FileReaderOptions,
727    ) -> Result<Self> {
728        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
729        let path = scheduler.reader().path().clone();
730        Self::try_open_with_file_metadata(
731            Arc::new(LanceEncodingsIo(scheduler)),
732            path,
733            base_projection,
734            decoder_plugins,
735            file_metadata,
736            cache,
737            options,
738        )
739        .await
740    }
741
742    /// Same as `try_open` but with the file metadata already loaded.
743    ///
744    /// This method can also accept any kind of `EncodingsIo` implementation, allowing
745    /// custom strategies to be used for I/O scheduling (e.g. for takes on fast
746    /// disks it may be better to avoid asynchronous overhead).
747    pub async fn try_open_with_file_metadata(
748        scheduler: Arc<dyn EncodingsIo>,
749        path: Path,
750        base_projection: Option<ReaderProjection>,
751        decoder_plugins: Arc<DecoderPlugins>,
752        file_metadata: Arc<CachedFileMetadata>,
753        cache: &FileMetadataCache,
754        options: FileReaderOptions,
755    ) -> Result<Self> {
756        let cache = Arc::new(cache.with_base_path(path));
757
758        if let Some(base_projection) = base_projection.as_ref() {
759            Self::validate_projection(base_projection, &file_metadata)?;
760        }
761        let num_rows = file_metadata.num_rows;
762        Ok(Self {
763            scheduler,
764            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
765                file_metadata.file_schema.as_ref(),
766                file_metadata.version(),
767            )),
768            num_rows,
769            metadata: file_metadata,
770            decoder_plugins,
771            cache,
772            options,
773        })
774    }
775
776    // The actual decoder needs all the column infos that make up a type.  In other words, if
777    // the first type in the schema is Struct<i32, i32> then the decoder will need 3 column infos.
778    //
779    // This is a file reader concern because the file reader needs to support late projection of columns
780    // and so it will need to figure this out anyways.
781    //
782    // It's a bit of a tricky process though because the number of column infos may depend on the
783    // encoding.  Considering the above example, if we wrote it with a packed encoding, then there would
784    // only be a single column in the file (and not 3).
785    //
786    // At the moment this method works because our rules are simple and we just repeat them here.  See
787    // Self::default_projection for a similar problem.  In the future this is something the encodings
788    // registry will need to figure out.
789    fn collect_columns_from_projection(
790        &self,
791        _projection: &ReaderProjection,
792    ) -> Result<Vec<Arc<ColumnInfo>>> {
793        Ok(self.metadata.column_infos.to_vec())
794    }
795
796    #[allow(clippy::too_many_arguments)]
797    fn do_read_range(
798        column_infos: Vec<Arc<ColumnInfo>>,
799        io: Arc<dyn EncodingsIo>,
800        cache: Arc<FileMetadataCache>,
801        num_rows: u64,
802        decoder_plugins: Arc<DecoderPlugins>,
803        range: Range<u64>,
804        batch_size: u32,
805        projection: ReaderProjection,
806        filter: FilterExpression,
807        should_validate: bool,
808    ) -> Result<BoxStream<'static, ReadBatchTask>> {
809        debug!(
810            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
811            range,
812            batch_size,
813            num_rows,
814            column_infos.len(),
815            projection.schema.fields.len(),
816        );
817
818        let config = SchedulerDecoderConfig {
819            batch_size,
820            cache,
821            decoder_plugins,
822            io,
823            should_validate,
824        };
825
826        let requested_rows = RequestedRows::Ranges(vec![range]);
827
828        Ok(schedule_and_decode(
829            column_infos,
830            requested_rows,
831            filter,
832            projection.column_indices,
833            projection.schema,
834            config,
835        ))
836    }
837
838    fn read_range(
839        &self,
840        range: Range<u64>,
841        batch_size: u32,
842        projection: ReaderProjection,
843        filter: FilterExpression,
844    ) -> Result<BoxStream<'static, ReadBatchTask>> {
845        // Create and initialize the stream
846        Self::do_read_range(
847            self.collect_columns_from_projection(&projection)?,
848            self.scheduler.clone(),
849            self.cache.clone(),
850            self.num_rows,
851            self.decoder_plugins.clone(),
852            range,
853            batch_size,
854            projection,
855            filter,
856            self.options.validate_on_decode,
857        )
858    }
859
860    #[allow(clippy::too_many_arguments)]
861    fn do_take_rows(
862        column_infos: Vec<Arc<ColumnInfo>>,
863        io: Arc<dyn EncodingsIo>,
864        cache: Arc<FileMetadataCache>,
865        decoder_plugins: Arc<DecoderPlugins>,
866        indices: Vec<u64>,
867        batch_size: u32,
868        projection: ReaderProjection,
869        filter: FilterExpression,
870        should_validate: bool,
871    ) -> Result<BoxStream<'static, ReadBatchTask>> {
872        debug!(
873            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
874            indices.len(),
875            indices[0],
876            indices[indices.len() - 1],
877            batch_size,
878            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
879        );
880
881        let config = SchedulerDecoderConfig {
882            batch_size,
883            cache,
884            decoder_plugins,
885            io,
886            should_validate,
887        };
888
889        let requested_rows = RequestedRows::Indices(indices);
890
891        Ok(schedule_and_decode(
892            column_infos,
893            requested_rows,
894            filter,
895            projection.column_indices,
896            projection.schema,
897            config,
898        ))
899    }
900
901    fn take_rows(
902        &self,
903        indices: Vec<u64>,
904        batch_size: u32,
905        projection: ReaderProjection,
906    ) -> Result<BoxStream<'static, ReadBatchTask>> {
907        // Create and initialize the stream
908        Self::do_take_rows(
909            self.collect_columns_from_projection(&projection)?,
910            self.scheduler.clone(),
911            self.cache.clone(),
912            self.decoder_plugins.clone(),
913            indices,
914            batch_size,
915            projection,
916            FilterExpression::no_filter(),
917            self.options.validate_on_decode,
918        )
919    }
920
921    /// Creates a stream of "read tasks" to read the data from the file
922    ///
923    /// The arguments are similar to [`Self::read_stream_projected`] but instead of returning a stream
924    /// of record batches it returns a stream of "read tasks".
925    ///
926    /// The tasks should be consumed with some kind of `buffered` stream adapter if CPU parallelism is desired.
927    ///
928    /// Note that "read task" is probably a bit imprecise.  The tasks are actually "decode tasks".  The
929    /// reading happens asynchronously in the background.  In other words, a single read task may map to
930    /// multiple I/O operations or a single I/O operation may map to multiple read tasks.
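    ///
    /// # Example
    ///
    /// A minimal sketch of consuming the tasks with `buffered` (this mirrors what
    /// [`Self::read_stream_projected`] does internally):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let batches = reader
    ///     .read_tasks(
    ///         ReadBatchParams::RangeFull,
    ///         1024,
    ///         None,
    ///         FilterExpression::no_filter(),
    ///     )?
    ///     .map(|task| task.task)
    ///     .buffered(8);
    /// ```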
931    pub fn read_tasks(
932        &self,
933        params: ReadBatchParams,
934        batch_size: u32,
935        projection: Option<ReaderProjection>,
936        filter: FilterExpression,
937    ) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
938        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
939        Self::validate_projection(&projection, &self.metadata)?;
940        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
941            if bound > self.num_rows || bound == self.num_rows && inclusive {
942                Err(Error::invalid_input(
943                    format!(
944                        "cannot read {:?} from file with {} rows",
945                        params, self.num_rows
946                    ),
947                    location!(),
948                ))
949            } else {
950                Ok(())
951            }
952        };
953        match &params {
954            ReadBatchParams::Indices(indices) => {
955                for idx in indices {
956                    match idx {
957                        None => {
958                            return Err(Error::invalid_input(
959                                "Null value in indices array",
960                                location!(),
961                            ));
962                        }
963                        Some(idx) => {
964                            verify_bound(&params, idx as u64, true)?;
965                        }
966                    }
967                }
968                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
969                self.take_rows(indices, batch_size, projection)
970            }
971            ReadBatchParams::Range(range) => {
972                verify_bound(&params, range.end as u64, false)?;
973                self.read_range(
974                    range.start as u64..range.end as u64,
975                    batch_size,
976                    projection,
977                    filter,
978                )
979            }
980            ReadBatchParams::RangeFrom(range) => {
981                verify_bound(&params, range.start as u64, true)?;
982                self.read_range(
983                    range.start as u64..self.num_rows,
984                    batch_size,
985                    projection,
986                    filter,
987                )
988            }
989            ReadBatchParams::RangeTo(range) => {
990                verify_bound(&params, range.end as u64, false)?;
991                self.read_range(0..range.end as u64, batch_size, projection, filter)
992            }
993            ReadBatchParams::RangeFull => {
994                self.read_range(0..self.num_rows, batch_size, projection, filter)
995            }
996        }
997    }
998
999    /// Reads data from the file as a stream of record batches
1000    ///
1001    /// * `params` - Specifies the range (or indices) of data to read
1002    /// * `batch_size` - The maximum size of a single batch.  A batch may be smaller
1003    ///   if it is the last batch or if it is not possible to create a batch of the
1004    ///   requested size.
1005    ///
1006    ///   For example, if the batch size is 1024 and one of the columns is a string
1007    ///   column then there may be some ranges of 1024 rows that contain more than
1008    ///   2^31 bytes of string data (which is the maximum size of a string column
1009    ///   in Arrow).  In this case smaller batches may be emitted.
1010    /// * `batch_readahead` - The number of batches to read ahead.  This controls the
1011    ///   amount of CPU parallelism of the read.  In other words it controls how many
1012    ///   batches will be decoded in parallel.  It has no effect on the I/O parallelism
1013    ///   of the read (how many I/O requests are in flight at once).
1014    ///
1015    ///   This parameter is also related to backpressure.  If the consumer of the
1016    ///   stream is slow then the reader will accumulate decoded batches in RAM.
1017    /// * `projection` - A projection to apply to the read.  This controls which columns
1018    ///   are read from the file.  The projection is NOT applied on top of the base
1019    ///   projection.  The projection is applied directly to the file schema.
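    ///
    /// # Example
    ///
    /// A minimal sketch reading the first 1000 rows of a single (illustrative) column:
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let projection = ReaderProjection::from_column_names(reader.schema().as_ref(), &["score"])?;
    /// let mut stream = reader.read_stream_projected(
    ///     ReadBatchParams::Range(0..1000),
    ///     256,
    ///     16,
    ///     projection,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// while let Some(batch) = stream.next().await {
    ///     let batch = batch?;
    ///     // ... use `batch` ...
    /// }
    /// ```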
1020    pub fn read_stream_projected(
1021        &self,
1022        params: ReadBatchParams,
1023        batch_size: u32,
1024        batch_readahead: u32,
1025        projection: ReaderProjection,
1026        filter: FilterExpression,
1027    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1028        let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1029        let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
1030        let batch_stream = tasks_stream
1031            .map(|task| task.task)
1032            .buffered(batch_readahead as usize)
1033            .boxed();
1034        Ok(Box::pin(RecordBatchStreamAdapter::new(
1035            arrow_schema,
1036            batch_stream,
1037        )))
1038    }
1039
1040    fn take_rows_blocking(
1041        &self,
1042        indices: Vec<u64>,
1043        batch_size: u32,
1044        projection: ReaderProjection,
1045        filter: FilterExpression,
1046    ) -> Result<Box<dyn RecordBatchReader>> {
1047        let column_infos = self.collect_columns_from_projection(&projection)?;
1048        debug!(
1049            "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
1050            indices.len(),
1051            indices[0],
1052            indices[indices.len() - 1],
1053            batch_size,
1054            column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
1055        );
1056
1057        let config = SchedulerDecoderConfig {
1058            batch_size,
1059            cache: self.cache.clone(),
1060            decoder_plugins: self.decoder_plugins.clone(),
1061            io: self.scheduler.clone(),
1062            should_validate: self.options.validate_on_decode,
1063        };
1064
1065        let requested_rows = RequestedRows::Indices(indices);
1066
1067        schedule_and_decode_blocking(
1068            column_infos,
1069            requested_rows,
1070            filter,
1071            projection.column_indices,
1072            projection.schema,
1073            config,
1074        )
1075    }
1076
1077    /// Read data from the file as an iterator of record batches
1078    ///
1079    /// This is a blocking variant of [`Self::read_stream_projected`] that runs entirely in the
1080    /// calling thread.  It will block on I/O if the decode is faster than the I/O.  It is useful
1081    /// for benchmarking and potentially for "take"ing small batches from fast disks.
1082    ///
1083    /// Large scans of in-memory data will still benefit from threading (and should therefore not
1084    /// use this method) because we can parallelize the decode.
1085    ///
1086    /// Note: calling this from within a tokio runtime will panic.  It is acceptable to call this
1087    /// from a spawn_blocking context.
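    ///
    /// # Example
    ///
    /// A minimal sketch taking a handful of rows by index (e.g. from within
    /// `spawn_blocking`):
    ///
    /// ```ignore
    /// let indices = UInt32Array::from(vec![1_u32, 5, 10]);
    /// let batches = reader.read_stream_projected_blocking(
    ///     ReadBatchParams::Indices(indices),
    ///     1024,
    ///     None,
    ///     FilterExpression::no_filter(),
    /// )?;
    /// for batch in batches {
    ///     let batch = batch?;
    ///     // ... use `batch` ...
    /// }
    /// ```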
1088    pub fn read_stream_projected_blocking(
1089        &self,
1090        params: ReadBatchParams,
1091        batch_size: u32,
1092        projection: Option<ReaderProjection>,
1093        filter: FilterExpression,
1094    ) -> Result<Box<dyn RecordBatchReader>> {
1095        let projection = projection.unwrap_or_else(|| self.base_projection.clone());
1096        Self::validate_projection(&projection, &self.metadata)?;
1097        let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
1098            if bound > self.num_rows || bound == self.num_rows && inclusive {
1099                Err(Error::invalid_input(
1100                    format!(
1101                        "cannot read {:?} from file with {} rows",
1102                        params, self.num_rows
1103                    ),
1104                    location!(),
1105                ))
1106            } else {
1107                Ok(())
1108            }
1109        };
1110        match &params {
1111            ReadBatchParams::Indices(indices) => {
1112                for idx in indices {
1113                    match idx {
1114                        None => {
1115                            return Err(Error::invalid_input(
1116                                "Null value in indices array",
1117                                location!(),
1118                            ));
1119                        }
1120                        Some(idx) => {
1121                            verify_bound(&params, idx as u64, true)?;
1122                        }
1123                    }
1124                }
1125                let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
1126                self.take_rows_blocking(indices, batch_size, projection, filter)
1127            }
1128            _ => todo!(),
1129        }
1130    }
1131
1132    /// Reads data from the file as a stream of record batches
1133    ///
1134    /// This is similar to [`Self::read_stream_projected`] but uses the base projection
1135    /// provided when the file was opened (or reads all columns if the file was
1136    /// opened without a base projection)
1137    pub fn read_stream(
1138        &self,
1139        params: ReadBatchParams,
1140        batch_size: u32,
1141        batch_readahead: u32,
1142        filter: FilterExpression,
1143    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
1144        self.read_stream_projected(
1145            params,
1146            batch_size,
1147            batch_readahead,
1148            self.base_projection.clone(),
1149            filter,
1150        )
1151    }
1152
1153    pub fn schema(&self) -> &Arc<Schema> {
1154        &self.metadata.file_schema
1155    }
1156}
1157
1158/// Inspects a page and returns a String describing the page's encoding
1159pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
1160    if let Some(encoding) = &page.encoding {
1161        if let Some(style) = &encoding.location {
1162            match style {
1163                pbfile::encoding::Location::Indirect(indirect) => {
1164                    format!(
1165                        "IndirectEncoding(pos={},size={})",
1166                        indirect.buffer_location, indirect.buffer_length
1167                    )
1168                }
1169                pbfile::encoding::Location::Direct(direct) => {
1170                    let encoding_any =
1171                        prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
1172                            .expect("failed to deserialize encoding as protobuf");
1173                    if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
1174                        let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
1175                        match encoding {
1176                            Ok(encoding) => {
1177                                format!("{:#?}", encoding)
1178                            }
1179                            Err(err) => {
1180                                format!("Unsupported(decode_err={})", err)
1181                            }
1182                        }
1183                    } else if encoding_any.type_url == "/lance.encodings.PageLayout" {
1184                        let encoding = encoding_any.to_msg::<pbenc::PageLayout>();
1185                        match encoding {
1186                            Ok(encoding) => {
1187                                format!("{:#?}", encoding)
1188                            }
1189                            Err(err) => {
1190                                format!("Unsupported(decode_err={})", err)
1191                            }
1192                        }
1193                    } else {
1194                        format!("Unrecognized(type_url={})", encoding_any.type_url)
1195                    }
1196                }
1197                pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
1198            }
1199        } else {
1200            "MISSING STYLE".to_string()
1201        }
1202    } else {
1203        "MISSING".to_string()
1204    }
1205}
1206
1207pub trait EncodedBatchReaderExt {
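    /// Parses an [`EncodedBatch`] from a "mini lance" buffer: the data, column
    /// metadata, and footer are present but the schema is not embedded and must be
    /// supplied by the caller.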
1208    fn try_from_mini_lance(
1209        bytes: Bytes,
1210        schema: &Schema,
1211        version: LanceFileVersion,
1212    ) -> Result<Self>
1213    where
1214        Self: Sized;
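    /// Parses an [`EncodedBatch`] from a buffer that also embeds the schema as the
    /// first global buffer (the same layout a full Lance file uses).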
1215    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1216    where
1217        Self: Sized;
1218}
1219
1220impl EncodedBatchReaderExt for EncodedBatch {
1221    fn try_from_mini_lance(
1222        bytes: Bytes,
1223        schema: &Schema,
1224        file_version: LanceFileVersion,
1225    ) -> Result<Self>
1226    where
1227        Self: Sized,
1228    {
1229        let projection = ReaderProjection::from_whole_schema(schema, file_version);
1230        let footer = FileReader::decode_footer(&bytes)?;
1231
1232        // Next, read the metadata for the columns
1233        // This is both the column metadata and the CMO table
1234        let column_metadata_start = footer.column_meta_start as usize;
1235        let column_metadata_end = footer.global_buff_offsets_start as usize;
1236        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1237        let column_metadatas =
1238            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1239
1240        let file_version = LanceFileVersion::try_from_major_minor(
1241            footer.major_version as u32,
1242            footer.minor_version as u32,
1243        )?;
1244
1245        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1246
1247        Ok(Self {
1248            data: bytes,
1249            num_rows: page_table
1250                .first()
1251                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1252                .unwrap_or(0),
1253            page_table,
1254            top_level_columns: projection.column_indices,
1255            schema: Arc::new(schema.clone()),
1256        })
1257    }
1258
1259    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
1260    where
1261        Self: Sized,
1262    {
1263        let footer = FileReader::decode_footer(&bytes)?;
1264        let file_version = LanceFileVersion::try_from_major_minor(
1265            footer.major_version as u32,
1266            footer.minor_version as u32,
1267        )?;
1268
1269        let gbo_table = FileReader::do_decode_gbo_table(
1270            &bytes.slice(footer.global_buff_offsets_start as usize..),
1271            &footer,
1272            file_version,
1273        )?;
1274        if gbo_table.is_empty() {
1275            return Err(Error::Internal {
1276                message: "File did not contain any global buffers, schema expected".to_string(),
1277                location: location!(),
1278            });
1279        }
1280        let schema_start = gbo_table[0].position as usize;
1281        let schema_size = gbo_table[0].size as usize;
1282
1283        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
1284        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
1285        let projection = ReaderProjection::from_whole_schema(&schema, file_version);
1286
1287        // Next, read the metadata for the columns
1288        // This is both the column metadata and the CMO table
1289        let column_metadata_start = footer.column_meta_start as usize;
1290        let column_metadata_end = footer.global_buff_offsets_start as usize;
1291        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
1292        let column_metadatas =
1293            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
1294
1295        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
1296
1297        Ok(Self {
1298            data: bytes,
1299            num_rows: page_table
1300                .first()
1301                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
1302                .unwrap_or(0),
1303            page_table,
1304            top_level_columns: projection.column_indices,
1305            schema: Arc::new(schema),
1306        })
1307    }
1308}
1309
1310#[cfg(test)]
1311pub mod tests {
1312    use std::{collections::BTreeMap, pin::Pin, sync::Arc};
1313
1314    use arrow_array::{
1315        types::{Float64Type, Int32Type},
1316        RecordBatch, UInt32Array,
1317    };
1318    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
1319    use bytes::Bytes;
1320    use futures::{prelude::stream::TryStreamExt, StreamExt};
1321    use lance_arrow::RecordBatchExt;
1322    use lance_core::{datatypes::Schema, ArrowResult};
1323    use lance_datagen::{array, gen, BatchCount, ByteCount, RowCount};
1324    use lance_encoding::{
1325        decoder::{decode_batch, DecodeBatchScheduler, DecoderPlugins, FilterExpression},
1326        encoder::{encode_batch, CoreFieldEncodingStrategy, EncodedBatch, EncodingOptions},
1327        version::LanceFileVersion,
1328    };
1329    use lance_io::stream::RecordBatchStream;
1330    use log::debug;
1331    use tokio::sync::mpsc;
1332
1333    use crate::v2::{
1334        reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection},
1335        testing::{test_cache, write_lance_file, FsFixture, WrittenFile},
1336        writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions},
1337    };
1338
1339    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
1340        let location_type = DataType::Struct(Fields::from(vec![
1341            Field::new("x", DataType::Float64, true),
1342            Field::new("y", DataType::Float64, true),
1343        ]));
1344        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
1345
1346        let mut reader = gen()
1347            .col("score", array::rand::<Float64Type>())
1348            .col("location", array::rand_type(&location_type))
1349            .col("categories", array::rand_type(&categories_type))
1350            .col("binary", array::rand_type(&DataType::Binary));
1351        if version <= LanceFileVersion::V2_0 {
1352            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
1353        }
1354        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
1355
1356        write_lance_file(
1357            reader,
1358            fs,
1359            FileWriterOptions {
1360                format_version: Some(version),
1361                ..Default::default()
1362            },
1363        )
1364        .await
1365    }
1366
1367    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
1368
1369    async fn verify_expected(
1370        expected: &[RecordBatch],
1371        mut actual: Pin<Box<dyn RecordBatchStream>>,
1372        read_size: u32,
1373        transform: Option<Transformer>,
1374    ) {
1375        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
1376        let mut expected_iter = expected.iter().map(|batch| {
1377            if let Some(transform) = &transform {
1378                transform(batch)
1379            } else {
1380                batch.clone()
1381            }
1382        });
1383        let mut next_expected = expected_iter.next().unwrap().clone();
1384        while let Some(actual) = actual.next().await {
1385            let mut actual = actual.unwrap();
1386            let mut rows_to_verify = actual.num_rows() as u32;
1387            let expected_length = remaining.min(read_size);
1388            assert_eq!(expected_length, rows_to_verify);
1389
1390            while rows_to_verify > 0 {
1391                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
1392                assert_eq!(
1393                    next_expected.slice(0, next_slice_len as usize),
1394                    actual.slice(0, next_slice_len as usize)
1395                );
1396                remaining -= next_slice_len;
1397                rows_to_verify -= next_slice_len;
1398                if remaining > 0 {
1399                    if next_slice_len == next_expected.num_rows() as u32 {
1400                        next_expected = expected_iter.next().unwrap().clone();
1401                    } else {
1402                        next_expected = next_expected.slice(
1403                            next_slice_len as usize,
1404                            next_expected.num_rows() - next_slice_len as usize,
1405                        );
1406                    }
1407                }
1408                if rows_to_verify > 0 {
1409                    actual = actual.slice(
1410                        next_slice_len as usize,
1411                        actual.num_rows() - next_slice_len as usize,
1412                    );
1413                }
1414            }
1415        }
1416        assert_eq!(remaining, 0);
1417    }
1418
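    /// Full-file read at several batch sizes; verifies that both the data and the
    /// schema metadata survive the round trip.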
1419    #[tokio::test]
1420    async fn test_round_trip() {
1421        let fs = FsFixture::default();
1422
1423        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1424
1425        for read_size in [32, 1024, 1024 * 1024] {
1426            let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
1427            let file_reader = FileReader::try_open(
1428                file_scheduler,
1429                None,
1430                Arc::<DecoderPlugins>::default(),
1431                &test_cache(),
1432                FileReaderOptions::default(),
1433            )
1434            .await
1435            .unwrap();
1436
1437            let schema = file_reader.schema();
1438            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
1439
1440            let batch_stream = file_reader
1441                .read_stream(
1442                    lance_io::ReadBatchParams::RangeFull,
1443                    read_size,
1444                    16,
1445                    FilterExpression::no_filter(),
1446                )
1447                .unwrap();
1448
1449            verify_expected(&data, batch_stream, read_size, None).await;
1450        }
1451    }
1452
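    /// Round trips a batch through the in-memory encoded representation, both as a
    /// self-described buffer and as a mini buffer where the schema is supplied by
    /// the caller.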
1453    #[test_log::test(tokio::test)]
1454    async fn test_encoded_batch_round_trip() {
1455        let data = gen()
1456            .col("x", array::rand::<Int32Type>())
1457            .col("y", array::rand_utf8(ByteCount::from(16), false))
1458            .into_batch_rows(RowCount::from(10000))
1459            .unwrap();
1460
1461        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
1462
1463        let encoding_options = EncodingOptions {
1464            cache_bytes_per_column: 4096,
1465            max_page_bytes: 32 * 1024 * 1024,
1466            keep_original_array: true,
1467            buffer_alignment: 64,
1468        };
1469        let encoded_batch = encode_batch(
1470            &data,
1471            lance_schema.clone(),
1472            &CoreFieldEncodingStrategy::default(),
1473            &encoding_options,
1474        )
1475        .await
1476        .unwrap();
1477
1478        // Test the self-described format (the schema is embedded in the buffer)
1479        let bytes = encoded_batch.try_to_self_described_lance().unwrap();
1480
1481        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
1482
1483        let decoded = decode_batch(
1484            &decoded_batch,
1485            &FilterExpression::no_filter(),
1486            Arc::<DecoderPlugins>::default(),
1487            false,
1488            LanceFileVersion::default(),
1489            None,
1490        )
1491        .await
1492        .unwrap();
1493
1494        assert_eq!(data, decoded);
1495
1496        // Test the mini format (the schema must be provided by the caller)
1497        let bytes = encoded_batch.try_to_mini_lance().unwrap();
1498        let decoded_batch =
1499            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
1500                .unwrap();
1501        let decoded = decode_batch(
1502            &decoded_batch,
1503            &FilterExpression::no_filter(),
1504            Arc::<DecoderPlugins>::default(),
1505            false,
1506            LanceFileVersion::default(),
1507            None,
1508        )
1509        .await
1510        .unwrap();
1511
1512        assert_eq!(data, decoded);
1513    }
1514
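    /// Exercises column projections specified per-read (read_stream_projected) and as a
    /// base projection at open time, and checks that empty or duplicated projections
    /// are rejected.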
1515    #[test_log::test(tokio::test)]
1516    async fn test_projection() {
1517        let fs = FsFixture::default();
1518
1519        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1520        let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
1521
1522        let field_id_mapping = written_file
1523            .field_id_mapping
1524            .iter()
1525            .copied()
1526            .collect::<BTreeMap<_, _>>();
1527
1528        let empty_projection = ReaderProjection {
1529            column_indices: Vec::default(),
1530            schema: Arc::new(Schema::default()),
1531        };
1532
1533        for columns in [
1534            vec!["score"],
1535            vec!["location"],
1536            vec!["categories"],
1537            vec!["location.x"],
1538            vec!["score", "categories"],
1539            vec!["score", "location"],
1540            vec!["location", "categories"],
1541            vec!["score", "location.y", "categories"],
1542        ] {
1543            debug!("Testing round trip with projection {:?}", columns);
1544            // We can specify the projection as part of the read operation via read_stream_projected
1545            let file_reader = FileReader::try_open(
1546                file_scheduler.clone(),
1547                None,
1548                Arc::<DecoderPlugins>::default(),
1549                &test_cache(),
1550                FileReaderOptions::default(),
1551            )
1552            .await
1553            .unwrap();
1554
1555            let projected_schema = written_file.schema.project(&columns).unwrap();
1556            let projection = ReaderProjection::from_field_ids(
1557                &file_reader,
1558                &projected_schema,
1559                &field_id_mapping,
1560            )
1561            .unwrap();
1562
1563            let batch_stream = file_reader
1564                .read_stream_projected(
1565                    lance_io::ReadBatchParams::RangeFull,
1566                    1024,
1567                    16,
1568                    projection.clone(),
1569                    FilterExpression::no_filter(),
1570                )
1571                .unwrap();
1572
1573            let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1574            verify_expected(
1575                &written_file.data,
1576                batch_stream,
1577                1024,
1578                Some(Box::new(move |batch: &RecordBatch| {
1579                    batch.project_by_schema(&projection_arrow).unwrap()
1580                })),
1581            )
1582            .await;
1583
1584            // We can also specify the projection as a base projection when we open the file
1585            let file_reader = FileReader::try_open(
1586                file_scheduler.clone(),
1587                Some(projection.clone()),
1588                Arc::<DecoderPlugins>::default(),
1589                &test_cache(),
1590                FileReaderOptions::default(),
1591            )
1592            .await
1593            .unwrap();
1594
1595            let batch_stream = file_reader
1596                .read_stream(
1597                    lance_io::ReadBatchParams::RangeFull,
1598                    1024,
1599                    16,
1600                    FilterExpression::no_filter(),
1601                )
1602                .unwrap();
1603
1604            let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
1605            verify_expected(
1606                &written_file.data,
1607                batch_stream,
1608                1024,
1609                Some(Box::new(move |batch: &RecordBatch| {
1610                    batch.project_by_schema(&projection_arrow).unwrap()
1611                })),
1612            )
1613            .await;
1614
1615            assert!(file_reader
1616                .read_stream_projected(
1617                    lance_io::ReadBatchParams::RangeFull,
1618                    1024,
1619                    16,
1620                    empty_projection.clone(),
1621                    FilterExpression::no_filter(),
1622                )
1623                .is_err());
1624        }
1625
1626        assert!(FileReader::try_open(
1627            file_scheduler.clone(),
1628            Some(empty_projection),
1629            Arc::<DecoderPlugins>::default(),
1630            &test_cache(),
1631            FileReaderOptions::default(),
1632        )
1633        .await
1634        .is_err());
1635
1636        let arrow_schema = ArrowSchema::new(vec![
1637            Field::new("x", DataType::Int32, true),
1638            Field::new("y", DataType::Int32, true),
1639        ]);
1640        let schema = Schema::try_from(&arrow_schema).unwrap();
1641
1642        let projection_with_dupes = ReaderProjection {
1643            column_indices: vec![0, 0],
1644            schema: Arc::new(schema),
1645        };
1646
1647        assert!(FileReader::try_open(
1648            file_scheduler.clone(),
1649            Some(projection_with_dupes),
1650            Arc::<DecoderPlugins>::default(),
1651            &test_cache(),
1652            FileReaderOptions::default(),
1653        )
1654        .await
1655        .is_err());
1656    }
1657
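    /// Reads a projection whose schema carries compression metadata on its fields and
    /// verifies the data still matches.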
1658    #[test_log::test(tokio::test)]
1659    async fn test_compressing_buffer() {
1660        let fs = FsFixture::default();
1661
1662        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1663        let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
1664
1665        // We can specify the projection as part of the read operation via read_stream_projected
1666        let file_reader = FileReader::try_open(
1667            file_scheduler.clone(),
1668            None,
1669            Arc::<DecoderPlugins>::default(),
1670            &test_cache(),
1671            FileReaderOptions::default(),
1672        )
1673        .await
1674        .unwrap();
1675
1676        let mut projection = written_file.schema.project(&["score"]).unwrap();
1677        for field in projection.fields.iter_mut() {
1678            field
1679                .metadata
1680                .insert("lance:compression".to_string(), "zstd".to_string());
1681        }
1682        let projection = ReaderProjection {
1683            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
1684            schema: Arc::new(projection),
1685        };
1686
1687        let batch_stream = file_reader
1688            .read_stream_projected(
1689                lance_io::ReadBatchParams::RangeFull,
1690                1024,
1691                16,
1692                projection.clone(),
1693                FilterExpression::no_filter(),
1694            )
1695            .unwrap();
1696
1697        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
1698        verify_expected(
1699            &written_file.data,
1700            batch_stream,
1701            1024,
1702            Some(Box::new(move |batch: &RecordBatch| {
1703                batch.project_by_schema(&projection_arrow).unwrap()
1704            })),
1705        )
1706        .await;
1707    }
1708
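    /// Reads the entire file back as a single batch by using the total row count as the
    /// batch size.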
1709    #[tokio::test]
1710    async fn test_read_all() {
1711        let fs = FsFixture::default();
1712        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1713        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1714
1715        let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
1716        let file_reader = FileReader::try_open(
1717            file_scheduler.clone(),
1718            None,
1719            Arc::<DecoderPlugins>::default(),
1720            &test_cache(),
1721            FileReaderOptions::default(),
1722        )
1723        .await
1724        .unwrap();
1725
1726        let batches = file_reader
1727            .read_stream(
1728                lance_io::ReadBatchParams::RangeFull,
1729                total_rows as u32,
1730                16,
1731                FilterExpression::no_filter(),
1732            )
1733            .unwrap()
1734            .try_collect::<Vec<_>>()
1735            .await
1736            .unwrap();
1737        assert_eq!(batches.len(), 1);
1738        assert_eq!(batches[0].num_rows(), total_rows);
1739    }
1740
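    /// Takes a handful of rows by index through the blocking read path
    /// (read_stream_projected_blocking) from a spawn_blocking thread, using a
    /// single-column base projection supplied at open time.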
1741    #[tokio::test]
1742    async fn test_blocking_take() {
1743        let fs = FsFixture::default();
1744        let WrittenFile { data, schema, .. } = create_some_file(&fs, LanceFileVersion::V2_1).await;
1745        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1746
1747        let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
1748        let file_reader = FileReader::try_open(
1749            file_scheduler.clone(),
1750            Some(ReaderProjection::from_column_names(&schema, &["score"]).unwrap()),
1751            Arc::<DecoderPlugins>::default(),
1752            &test_cache(),
1753            FileReaderOptions::default(),
1754        )
1755        .await
1756        .unwrap();
1757
1758        let batches = tokio::task::spawn_blocking(move || {
1759            file_reader
1760                .read_stream_projected_blocking(
1761                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
1762                    total_rows as u32,
1763                    None,
1764                    FilterExpression::no_filter(),
1765                )
1766                .unwrap()
1767                .collect::<ArrowResult<Vec<_>>>()
1768                .unwrap()
1769        })
1770        .await
1771        .unwrap();
1772
1773        assert_eq!(batches.len(), 1);
1774        assert_eq!(batches[0].num_rows(), 5);
1775        assert_eq!(batches[0].num_columns(), 1);
1776    }
1777
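    /// Dropping the reader while a stream is outstanding must not abort the stream, and
    /// the stream itself can be dropped mid-scan without error.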
1778    #[tokio::test(flavor = "multi_thread")]
1779    async fn test_drop_in_progress() {
1780        let fs = FsFixture::default();
1781        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
1782        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
1783
1784        let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
1785        let file_reader = FileReader::try_open(
1786            file_scheduler.clone(),
1787            None,
1788            Arc::<DecoderPlugins>::default(),
1789            &test_cache(),
1790            FileReaderOptions::default(),
1791        )
1792        .await
1793        .unwrap();
1794
1795        let mut batches = file_reader
1796            .read_stream(
1797                lance_io::ReadBatchParams::RangeFull,
1798                (total_rows / 10) as u32,
1799                16,
1800                FilterExpression::no_filter(),
1801            )
1802            .unwrap();
1803
1804        drop(file_reader);
1805
1806        let batch = batches.next().await.unwrap().unwrap();
1807        assert!(batch.num_rows() > 0);
1808
1809        // Drop in-progress scan
1810        drop(batches);
1811    }
1812
1813    #[tokio::test]
1814    async fn drop_while_scheduling() {
1815        // This is a bit of a white-box test that pokes at the internals.  We want to
1816        // test the case where the read stream is dropped before the scheduling
1817        // thread finishes.  We can't do that in a black-box fashion because the
1818        // scheduling thread runs in the background and there is no easy way to
1819        // pause / gate it.
1820
1821        // It's a regression test for a bug where the scheduling thread would panic
1822        // if the stream was dropped before it finished.
1823
1824        let fs = FsFixture::default();
1825        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
1826        let total_rows = written_file
1827            .data
1828            .iter()
1829            .map(|batch| batch.num_rows())
1830            .sum::<usize>();
1831
1832        let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
1833        let file_reader = FileReader::try_open(
1834            file_scheduler.clone(),
1835            None,
1836            Arc::<DecoderPlugins>::default(),
1837            &test_cache(),
1838            FileReaderOptions::default(),
1839        )
1840        .await
1841        .unwrap();
1842
1843        let projection =
1844            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
1845        let column_infos = file_reader
1846            .collect_columns_from_projection(&projection)
1847            .unwrap();
1848        let mut decode_scheduler = DecodeBatchScheduler::try_new(
1849            &projection.schema,
1850            &projection.column_indices,
1851            &column_infos,
1852            &vec![],
1853            total_rows as u64,
1854            Arc::<DecoderPlugins>::default(),
1855            file_reader.scheduler.clone(),
1856            test_cache(),
1857            &FilterExpression::no_filter(),
1858        )
1859        .await
1860        .unwrap();
1861
1862        let range = 0..total_rows as u64;
1863
1864        let (tx, rx) = mpsc::unbounded_channel();
1865
1866        // Simulate the stream / decoder being dropped
1867        drop(rx);
1868
1869        // Scheduling should not panic
1870        decode_scheduler.schedule_range(
1871            range,
1872            &FilterExpression::no_filter(),
1873            tx,
1874            file_reader.scheduler.clone(),
1875        )
1876    }
1877
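    /// Writes a file with one extra global buffer and reads it back via
    /// read_global_buffer.  The first user-added buffer gets index 1 (index 0 holds
    /// the schema).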
1878    #[tokio::test]
1879    async fn test_global_buffers() {
1880        let fs = FsFixture::default();
1881
1882        let lance_schema =
1883            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
1884                "foo",
1885                DataType::Int32,
1886                true,
1887            )]))
1888            .unwrap();
1889
1890        let mut file_writer = FileWriter::try_new(
1891            fs.object_store.create(&fs.tmp_path).await.unwrap(),
1892            lance_schema.clone(),
1893            FileWriterOptions::default(),
1894        )
1895        .unwrap();
1896
1897        let test_bytes = Bytes::from_static(b"hello");
1898
1899        let buf_index = file_writer
1900            .add_global_buffer(test_bytes.clone())
1901            .await
1902            .unwrap();
1903
1904        assert_eq!(buf_index, 1);
1905
1906        file_writer.finish().await.unwrap();
1907
1908        let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
1909        let file_reader = FileReader::try_open(
1910            file_scheduler.clone(),
1911            None,
1912            Arc::<DecoderPlugins>::default(),
1913            &test_cache(),
1914            FileReaderOptions::default(),
1915        )
1916        .await
1917        .unwrap();
1918
1919        let buf = file_reader.read_global_buffer(1).await.unwrap();
1920        assert_eq!(buf, test_bytes);
1921    }
1922}