lance_file/v2/
testing.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::Arc;
5
6use arrow_array::{RecordBatch, RecordBatchReader};
7use arrow_schema::ArrowError;
8use futures::TryStreamExt;
9use lance_core::{
10    cache::{CapacityMode, FileMetadataCache},
11    datatypes::Schema,
12};
13use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
14use lance_io::{
15    object_store::ObjectStore,
16    scheduler::{ScanScheduler, SchedulerConfig},
17    ReadBatchParams,
18};
19use object_store::path::Path;
20use tempfile::TempDir;
21
22use crate::v2::reader::{FileReader, FileReaderOptions};
23
24use super::writer::{FileWriter, FileWriterOptions};
25
/// Test fixture providing a temporary local filesystem location plus the
/// object-store and I/O scheduler handles needed to read/write a Lance file.
///
/// The `TempDir` is held (unused) so the directory is deleted when the
/// fixture is dropped.
pub struct FsFixture {
    // Kept alive only for its Drop impl, which removes the temp directory.
    _tmp_dir: TempDir,
    /// Object-store path of the test file (`<tmpdir>/some_file.lance`).
    pub tmp_path: Path,
    /// Local-filesystem object store rooted wherever `tmp_path` resolves.
    pub object_store: Arc<ObjectStore>,
    /// Scan scheduler configured with testing defaults, bound to `object_store`.
    pub scheduler: Arc<ScanScheduler>,
}
32
33impl Default for FsFixture {
34    fn default() -> Self {
35        let tmp_dir = tempfile::tempdir().unwrap();
36        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
37        let tmp_path = Path::parse(tmp_path).unwrap();
38        let tmp_path = tmp_path.child("some_file.lance");
39        let object_store = Arc::new(ObjectStore::local());
40        let scheduler =
41            ScanScheduler::new(object_store.clone(), SchedulerConfig::default_for_testing());
42        Self {
43            _tmp_dir: tmp_dir,
44            object_store,
45            tmp_path,
46            scheduler,
47        }
48    }
49}
50
/// Result of [`write_lance_file`]: the schema and batches that were written,
/// plus the writer's field-id bookkeeping, for use in read-back assertions.
pub struct WrittenFile {
    /// Lance schema derived from the input reader's Arrow schema.
    pub schema: Arc<Schema>,
    /// The exact record batches that were written, in order.
    pub data: Vec<RecordBatch>,
    // Pairs reported by `FileWriter::field_id_to_column_indices`; presumably
    // (field id, column index) — confirm against the writer's docs.
    pub field_id_mapping: Vec<(u32, u32)>,
}
56
57pub async fn write_lance_file(
58    data: impl RecordBatchReader,
59    fs: &FsFixture,
60    options: FileWriterOptions,
61) -> WrittenFile {
62    let writer = fs.object_store.create(&fs.tmp_path).await.unwrap();
63
64    let lance_schema = lance_core::datatypes::Schema::try_from(data.schema().as_ref()).unwrap();
65
66    let mut file_writer = FileWriter::try_new(writer, lance_schema.clone(), options).unwrap();
67
68    let data = data
69        .collect::<std::result::Result<Vec<_>, ArrowError>>()
70        .unwrap();
71
72    for batch in &data {
73        file_writer.write_batch(batch).await.unwrap();
74    }
75    let field_id_mapping = file_writer.field_id_to_column_indices().to_vec();
76    file_writer.add_schema_metadata("foo", "bar");
77    file_writer.finish().await.unwrap();
78    WrittenFile {
79        schema: Arc::new(lance_schema),
80        data,
81        field_id_mapping,
82    }
83}
84
85pub fn test_cache() -> Arc<FileMetadataCache> {
86    Arc::new(FileMetadataCache::with_capacity(
87        128 * 1024 * 1024,
88        CapacityMode::Bytes,
89    ))
90}
91
92pub async fn read_lance_file(
93    fs: &FsFixture,
94    decoder_middleware: Arc<DecoderPlugins>,
95    filter: FilterExpression,
96) -> Vec<RecordBatch> {
97    let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
98    let file_reader = FileReader::try_open(
99        file_scheduler,
100        None,
101        decoder_middleware,
102        &test_cache(),
103        FileReaderOptions::default(),
104    )
105    .await
106    .unwrap();
107
108    let schema = file_reader.schema();
109    assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
110
111    let batch_stream = file_reader
112        .read_stream(ReadBatchParams::RangeFull, 1024, 16, filter)
113        .unwrap();
114
115    batch_stream.try_collect().await.unwrap()
116}
117
118pub async fn count_lance_file(
119    fs: &FsFixture,
120    decoder_middleware: Arc<DecoderPlugins>,
121    filter: FilterExpression,
122) -> usize {
123    read_lance_file(fs, decoder_middleware, filter)
124        .await
125        .iter()
126        .map(|b| b.num_rows())
127        .sum()
128}