lance_file/
reader.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Lance Data File Reader

// Standard
use std::ops::{Range, RangeTo};
use std::sync::Arc;

use arrow_arith::numeric::sub;
use arrow_array::{
    builder::PrimitiveBuilder,
    cast::AsArray,
    types::{Int32Type, Int64Type},
    ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray,
    RecordBatch, StructArray, UInt32Array,
};
use arrow_buffer::ArrowNativeType;
use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
use arrow_select::concat::{self, concat_batches};
use async_recursion::async_recursion;
use deepsize::DeepSizeOf;
use futures::{stream, Future, FutureExt, StreamExt, TryStreamExt};
use lance_arrow::*;
use lance_core::cache::FileMetadataCache;
use lance_core::datatypes::{Field, Schema};
use lance_core::{Error, Result};
use lance_io::encodings::dictionary::DictionaryDecoder;
use lance_io::encodings::AsyncIndex;
use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
use lance_io::traits::Reader;
use lance_io::utils::{
    read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf,
};
use lance_io::{object_store::ObjectStore, ReadBatchParams};

use object_store::path::Path;
use snafu::location;
use tracing::instrument;

use crate::format::metadata::Metadata;
use crate::page_table::{PageInfo, PageTable};

/// Lance File Reader.
///
/// It reads Arrow data from one data file.
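///
/// A rough usage sketch (not a doctest; `schema` is assumed to be a [`Schema`]
/// that matches the file being opened):
///
/// ```ignore
/// let store = ObjectStore::memory();
/// let path = Path::from("/data.lance");
/// let reader = FileReader::try_new(&store, &path, schema).await?;
/// let batch = reader.read_range(0..10, reader.schema()).await?;
/// ```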
#[derive(Clone, DeepSizeOf)]
pub struct FileReader {
    pub object_reader: Arc<dyn Reader>,
    metadata: Arc<Metadata>,
    page_table: Arc<PageTable>,
    schema: Schema,

    /// The id of the fragment which this file belongs to.
    /// For simple file access, this can just be zero.
    fragment_id: u64,

    /// Page table for statistics
    stats_page_table: Arc<Option<PageTable>>,
}

impl std::fmt::Debug for FileReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FileReader")
            .field("fragment", &self.fragment_id)
            .field("path", &self.object_reader.path())
            .finish()
    }
}

impl FileReader {
    /// Open a file reader.
    ///
    /// Open the file at the given path using the provided object store.
    ///
    /// The passed fragment ID determines the first 32 bits of the row IDs.
    ///
    /// If a manifest is passed in, it will be used to load the schema and dictionary.
    /// This is typically done if the file is part of a dataset fragment. If no manifest
    /// is passed in, then it is read from the file itself.
    ///
    /// The session passed in is used to cache metadata about the file. If no session
    /// is passed in, there will be no caching.
    #[instrument(level = "debug", skip(object_store, schema, session))]
    pub async fn try_new_with_fragment_id(
        object_store: &ObjectStore,
        path: &Path,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&FileMetadataCache>,
    ) -> Result<Self> {
        let object_reader = object_store.open(path).await?;

        let metadata = Self::read_metadata(object_reader.as_ref(), session).await?;

        Self::try_new_from_reader(
            path,
            object_reader.into(),
            Some(metadata),
            schema,
            fragment_id,
            field_id_offset,
            max_field_id,
            session,
        )
        .await
    }

    #[allow(clippy::too_many_arguments)]
    pub async fn try_new_from_reader(
        path: &Path,
        object_reader: Arc<dyn Reader>,
        metadata: Option<Arc<Metadata>>,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&FileMetadataCache>,
    ) -> Result<Self> {
        let metadata = match metadata {
            Some(metadata) => metadata,
            None => Self::read_metadata(object_reader.as_ref(), session).await?,
        };

        let page_table = async {
            Self::load_from_cache(session, path, |_| async {
                PageTable::load(
                    object_reader.as_ref(),
                    metadata.page_table_position,
                    field_id_offset,
                    max_field_id,
                    metadata.num_batches() as i32,
                )
                .await
            })
            .await
        };

        let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session);

        // Can concurrently load page tables
        let (page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?;

        Ok(Self {
            object_reader,
            metadata,
            schema,
            page_table,
            fragment_id: fragment_id as u64,
            stats_page_table,
        })
    }

    pub async fn read_metadata(
        object_reader: &dyn Reader,
        cache: Option<&FileMetadataCache>,
    ) -> Result<Arc<Metadata>> {
        Self::load_from_cache(cache, object_reader.path(), |_| async {
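            // The metadata position is recorded at the end of the file, so read the
            // trailing block first and locate the metadata offset from it.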
            let file_size = object_reader.size().await?;
            let begin = if file_size < object_reader.block_size() {
                0
            } else {
                file_size - object_reader.block_size()
            };
            let tail_bytes = object_reader.get_range(begin..file_size).await?;
            let metadata_pos = read_metadata_offset(&tail_bytes)?;

            let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() {
                // We have not read the metadata bytes yet.
                read_struct(object_reader, metadata_pos).await?
            } else {
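                // The metadata already sits inside the tail bytes fetched above, so
                // translate its absolute position into an offset within that buffer.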
                let offset = tail_bytes.len() - (file_size - metadata_pos);
                read_struct_from_buf(&tail_bytes.slice(offset..))?
            };
            Ok(metadata)
        })
        .await
    }

    /// Get the statistics page table. This will read the metadata if it is not cached.
    ///
    /// The page table is cached.
    async fn read_stats_page_table(
        reader: &dyn Reader,
        cache: Option<&FileMetadataCache>,
    ) -> Result<Arc<Option<PageTable>>> {
        // To prevent collisions, we cache this at a child path
        Self::load_from_cache(cache, &reader.path().child("stats"), |_| async {
            let metadata = Self::read_metadata(reader, cache).await?;

            if let Some(stats_meta) = metadata.stats_metadata.as_ref() {
                Ok(Some(
                    PageTable::load(
                        reader,
                        stats_meta.page_table_position,
                        /*min_field_id=*/ 0,
                        /*max_field_id=*/ *stats_meta.leaf_field_ids.iter().max().unwrap(),
                        /*num_batches=*/ 1,
                    )
                    .await?,
                ))
            } else {
                Ok(None)
            }
        })
        .await
    }

    /// Load some metadata about the fragment from the cache, if there is one.
    async fn load_from_cache<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
        cache: Option<&FileMetadataCache>,
        path: &Path,
        loader: F,
    ) -> Result<Arc<T>>
    where
        F: Fn(&Path) -> Fut,
        Fut: Future<Output = Result<T>>,
    {
        if let Some(cache) = cache {
            cache.get_or_insert(path, loader).await
        } else {
            Ok(Arc::new(loader(path).await?))
        }
    }

    /// Open one Lance data file for read.
    pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result<Self> {
        // If just reading a lance data file we assume the schema is the schema of the data file
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
    }

    fn io_parallelism(&self) -> usize {
        self.object_reader.io_parallelism()
    }

    /// Requested projection of the data in this file, excluding the row id column.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn num_batches(&self) -> usize {
        self.metadata.num_batches()
    }

    /// Get the number of rows in the batch identified by `batch_id`.
    pub fn num_rows_in_batch(&self, batch_id: i32) -> usize {
        self.metadata.get_batch_length(batch_id).unwrap_or_default() as usize
    }

    /// Count the number of rows in this file.
    pub fn len(&self) -> usize {
        self.metadata.len()
    }

    pub fn is_empty(&self) -> bool {
        self.metadata.is_empty()
    }

    /// Read a batch of data from the file.
    ///
    /// The schema of the returned [RecordBatch] is determined by the `projection` schema.
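    ///
    /// A rough usage sketch (not a doctest):
    ///
    /// ```ignore
    /// // Read every row of batch 0 with the reader's full schema.
    /// let batch = reader.read_batch(0, .., reader.schema()).await?;
    /// ```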
    #[instrument(level = "debug", skip(self, params, projection))]
    pub async fn read_batch(
        &self,
        batch_id: i32,
        params: impl Into<ReadBatchParams>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        read_batch(self, &params.into(), projection, batch_id).await
    }

    /// Read a range of records into one batch.
    ///
    /// Note that this may concatenate results if the range crosses multiple batches,
    /// which makes it less efficient than [`FileReader::read_batch()`].
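    ///
    /// A rough usage sketch (not a doctest; `reader` is assumed to be an open
    /// [`FileReader`]):
    ///
    /// ```ignore
    /// // Rows 0..100 of every projected column, possibly spanning several batches.
    /// let batch = reader.read_range(0..100, reader.schema()).await?;
    /// ```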
    #[instrument(level = "debug", skip(self, projection))]
    pub async fn read_range(
        &self,
        range: Range<usize>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        if range.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(projection.into())));
        }
        let range_in_batches = self.metadata.range_to_batches(range)?;
        let batches =
            stream::iter(range_in_batches)
                .map(|(batch_id, range)| async move {
                    self.read_batch(batch_id, range, projection).await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;
        if batches.len() == 1 {
            return Ok(batches[0].clone());
        }
        let schema = batches[0].schema();
        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

    /// Take records by indices within the file.
    ///
    /// The indices must be sorted.
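    ///
    /// A rough usage sketch (not a doctest; indices are file-relative row offsets):
    ///
    /// ```ignore
    /// let batch = reader.take(&[1, 15, 90], reader.schema()).await?;
    /// ```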
    #[instrument(level = "debug", skip_all)]
    pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
        let num_batches = self.num_batches();
        let num_rows = self.len() as u32;
        let indices_in_batches = self.metadata.group_indices_to_batches(indices);
        let batches = stream::iter(indices_in_batches)
            .map(|batch| async move {
                if batch.batch_id >= num_batches as i32 {
                    Err(Error::InvalidInput {
                        source: format!("batch_id: {} out of bounds", batch.batch_id).into(),
                        location: location!(),
                    })
                } else if *batch.offsets.last().expect("got empty batch") > num_rows {
                    Err(Error::InvalidInput {
                        source: format!("indices: {:?} out of bounds", batch.offsets).into(),
                        location: location!(),
                    })
                } else {
                    self.read_batch(batch.batch_id, batch.offsets.as_slice(), projection)
                        .await
                }
            })
            .buffered(self.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;

        let schema = Arc::new(ArrowSchema::from(projection));

        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

    /// Get the schema of the statistics page table, for the given data field ids.
    pub fn page_stats_schema(&self, field_ids: &[i32]) -> Option<Schema> {
        self.metadata.stats_metadata.as_ref().map(|meta| {
            let mut stats_field_ids = vec![];
            for stats_field in &meta.schema.fields {
                if let Ok(stats_field_id) = stats_field.name.parse::<i32>() {
                    if field_ids.contains(&stats_field_id) {
                        stats_field_ids.push(stats_field.id);
                        for child in &stats_field.children {
                            stats_field_ids.push(child.id);
                        }
                    }
                }
            }
            meta.schema.project_by_ids(&stats_field_ids, true)
        })
    }

    /// Get the page statistics for the given data field ids.
    pub async fn read_page_stats(&self, field_ids: &[i32]) -> Result<Option<RecordBatch>> {
        if let Some(stats_page_table) = self.stats_page_table.as_ref() {
            let projection = self.page_stats_schema(field_ids).unwrap();
            // It's possible none of the requested fields have stats.
            if projection.fields.is_empty() {
                return Ok(None);
            }
            let arrays = futures::stream::iter(projection.fields.iter().cloned())
                .map(|field| async move {
                    read_array(
                        self,
                        &field,
                        0,
                        stats_page_table,
                        &ReadBatchParams::RangeFull,
                    )
                    .await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;

            let schema = ArrowSchema::from(&projection);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
            Ok(Some(batch))
        } else {
            Ok(None)
        }
    }
}

/// Stream desired full batches from the file.
///
/// Parameters:
/// - **reader**: An opened file reader.
/// - **projection**: The schema of the returned [RecordBatch].
/// - **predicate**: A function that takes a batch ID and returns true if the batch should be
///                  returned.
///
/// Returns:
/// - A stream of [RecordBatch]s, each one corresponding to one full batch in the file.
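///
/// A rough usage sketch (not a doctest; `reader` is assumed to be an open
/// [`FileReader`] and `schema` a projection of its schema):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// // Stream only the even-numbered batches.
/// let stream = batches_stream(reader, schema, |batch_id| batch_id % 2 == 0);
/// let batches: Vec<RecordBatch> = stream.try_collect().await?;
/// ```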
pub fn batches_stream(
    reader: FileReader,
    projection: Schema,
    predicate: impl FnMut(&i32) -> bool + Send + Sync + 'static,
) -> impl RecordBatchStream {
    // Wrap the projection in an Arc so it can be cloned and shared between tasks.
    let projection = Arc::new(projection);
    let arrow_schema = ArrowSchema::from(projection.as_ref());

    let total_batches = reader.num_batches() as i32;
    let batches = (0..total_batches).filter(predicate);
    // Wrap the reader in an Arc so it can be cloned and shared between tasks.
    let this = Arc::new(reader);
    let inner = stream::iter(batches)
        .zip(stream::repeat_with(move || {
            (this.clone(), projection.clone())
        }))
        .map(move |(batch_id, (reader, projection))| async move {
            reader
                .read_batch(batch_id, ReadBatchParams::RangeFull, &projection)
                .await
        })
        .buffered(2)
        .boxed();
    RecordBatchStreamAdapter::new(Arc::new(arrow_schema), inner)
}

/// Read a batch.
///
/// `schema` must contain at least one field; an error is returned if it is empty.
pub async fn read_batch(
    reader: &FileReader,
    params: &ReadBatchParams,
    schema: &Schema,
    batch_id: i32,
) -> Result<RecordBatch> {
    if !schema.fields.is_empty() {
        // We box this because otherwise we get a higher-order lifetime error.
        let arrs = stream::iter(&schema.fields)
            .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
            .buffered(reader.io_parallelism())
            .try_collect::<Vec<_>>()
            .boxed();
        let arrs = arrs.await?;
        Ok(RecordBatch::try_new(Arc::new(schema.into()), arrs)?)
    } else {
        Err(Error::invalid_input("no fields requested", location!()))
    }
}

#[async_recursion]
async fn read_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let data_type = field.data_type();

    use DataType::*;

    if data_type.is_fixed_stride() {
        _read_fixed_stride_array(reader, field, batch_id, page_table, params).await
    } else {
        match data_type {
            Null => read_null_array(field, batch_id, page_table, params),
            Utf8 | LargeUtf8 | Binary | LargeBinary => {
                read_binary_array(reader, field, batch_id, page_table, params).await
            }
            Struct(_) => read_struct_array(reader, field, batch_id, page_table, params).await,
            Dictionary(_, _) => {
                read_dictionary_array(reader, field, batch_id, page_table, params).await
            }
            List(_) => {
                read_list_array::<Int32Type>(reader, field, batch_id, page_table, params).await
            }
            LargeList(_) => {
                read_list_array::<Int64Type>(reader, field, batch_id, page_table, params).await
            }
            _ => {
                unimplemented!("{}", format!("No support for {data_type} yet"));
            }
        }
    }
}

fn get_page_info<'a>(
    page_table: &'a PageTable,
    field: &'a Field,
    batch_id: i32,
) -> Result<&'a PageInfo> {
    page_table.get(field.id, batch_id).ok_or_else(|| {
        Error::io(
            format!(
                "No page info found for field: {}, field_id={} batch={}",
                field.name, field.id, batch_id
            ),
            location!(),
        )
    })
}

/// Read a fixed-stride array for batch `batch_id`.
async fn _read_fixed_stride_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        page_info.position,
        page_info.length,
        params.clone(),
    )
    .await
}

fn read_null_array(
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    let length_output = match params {
        ReadBatchParams::Indices(indices) => {
            if indices.is_empty() {
                0
            } else {
                let idx_max = *indices.values().iter().max().unwrap() as u64;
                if idx_max >= page_info.length.try_into().unwrap() {
                    return Err(Error::io(
                        format!(
                            "NullArray Reader: request([{}]) out of range: [0..{}]",
                            idx_max, page_info.length
                        ),
                        location!(),
                    ));
                }
                indices.len()
            }
        }
        _ => {
            let (idx_start, idx_end) = match params {
                ReadBatchParams::Range(r) => (r.start, r.end),
                ReadBatchParams::RangeFull => (0, page_info.length),
                ReadBatchParams::RangeTo(r) => (0, r.end),
                ReadBatchParams::RangeFrom(r) => (r.start, page_info.length),
                _ => unreachable!(),
            };
            if idx_end > page_info.length {
                return Err(Error::io(
                    format!(
                        "NullArray Reader: request([{}..{}]) out of range: [0..{}]",
                        idx_start,
                        idx_end,
                        page_info.length
                    ),
                    location!(),
                ));
            }
            idx_end - idx_start
        }
    };

    Ok(Arc::new(NullArray::new(length_output)))
}

async fn read_binary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    lance_io::utils::read_binary_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        field.nullable,
        page_info.position,
        page_info.length,
        params,
    )
    .await
}

async fn read_dictionary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    let data_type = field.data_type();
    let decoder = DictionaryDecoder::new(
        reader.object_reader.as_ref(),
        page_info.position,
        page_info.length,
        &data_type,
        field
            .dictionary
            .as_ref()
            .unwrap()
            .values
            .as_ref()
            .unwrap()
            .clone(),
    );
    decoder.get(params.clone()).await
}

async fn read_struct_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    // TODO: use tokio to make the reads in parallel.
    let mut sub_arrays: Vec<(FieldRef, ArrayRef)> = vec![];

    for child in field.children.as_slice() {
        let arr = read_array(reader, child, batch_id, page_table, params).await?;
        sub_arrays.push((Arc::new(child.into()), arr));
    }

    Ok(Arc::new(StructArray::from(sub_arrays)))
}

async fn take_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    positions: &PrimitiveArray<T>,
    indices: &UInt32Array,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    let first_idx = indices.value(0);
    // Range of values for each index
    let ranges = indices
        .values()
        .iter()
        .map(|i| (*i - first_idx).as_usize())
        .map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize())
        .collect::<Vec<_>>();
    let field = field.clone();
    let mut list_values: Vec<ArrayRef> = vec![];
    // TODO: read them in parallel.
    for range in ranges.iter() {
        list_values.push(
            read_array(
                reader,
                &field.children[0],
                batch_id,
                page_table,
                &(range.clone()).into(),
            )
            .await?,
        );
    }

    let value_refs = list_values
        .iter()
        .map(|arr| arr.as_ref())
        .collect::<Vec<_>>();
    let mut offsets_builder = PrimitiveBuilder::<T>::new();
    offsets_builder.append_value(T::Native::usize_as(0));
    let mut off = 0_usize;
    for range in ranges {
        off += range.len();
        offsets_builder.append_value(T::Native::usize_as(off));
    }
    let all_values = concat::concat(value_refs.as_slice())?;
    let offset_arr = offsets_builder.finish();
    let arr = try_new_generic_list_array(all_values, &offset_arr)?;
    Ok(Arc::new(arr) as ArrayRef)
}

async fn read_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    // Offset the position array by 1 in order to include the upper bound of the last element
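    // e.g. reading list rows 2..4 needs the offsets at positions 2, 3, and 4, so the
    // range of positions to load becomes 2..5.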
    let positions_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(range.start..(range.end + 1)),
        ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1),
        ReadBatchParams::Indices(indices) => {
            (indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into()
        }
        p => p.clone(),
    };

    let page_info = get_page_info(&reader.page_table, field, batch_id)?;
    let position_arr = read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &T::DATA_TYPE,
        page_info.position,
        page_info.length,
        positions_params,
    )
    .await?;

    let positions: &PrimitiveArray<T> = position_arr.as_primitive();

    // Recompute params so they align with the offset array
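    // The loaded positions are absolute offsets into the child values for this page,
    // so the rows of the child array to read start at positions[0], not at zero.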
    let value_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(),
        ),
        ReadBatchParams::RangeTo(RangeTo { end }) => {
            ReadBatchParams::from(..positions.value(*end).as_usize())
        }
        ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..),
        ReadBatchParams::RangeFull => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(),
        ),
        ReadBatchParams::Indices(indices) => {
            return take_list_array(reader, field, batch_id, page_table, positions, indices).await;
        }
    };

    let start_position = PrimitiveArray::<T>::new_scalar(positions.value(0));
    let offset_arr = sub(positions, &start_position)?;
    let offset_arr_ref = offset_arr.as_primitive::<T>();
    let value_arrs = read_array(
        reader,
        &field.children[0],
        batch_id,
        page_table,
        &value_params,
    )
    .await?;
    let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?;
    Ok(Arc::new(arr) as ArrayRef)
}

#[cfg(test)]
mod tests {
    use crate::writer::{FileWriter, NotSelfDescribing};

    use super::*;

    use arrow_array::{
        builder::{Int32Builder, LargeListBuilder, ListBuilder, StringBuilder},
        cast::{as_string_array, as_struct_array},
        types::UInt8Type,
        Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, StringArray,
        UInt8Array,
    };
    use arrow_array::{BooleanArray, Int32Array};
    use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};

    #[tokio::test]
    async fn test_take() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, true),
            ArrowField::new("f", DataType::Float32, false),
            ArrowField::new("s", DataType::Utf8, false),
            ArrowField::new(
                "d",
                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
                false,
            ),
        ]);
        let mut schema = Schema::try_from(&arrow_schema).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_test");

        // Write 10 batches.
        let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
        let values_ref = Arc::new(values);
        let mut batches = vec![];
        for batch_id in 0..10 {
            let value_range: Range<i64> = batch_id * 10..batch_id * 10 + 10;
            let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8));
            let columns: Vec<ArrayRef> = vec![
                Arc::new(Int64Array::from_iter(
                    value_range.clone().collect::<Vec<_>>(),
                )),
                Arc::new(Float32Array::from_iter(
                    value_range.clone().map(|n| n as f32).collect::<Vec<_>>(),
                )),
                Arc::new(StringArray::from_iter_values(
                    value_range.clone().map(|n| format!("str-{}", n)),
                )),
                Arc::new(DictionaryArray::<UInt8Type>::try_new(keys, values_ref.clone()).unwrap()),
            ];
            batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap());
        }
        schema.set_dictionary(&batches[0]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for batch in batches.iter() {
            file_writer.write(&[batch.clone()]).await.unwrap();
        }
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let batch = reader
            .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema())
            .await
            .unwrap();
        let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]);
        assert_eq!(
            batch,
            RecordBatch::try_new(
                batch.schema(),
                vec![
                    Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])),
                    Arc::new(Float32Array::from_iter_values([
                        1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0
                    ])),
                    Arc::new(StringArray::from_iter_values([
                        "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90"
                    ])),
                    Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()),
                ]
            )
            .unwrap()
        );
    }

    async fn test_write_null_string_in_struct(field_nullable: bool) {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "parent",
            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "str",
                DataType::Utf8,
                field_nullable,
            )])),
            true,
        )]));

        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")]));
        let struct_arr = Arc::new(StructArray::from(vec![(
            Arc::new(ArrowField::new("str", DataType::Utf8, field_nullable)),
            string_arr.clone() as ArrayRef,
        )]));
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();

        if field_nullable {
            assert_eq!(
                &StringArray::from_iter(vec![Some("a"), None, Some("b")]),
                as_string_array(
                    as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref())
                        .column_by_name("str")
                        .unwrap()
                        .as_ref()
                )
            );
        } else {
            assert_eq!(actual_batch, batch);
        }
    }

    #[tokio::test]
    async fn read_nullable_string_in_struct() {
        test_write_null_string_in_struct(true).await;
        test_write_null_string_in_struct(false).await;
    }

    #[tokio::test]
    async fn test_read_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let batches = (0..3)
            .map(|_| {
                let struct_array = make_struct_of_list_array(10, 10);
                RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
            })
            .collect::<Vec<_>>();
        let batches_ref = batches.iter().collect::<Vec<_>>();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&batches).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
        assert_eq!(expected, actual_batch);
    }

    #[tokio::test]
    async fn test_scan_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let struct_array = make_struct_of_list_array(3, 10);
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let mut expected_columns: Vec<ArrayRef> = Vec::new();
        for c in struct_array.columns().iter() {
            expected_columns.push(c.slice(1, 1));
        }

        let expected_struct = match arrow_schema.fields[0].data_type() {
            DataType::Struct(subfields) => subfields
                .iter()
                .zip(expected_columns)
                .map(|(f, d)| (f.clone(), d))
                .collect::<Vec<_>>(),
            _ => panic!("unexpected field"),
        };

        let expected_struct_array = StructArray::from(expected_struct);
        let expected_batch = RecordBatch::from(&StructArray::from(vec![(
            Arc::new(arrow_schema.fields[0].as_ref().clone()),
            Arc::new(expected_struct_array) as ArrayRef,
        )]));

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let params = ReadBatchParams::Range(1..2);
        let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap();
        assert_eq!(expected_batch, slice_of_batch);
    }

    fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
        Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "s",
            DataType::Struct(ArrowFields::from(vec![
                ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                ),
                ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                ),
                ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                ),
            ])),
            true,
        )]))
    }

    fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
        let mut li_builder = ListBuilder::new(Int32Builder::new());
        let mut ls_builder = ListBuilder::new(StringBuilder::new());
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..rows {
            for j in 0..num_items {
                li_builder.values().append_value(i * 10 + j);
                ls_builder
                    .values()
                    .append_value(format!("str-{}", i * 10 + j));
                large_list_builder.values().append_value(i * 10 + j);
            }
            li_builder.append(true);
            ls_builder.append(true);
            large_list_builder.append(true);
        }
        Arc::new(StructArray::from(vec![
            (
                Arc::new(ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                )),
                Arc::new(li_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                )),
                Arc::new(ls_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                )),
                Arc::new(large_list_builder.finish()) as ArrayRef,
            ),
        ]))
    }

    #[tokio::test]
    async fn test_read_nullable_arrays() {
        use arrow_array::Array;

        // create a record batch with a null array column
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, false),
            ArrowField::new("n", DataType::Null, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from_iter_values(0..100)),
            Arc::new(NullArray::new(100)),
        ];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        // write to a lance file
        let store = ObjectStore::memory();
        let path = Path::from("/takes");
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        // read the file back
        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();

        async fn read_array_w_params(
            reader: &FileReader,
            field: &Field,
            params: ReadBatchParams,
        ) -> ArrayRef {
            read_array(reader, field, 0, reader.page_table.as_ref(), &params)
                .await
                .expect("Error reading back the null array from file") as _
        }

        let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
        assert_eq!(100, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
        assert_eq!(15, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
        assert_eq!(40, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
        assert_eq!(25, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr = read_array_w_params(
            &reader,
            &schema.fields[1],
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])),
        )
        .await;
        assert_eq!(4, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        // raise error if take indices are out of bounds
        let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());

        // raise error if range indices are out of bounds
        let params = ReadBatchParams::RangeTo(..107);
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());
    }

    #[tokio::test]
    async fn test_take_lists() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..100 {
            list_builder.values().append_value(i);
            large_list_builder.values().append_value(i);
            if (i + 1) % 10 == 0 {
                list_builder.append(true);
                large_list_builder.append(true);
            }
        }
        let list_arr = Arc::new(list_builder.finish());
        let large_list_arr = Arc::new(large_list_builder.finish());

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![list_arr as ArrayRef, large_list_arr as ArrayRef],
        )
        .unwrap();

        // write to a lance file
        let store = ObjectStore::memory();
        let path = Path::from("/take_list");
        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        // read the file back
        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap();

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in [1, 3, 5, 9] {
            for j in 0..10 {
                list_builder.values().append_value(i * 10 + j);
                large_list_builder.values().append_value(i * 10 + j);
            }
            list_builder.append(true);
            large_list_builder.append(true);
        }
        let expected_list = list_builder.finish();
        let expected_large_list = large_list_builder.finish();

        assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list);
        assert_eq!(
            actual.column_by_name("ll").unwrap().as_ref(),
            &expected_large_list
        );
    }

    #[tokio::test]
    async fn test_list_array_with_offsets() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let store = ObjectStore::memory();
        let path = Path::from("/lists");

        let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![Some(3), Some(4)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);
        let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(10), Some(11)]),
            Some(vec![Some(12), Some(13)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![Arc::new(list_array), Arc::new(large_list_array)],
        )
        .unwrap();

        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        // Make sure the big array was not written to the file
        let file_size_bytes = store.size(&path).await.unwrap();
        assert!(file_size_bytes < 1_000);

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        assert_eq!(batch, actual_batch);
    }

    #[tokio::test]
    async fn test_read_ranges() {
        // create a record batch with a single Int64 column
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from_iter_values(0..100))];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        // write to a lance file
        let store = ObjectStore::memory();
        let path = Path::from("/read_range");
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap();

        assert_eq!(
            actual_batch.column_by_name("i").unwrap().as_ref(),
            &Int64Array::from_iter_values(7..25)
        );
    }

    #[tokio::test]
    async fn test_batches_stream() {
        let store = ObjectStore::memory();
        let path = Path::from("/batch_stream");

        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let mut writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for i in 0..10 {
            let batch = RecordBatch::try_new(
                Arc::new(arrow_schema.clone()),
                vec![Arc::new(Int32Array::from_iter_values(i * 10..(i + 1) * 10))],
            )
            .unwrap();
            writer.write(&[batch]).await.unwrap();
        }
        writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let stream = batches_stream(reader, schema, |id| id % 2 == 0);
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(batches.len(), 5);
        for (i, batch) in batches.iter().enumerate() {
            assert_eq!(
                batch,
                &RecordBatch::try_new(
                    Arc::new(arrow_schema.clone()),
                    vec![Arc::new(Int32Array::from_iter_values(
                        i as i32 * 2 * 10..(i as i32 * 2 + 1) * 10
                    ))],
                )
                .unwrap()
            )
        }
    }

    #[tokio::test]
    async fn test_take_boolean_beyond_chunk() {
        let mut store = ObjectStore::memory();
        store.set_block_size(256);
        let path = Path::from("/take_bools");

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "b",
            DataType::Boolean,
            false,
        )]));
        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
        let batch =
            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[2, 4, 5, 8, 4555], &schema).await.unwrap();

        assert_eq!(
            actual.column_by_name("b").unwrap().as_ref(),
            &BooleanArray::from(vec![false, false, true, false, true])
        );
    }

    #[tokio::test]
    async fn test_read_projection() {
        // The dataset schema may be very large.  The file reader should support reading
        // a small projection of that schema (this just tests the field_id_offset /
        // max_field_id parameters)
        let store = ObjectStore::memory();
        let path = Path::from("/partial_read");

        // Create a large schema
        let mut fields = vec![];
        for i in 0..100 {
            fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false));
        }
        let arrow_schema = ArrowSchema::new(fields);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let partial_schema = schema.project(&["f50"]).unwrap();
        let partial_arrow: ArrowSchema = (&partial_schema).into();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            partial_schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = Int32Array::from(vec![0; 15]);
        let batch =
            RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        let field_id = partial_schema.fields.first().unwrap().id;
        let reader = FileReader::try_new_with_fragment_id(
            &store,
            &path,
            schema.clone(),
            0,
            /*min_field_id=*/ field_id,
            /*max_field_id=*/ field_id,
            None,
        )
        .await
        .unwrap();
        let actual = reader
            .read_batch(0, ReadBatchParams::RangeFull, &partial_schema)
            .await
            .unwrap();

        assert_eq!(actual, batch);
    }
}