// lance_file/v2/writer.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use core::panic;
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::RecordBatch;

use arrow_data::ArrayData;
use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use lance_core::datatypes::{Field, Schema as LanceSchema};
use lance_core::utils::bit::pad_bytes;
use lance_core::{Error, Result};
use lance_encoding::decoder::PageEncoding;
use lance_encoding::encoder::{
    default_encoding_strategy, BatchEncoder, EncodeTask, EncodedBatch, EncodedPage,
    EncodingOptions, FieldEncoder, FieldEncodingStrategy, OutOfLineBuffers,
};
use lance_encoding::repdef::RepDefBuilder;
use lance_encoding::version::LanceFileVersion;
use lance_io::object_store::ObjectStore;
use lance_io::object_writer::ObjectWriter;
use lance_io::traits::Writer;
use log::debug;
use object_store::path::Path;
use prost::Message;
use prost_types::Any;
use snafu::location;
use tokio::io::AsyncWriteExt;
use tracing::instrument;

use crate::datatypes::FieldsWithMeta;
use crate::format::pb;
use crate::format::pbfile;
use crate::format::pbfile::DirectEncoding;
use crate::format::MAGIC;

/// Page buffers are aligned to 64 bytes
pub(crate) const PAGE_BUFFER_ALIGNMENT: usize = 64;
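// The fill byte is arbitrary (72 is ASCII 'H'); padding bytes are never pointed
// at by any recorded offset, so readers simply skip over them.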
const PAD_BUFFER: [u8; PAGE_BUFFER_ALIGNMENT] = [72; PAGE_BUFFER_ALIGNMENT];

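/// Options controlling how a [`FileWriter`] buffers and encodes data.
///
/// A minimal construction sketch (illustrative only; any field left as `None`
/// falls back to the default documented on that field):
///
/// ```ignore
/// let options = FileWriterOptions {
///     format_version: Some(LanceFileVersion::V2_0),
///     keep_original_array: Some(true),
///     ..Default::default()
/// };
/// ```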
#[derive(Debug, Clone, Default)]
pub struct FileWriterOptions {
    /// How many bytes to use for buffering column data
    ///
    /// When data comes in small batches the writer will buffer column data so that
    /// larger pages can be created.  This value will be divided evenly across all of the
    /// columns.  Generally you want this to be at least large enough to match your
    /// filesystem's ideal read size per column.
    ///
    /// In some cases you might want this value to be even larger if you have highly
    /// compressible data.  However, if this is too large, then the writer could require
    /// a lot of memory and write performance may suffer if the CPU-expensive encoding
    /// falls behind and can't be interleaved with the I/O-expensive flushing.
    ///
    /// The default will use 8MiB per column which should be reasonable for most cases.
    // TODO: Do we need to be able to set this on a per-column basis?
    pub data_cache_bytes: Option<u64>,
    /// A hint to indicate the max size of a page
    ///
    /// This hint can't always be respected.  A single value could be larger than this value
    /// and we never slice single values.  In addition, there are some cases where it can be
    /// difficult to know the size up-front and so we might not be able to respect this value.
    pub max_page_bytes: Option<u64>,
    /// The file writer buffers columns until enough data has arrived to flush a page
    /// to disk.
    ///
    /// Some columns with small data types may not flush very often.  These arrays can
    /// stick around for a long time.  These arrays might also be keeping larger data
    /// structures alive.  By default, the writer will make a deep copy of this array
    /// to avoid any potential memory leaks.  However, this can be disabled for a
    /// (probably minor) performance boost if you are sure that arrays are not keeping
    /// any sibling structures alive (this typically means the array was allocated in
    /// the same language / runtime as the writer).
    ///
    /// Do not enable this if your data is arriving from the C data interface.
    /// Data typically arrives one "batch" at a time (encoded in the C data interface
    /// as a struct array).  Each array in that batch keeps the entire batch alive.
    /// This means a small boolean array (which we will buffer in memory for quite a
    /// while) might keep a much larger record batch around in memory (even though most
    /// of that batch's data has been written to disk).
    pub keep_original_array: Option<bool>,
    pub encoding_strategy: Option<Arc<dyn FieldEncodingStrategy>>,
    /// The format version to use when writing the file
    ///
    /// This controls which encodings will be used when encoding the data.  Newer
    /// versions may have more efficient encodings.  However, newer format versions will
    /// require more up-to-date readers to read the data.
    pub format_version: Option<LanceFileVersion>,
}

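/// A writer for Lance v2 files.
///
/// A minimal end-to-end sketch (mirrors the unit test at the bottom of this
/// module; `object_writer`, `schema`, and `batches` are assumed to be in scope):
///
/// ```ignore
/// let mut writer = FileWriter::try_new(object_writer, schema, FileWriterOptions::default())?;
/// for batch in batches {
///     writer.write_batch(&batch).await?;
/// }
/// let rows_written = writer.finish().await?;
/// ```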
pub struct FileWriter {
    writer: ObjectWriter,
    schema: Option<LanceSchema>,
    column_writers: Vec<Box<dyn FieldEncoder>>,
    column_metadata: Vec<pbfile::ColumnMetadata>,
    field_id_to_column_indices: Vec<(u32, u32)>,
    num_columns: u32,
    rows_written: u64,
    global_buffers: Vec<(u64, u64)>,
    schema_metadata: HashMap<String, String>,
    options: FileWriterOptions,
}

fn initial_column_metadata() -> pbfile::ColumnMetadata {
    pbfile::ColumnMetadata {
        pages: Vec::new(),
        buffer_offsets: Vec::new(),
        buffer_sizes: Vec::new(),
        encoding: None,
    }
}

impl FileWriter {
    /// Create a new FileWriter with a desired output schema
    pub fn try_new(
        object_writer: ObjectWriter,
        schema: LanceSchema,
        options: FileWriterOptions,
    ) -> Result<Self> {
        let mut writer = Self::new_lazy(object_writer, options);
        writer.initialize(schema)?;
        Ok(writer)
    }

    /// Create a new FileWriter without a desired output schema
    ///
    /// The output schema will be set based on the first batch of data to arrive.
    /// If no data arrives before the writer is finished, the write will fail.
    pub fn new_lazy(object_writer: ObjectWriter, options: FileWriterOptions) -> Self {
        Self {
            writer: object_writer,
            schema: None,
            column_writers: Vec::new(),
            column_metadata: Vec::new(),
            num_columns: 0,
            rows_written: 0,
            field_id_to_column_indices: Vec::new(),
            global_buffers: Vec::new(),
            schema_metadata: HashMap::new(),
            options,
        }
    }

    /// Write a series of record batches to a new file
    ///
    /// Returns the number of rows written
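    ///
    /// A one-call sketch (illustrative; `store`, `path`, `schema`, and `batches`
    /// are assumed to be in scope):
    ///
    /// ```ignore
    /// let rows = FileWriter::create_file_with_batches(
    ///     &store,
    ///     &path,
    ///     schema,
    ///     batches.into_iter(),
    ///     FileWriterOptions::default(),
    /// )
    /// .await?;
    /// ```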
    pub async fn create_file_with_batches(
        store: &ObjectStore,
        path: &Path,
        schema: lance_core::datatypes::Schema,
        batches: impl Iterator<Item = RecordBatch> + Send,
        options: FileWriterOptions,
    ) -> Result<usize> {
        let writer = store.create(path).await?;
        let mut writer = Self::try_new(writer, schema, options)?;
        for batch in batches {
            writer.write_batch(&batch).await?;
        }
        Ok(writer.finish().await? as usize)
    }

    async fn do_write_buffer(writer: &mut ObjectWriter, buf: &[u8]) -> Result<()> {
        writer.write_all(buf).await?;
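        // `pad_bytes` computes how much filler is needed to bring the buffer up to
        // the next multiple of PAGE_BUFFER_ALIGNMENT (e.g. a 100-byte buffer gets
        // 28 pad bytes, bringing the total written to 128).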
        let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
        writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
        Ok(())
    }

    /// Returns the format version that will be used when writing the file
    pub fn version(&self) -> LanceFileVersion {
        self.options.format_version.unwrap_or_default()
    }

    async fn write_page(&mut self, encoded_page: EncodedPage) -> Result<()> {
        let buffers = encoded_page.data;
        let mut buffer_offsets = Vec::with_capacity(buffers.len());
        let mut buffer_sizes = Vec::with_capacity(buffers.len());
        for buffer in buffers {
            buffer_offsets.push(self.writer.tell().await? as u64);
            buffer_sizes.push(buffer.len() as u64);
            Self::do_write_buffer(&mut self.writer, &buffer).await?;
        }
        let encoded_encoding = match encoded_page.description {
            PageEncoding::Legacy(array_encoding) => Any::from_msg(&array_encoding)?.encode_to_vec(),
            PageEncoding::Structural(page_layout) => Any::from_msg(&page_layout)?.encode_to_vec(),
        };
        let page = pbfile::column_metadata::Page {
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                    encoding: encoded_encoding,
                })),
            }),
            length: encoded_page.num_rows,
            priority: encoded_page.row_number,
        };
        self.column_metadata[encoded_page.column_idx as usize]
            .pages
            .push(page);
        Ok(())
    }

    #[instrument(skip_all, level = "debug")]
    async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered<EncodeTask>) -> Result<()> {
        // As soon as an encoding task is done we write it.  There is no parallelism
        // needed here because "writing" is really just submitting the buffer to the
        // underlying write scheduler (either the OS or object_store's scheduler for
        // cloud writes).  The only time we might truly await on write_page is if the
        // scheduler's write queue is full.
        //
        // Also, there is no point in trying to make write_page parallel anyways
        // because we wouldn't want buffers getting mixed up across pages.
        while let Some(encoding_task) = encoding_tasks.next().await {
            let encoded_page = encoding_task?;
            self.write_page(encoded_page).await?;
        }
        // It's important to flush here; we don't know when the next batch will arrive
        // and the underlying cloud store could have writes in progress that won't advance
        // until we interact with the writer again.  These in-progress writes will time out
        // if we don't flush.
        self.writer.flush().await?;
        Ok(())
    }

    /// Schedule batches of data to be written to the file
    pub async fn write_batches(
        &mut self,
        batches: impl Iterator<Item = &RecordBatch>,
    ) -> Result<()> {
        for batch in batches {
            self.write_batch(batch).await?;
        }
        Ok(())
    }

    fn verify_field_nullability(arr: &ArrayData, field: &Field) -> Result<()> {
        if !field.nullable && arr.null_count() > 0 {
            return Err(Error::invalid_input(format!("The field `{}` contained null values even though the field is marked non-null in the schema", field.name), location!()));
        }

        for (child_field, child_arr) in field.children.iter().zip(arr.child_data()) {
            Self::verify_field_nullability(child_arr, child_field)?;
        }

        Ok(())
    }

    fn verify_nullability_constraints(&self, batch: &RecordBatch) -> Result<()> {
        for (col, field) in batch
            .columns()
            .iter()
            .zip(self.schema.as_ref().unwrap().fields.iter())
        {
            Self::verify_field_nullability(&col.to_data(), field)?;
        }
        Ok(())
    }

    fn initialize(&mut self, mut schema: LanceSchema) -> Result<()> {
        let cache_bytes_per_column = if let Some(data_cache_bytes) = self.options.data_cache_bytes {
            data_cache_bytes / schema.fields.len() as u64
        } else {
            8 * 1024 * 1024
        };

        let max_page_bytes = self.options.max_page_bytes.unwrap_or(32 * 1024 * 1024);

        schema.validate()?;

        let keep_original_array = self.options.keep_original_array.unwrap_or(false);
        let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
            let version = self.version();
            default_encoding_strategy(version).into()
        });

        let encoding_options = EncodingOptions {
            cache_bytes_per_column,
            max_page_bytes,
            keep_original_array,
            buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64,
        };
        let encoder =
            BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?;
        self.num_columns = encoder.num_columns();

        self.column_writers = encoder.field_encoders;
        self.column_metadata = vec![initial_column_metadata(); self.num_columns as usize];
        self.field_id_to_column_indices = encoder.field_id_to_column_index;
        self.schema_metadata
            .extend(std::mem::take(&mut schema.metadata));
        self.schema = Some(schema);
        Ok(())
    }

    fn ensure_initialized(&mut self, batch: &RecordBatch) -> Result<&LanceSchema> {
        if self.schema.is_none() {
            let schema = LanceSchema::try_from(batch.schema().as_ref())?;
            self.initialize(schema)?;
        }
        Ok(self.schema.as_ref().unwrap())
    }

    #[instrument(skip_all, level = "debug")]
    fn encode_batch(
        &mut self,
        batch: &RecordBatch,
        external_buffers: &mut OutOfLineBuffers,
    ) -> Result<Vec<Vec<EncodeTask>>> {
        self.schema
            .as_ref()
            .unwrap()
            .fields
            .iter()
            .zip(self.column_writers.iter_mut())
            .map(|(field, column_writer)| {
                let array = batch
                    .column_by_name(&field.name)
                    .ok_or(Error::InvalidInput {
                        source: format!(
                            "Cannot write batch.  The batch was missing the column `{}`",
                            field.name
                        )
                        .into(),
                        location: location!(),
                    })?;
                let repdef = RepDefBuilder::default();
                let num_rows = array.len() as u64;
                column_writer.maybe_encode(
                    array.clone(),
                    external_buffers,
                    repdef,
                    self.rows_written,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()
    }

    /// Schedule a batch of data to be written to the file
    ///
    /// Note: the future returned by this method may complete before the data has been fully
    /// flushed to the file (some data may be in the data cache or the I/O cache)
    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
        debug!(
            "write_batch called with {} bytes of data",
            batch.get_array_memory_size()
        );
        self.ensure_initialized(batch)?;
        self.verify_nullability_constraints(batch)?;
        let num_rows = batch.num_rows() as u64;
        if num_rows == 0 {
            return Ok(());
        }
        if num_rows > u32::MAX as u64 {
            return Err(Error::InvalidInput {
                source: "cannot write Lance files with more than 2^32 rows".into(),
                location: location!(),
            });
        }
        // First we push each array into its column writer.  This may or may not generate enough
        // data to trigger an encoding task.  We collect any encoding tasks into a queue.
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self.encode_batch(batch, &mut external_buffers)?;
        // Next, write external buffers
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }

        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();

        self.rows_written = match self.rows_written.checked_add(batch.num_rows() as u64) {
            Some(rows_written) => rows_written,
            None => {
                return Err(Error::InvalidInput { source: format!("cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows", num_rows, self.rows_written).into(), location: location!() });
            }
        };

        self.write_pages(encoding_tasks).await?;

        Ok(())
    }

    async fn write_column_metadata(
        &mut self,
        metadata: pbfile::ColumnMetadata,
    ) -> Result<(u64, u64)> {
        let metadata_bytes = metadata.encode_to_vec();
        let position = self.writer.tell().await? as u64;
        let len = metadata_bytes.len() as u64;
        self.writer.write_all(&metadata_bytes).await?;
        Ok((position, len))
    }

    async fn write_column_metadatas(&mut self) -> Result<Vec<(u64, u64)>> {
        let mut metadatas = Vec::new();
        std::mem::swap(&mut self.column_metadata, &mut metadatas);
        let mut metadata_positions = Vec::with_capacity(metadatas.len());
        for metadata in metadatas {
            metadata_positions.push(self.write_column_metadata(metadata).await?);
        }
        Ok(metadata_positions)
    }

    fn make_file_descriptor(
        schema: &lance_core::datatypes::Schema,
        num_rows: u64,
    ) -> Result<pb::FileDescriptor> {
        let fields_with_meta = FieldsWithMeta::from(schema);
        Ok(pb::FileDescriptor {
            schema: Some(pb::Schema {
                fields: fields_with_meta.fields.0,
                metadata: fields_with_meta.metadata,
            }),
            length: num_rows,
        })
    }

    async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
        let schema = self.schema.as_mut().ok_or(Error::invalid_input("No schema provided on writer open and no data provided.  Schema is unknown and file cannot be created", location!()))?;
        schema.metadata = std::mem::take(&mut self.schema_metadata);
        let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
        let file_descriptor_bytes = file_descriptor.encode_to_vec();
        let file_descriptor_len = file_descriptor_bytes.len() as u64;
        let file_descriptor_position = self.writer.tell().await? as u64;
        self.writer.write_all(&file_descriptor_bytes).await?;
        let mut gbo_table = Vec::with_capacity(1 + self.global_buffers.len());
        gbo_table.push((file_descriptor_position, file_descriptor_len));
        gbo_table.append(&mut self.global_buffers);
        Ok(gbo_table)
    }

    /// Add a metadata entry to the schema
    ///
    /// This method is useful because sometimes the metadata is not known until after the
    /// data has been written.  It allows you to alter the schema metadata and must be
    /// called before `finish` is called.
    pub fn add_schema_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.schema_metadata.insert(key.into(), value.into());
    }

    /// Adds a global buffer to the file
    ///
    /// The global buffer can contain any arbitrary bytes.  It will be written to the disk
    /// immediately.  This method returns the index of the global buffer (this will always
    /// start at 1 and increment by 1 each time this method is called).
    pub async fn add_global_buffer(&mut self, buffer: Bytes) -> Result<u32> {
        let position = self.writer.tell().await? as u64;
        let len = buffer.len() as u64;
        Self::do_write_buffer(&mut self.writer, &buffer).await?;
        self.global_buffers.push((position, len));
        Ok(self.global_buffers.len() as u32)
    }

    async fn finish_writers(&mut self) -> Result<()> {
        let mut col_idx = 0;
        for mut writer in std::mem::take(&mut self.column_writers) {
            let mut external_buffers =
                OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
            let columns = writer.finish(&mut external_buffers).await?;
            for buffer in external_buffers.take_buffers() {
                self.writer.write_all(&buffer).await?;
            }
            debug_assert_eq!(
                columns.len(),
                writer.num_columns() as usize,
                "Expected {} columns from column at index {} and got {}",
                writer.num_columns(),
                col_idx,
                columns.len()
            );
            for column in columns {
                for page in column.final_pages {
                    self.write_page(page).await?;
                }
                let column_metadata = &mut self.column_metadata[col_idx];
                let mut buffer_pos = self.writer.tell().await? as u64;
                for buffer in column.column_buffers {
                    column_metadata.buffer_offsets.push(buffer_pos);
                    let mut size = 0;
                    Self::do_write_buffer(&mut self.writer, &buffer).await?;
                    size += buffer.len() as u64;
                    buffer_pos += size;
                    column_metadata.buffer_sizes.push(size);
                }
                let encoded_encoding = Any::from_msg(&column.encoding)?.encode_to_vec();
                column_metadata.encoding = Some(pbfile::Encoding {
                    location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                        encoding: encoded_encoding,
                    })),
                });
                col_idx += 1;
            }
        }
        if col_idx != self.column_metadata.len() {
            panic!(
                "Column writers finished with {} columns but we expected {}",
                col_idx,
                self.column_metadata.len()
            );
        }
        Ok(())
    }

    /// Converts self.version (which is a mix of "software version" and
    /// "format version") into a (major, minor) format version pair
    fn version_to_numbers(&self) -> (u16, u16) {
        let version = self.options.format_version.unwrap_or_default();
        match version.resolve() {
            LanceFileVersion::V2_0 => (0, 3),
            LanceFileVersion::V2_1 => (2, 1),
            _ => panic!("Unsupported version: {}", version),
        }
    }

    /// Finishes writing the file
    ///
    /// This method will wait until all data has been flushed to the file.  Then it
    /// will write the file metadata and the footer.  It will not return until all
    /// data has been flushed and the file has been closed.
    ///
    /// Returns the total number of rows written
    pub async fn finish(&mut self) -> Result<u64> {
        // 1. flush any remaining data and write out those pages
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self
            .column_writers
            .iter_mut()
            .map(|writer| writer.flush(&mut external_buffers))
            .collect::<Result<Vec<_>>>()?;
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }
        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();
        self.write_pages(encoding_tasks).await?;

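        // 2. let each column writer finish, writing any final pages and column buffers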
        self.finish_writers().await?;

        // 3. write global buffers (we write the schema here)
        let global_buffer_offsets = self.write_global_buffers().await?;
        let num_global_buffers = global_buffer_offsets.len() as u32;

        // 4. write the column metadatas
        let column_metadata_start = self.writer.tell().await? as u64;
        let metadata_positions = self.write_column_metadatas().await?;

        // 5. write the column metadata offset table
        let cmo_table_start = self.writer.tell().await? as u64;
        for (meta_pos, meta_len) in metadata_positions {
            self.writer.write_u64_le(meta_pos).await?;
            self.writer.write_u64_le(meta_len).await?;
        }

        // 6. write global buffers offset table
        let gbo_table_start = self.writer.tell().await? as u64;
        for (gbo_pos, gbo_len) in global_buffer_offsets {
            self.writer.write_u64_le(gbo_pos).await?;
            self.writer.write_u64_le(gbo_len).await?;
        }

        let (major, minor) = self.version_to_numbers();
        // 7. write the footer
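        //    (three u64 offsets, two u32 counts, two u16 version numbers, then the magic bytes)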
        self.writer.write_u64_le(column_metadata_start).await?;
        self.writer.write_u64_le(cmo_table_start).await?;
        self.writer.write_u64_le(gbo_table_start).await?;
        self.writer.write_u32_le(num_global_buffers).await?;
        self.writer.write_u32_le(self.num_columns).await?;
        self.writer.write_u16_le(major).await?;
        self.writer.write_u16_le(minor).await?;
        self.writer.write_all(MAGIC).await?;

        // 8. close the writer
        self.writer.shutdown().await?;
        Ok(self.rows_written)
    }

    pub async fn tell(&mut self) -> Result<u64> {
        Ok(self.writer.tell().await? as u64)
    }

    pub fn field_id_to_column_indices(&self) -> &[(u32, u32)] {
        &self.field_id_to_column_indices
    }
}

/// Utility trait for converting EncodedBatch to Bytes using the
/// lance file format
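///
/// A minimal sketch (illustrative; an `EncodedBatch` value is assumed to be in scope):
///
/// ```ignore
/// let bytes = encoded_batch.try_to_self_described_lance()?;
/// ```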
pub trait EncodedBatchWriteExt {
    /// Serializes into a lance file, including the schema
    fn try_to_self_described_lance(&self) -> Result<Bytes>;
    /// Serializes into a lance file, without the schema.
    ///
    /// The schema must be provided to deserialize the buffer
    fn try_to_mini_lance(&self) -> Result<Bytes>;
}

// Creates a lance footer and appends it to the encoded data
//
// The logic here is very similar to logic in the FileWriter except we
// are using BufMut (put_xyz) instead of AsyncWrite (write_xyz).
fn concat_lance_footer(batch: &EncodedBatch, write_schema: bool) -> Result<Bytes> {
    // Estimating 1MiB for file footer
    let mut data = BytesMut::with_capacity(batch.data.len() + 1024 * 1024);
    data.put(batch.data.clone());
    // write global buffers (we write the schema here)
    let global_buffers = if write_schema {
        let schema_start = data.len() as u64;
        let lance_schema = lance_core::datatypes::Schema::try_from(batch.schema.as_ref())?;
        let descriptor = FileWriter::make_file_descriptor(&lance_schema, batch.num_rows)?;
        let descriptor_bytes = descriptor.encode_to_vec();
        let descriptor_len = descriptor_bytes.len() as u64;
        data.put(descriptor_bytes.as_slice());

        vec![(schema_start, descriptor_len)]
    } else {
        vec![]
    };
    let col_metadata_start = data.len() as u64;

    let mut col_metadata_positions = Vec::new();
    // Write column metadata
    for col in &batch.page_table {
        let position = data.len() as u64;
        let pages = col
            .page_infos
            .iter()
            .map(|page_info| {
                let encoded_encoding = match &page_info.encoding {
                    PageEncoding::Legacy(array_encoding) => {
                        Any::from_msg(array_encoding)?.encode_to_vec()
                    }
                    PageEncoding::Structural(page_layout) => {
                        Any::from_msg(page_layout)?.encode_to_vec()
                    }
                };
                let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info
                    .buffer_offsets_and_sizes
                    .as_ref()
                    .iter()
                    .cloned()
                    .unzip();
                Ok(pbfile::column_metadata::Page {
                    buffer_offsets,
                    buffer_sizes,
                    encoding: Some(pbfile::Encoding {
                        location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                            encoding: encoded_encoding,
                        })),
                    }),
                    length: page_info.num_rows,
                    priority: page_info.priority,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) =
            col.buffer_offsets_and_sizes.iter().cloned().unzip();
        let encoded_col_encoding = Any::from_msg(&col.encoding)?.encode_to_vec();
        let column = pbfile::ColumnMetadata {
            pages,
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                    encoding: encoded_col_encoding,
                })),
            }),
        };
        let column_bytes = column.encode_to_vec();
        col_metadata_positions.push((position, column_bytes.len() as u64));
        data.put(column_bytes.as_slice());
    }
    // Write column metadata offsets table
    let cmo_table_start = data.len() as u64;
    for (meta_pos, meta_len) in col_metadata_positions {
        data.put_u64_le(meta_pos);
        data.put_u64_le(meta_len);
    }
    // Write global buffers offsets table
    let gbo_table_start = data.len() as u64;
    let num_global_buffers = global_buffers.len() as u32;
    for (gbo_pos, gbo_len) in global_buffers {
        data.put_u64_le(gbo_pos);
        data.put_u64_le(gbo_len);
    }

    let (major, minor) = LanceFileVersion::default().to_numbers();

    // write the footer
    data.put_u64_le(col_metadata_start);
    data.put_u64_le(cmo_table_start);
    data.put_u64_le(gbo_table_start);
    data.put_u32_le(num_global_buffers);
    data.put_u32_le(batch.page_table.len() as u32);
    data.put_u16_le(major as u16);
    data.put_u16_le(minor as u16);
    data.put(MAGIC.as_slice());

    Ok(data.freeze())
}

impl EncodedBatchWriteExt for EncodedBatch {
    fn try_to_self_described_lance(&self) -> Result<Bytes> {
        concat_lance_footer(self, true)
    }

    fn try_to_mini_lance(&self) -> Result<Bytes> {
        concat_lance_footer(self, false)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{types::Float64Type, RecordBatchReader};
    use lance_datagen::{array, gen, BatchCount, RowCount};
    use lance_io::object_store::ObjectStore;
    use object_store::path::Path;

    use crate::v2::writer::{FileWriter, FileWriterOptions};

    #[tokio::test]
    async fn test_basic_write() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(1000), BatchCount::from(10));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
        // Tests asserting the contents of the written file are in reader.rs
    }

    #[tokio::test]
    async fn test_write_empty() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(0), BatchCount::from(0));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
    }
}