use core::panic;
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::RecordBatch;

use arrow_data::ArrayData;
use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use lance_core::datatypes::{Field, Schema as LanceSchema};
use lance_core::utils::bit::pad_bytes;
use lance_core::{Error, Result};
use lance_encoding::decoder::PageEncoding;
use lance_encoding::encoder::{
    default_encoding_strategy, BatchEncoder, EncodeTask, EncodedBatch, EncodedPage,
    EncodingOptions, FieldEncoder, FieldEncodingStrategy, OutOfLineBuffers,
};
use lance_encoding::repdef::RepDefBuilder;
use lance_encoding::version::LanceFileVersion;
use lance_io::object_store::ObjectStore;
use lance_io::object_writer::ObjectWriter;
use lance_io::traits::Writer;
use log::debug;
use object_store::path::Path;
use prost::Message;
use prost_types::Any;
use snafu::location;
use tokio::io::AsyncWriteExt;
use tracing::instrument;

use crate::datatypes::FieldsWithMeta;
use crate::format::pb;
use crate::format::pbfile;
use crate::format::pbfile::DirectEncoding;
use crate::format::MAGIC;

/// Pages and buffers within a Lance file are aligned to this many bytes.
pub(crate) const PAGE_BUFFER_ALIGNMENT: usize = 64;
/// Arbitrary filler bytes written after each buffer to pad it out to
/// [`PAGE_BUFFER_ALIGNMENT`].
const PAD_BUFFER: [u8; PAGE_BUFFER_ALIGNMENT] = [72; PAGE_BUFFER_ALIGNMENT];

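/// Options for [`FileWriter`]. Every field is optional; unset fields fall
/// back to the defaults described on each field.
///
/// A minimal usage sketch (the option values below are illustrative, not
/// recommendations):
///
/// ```ignore
/// let options = FileWriterOptions {
///     // Write the 2.0 version of the format.
///     format_version: Some(LanceFileVersion::V2_0),
///     // Cap individual pages at 64MiB.
///     max_page_bytes: Some(64 * 1024 * 1024),
///     ..Default::default()
/// };
/// ```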
#[derive(Debug, Clone, Default)]
pub struct FileWriterOptions {
    /// How many bytes to use to buffer column data before writing pages.
    /// The budget is split evenly across the columns; if unset, each
    /// column buffers up to 8MiB.
    pub data_cache_bytes: Option<u64>,
    /// The maximum size, in bytes, of a single page. Defaults to 32MiB.
    pub max_page_bytes: Option<u64>,
    /// If true, the writer keeps references to the original arrays it is
    /// given instead of copying out the data it needs, trading memory
    /// retention for fewer copies. Defaults to false.
    pub keep_original_array: Option<bool>,
    /// Overrides the strategy used to encode fields. If unset, the default
    /// strategy for the chosen format version is used.
    pub encoding_strategy: Option<Arc<dyn FieldEncodingStrategy>>,
    /// The version of the Lance file format to write. Defaults to
    /// `LanceFileVersion::default()`.
    pub format_version: Option<LanceFileVersion>,
}

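/// A writer for Lance v2 files.
///
/// A sketch of typical usage (mirroring the tests at the bottom of this
/// file; `writer`, `schema`, and `batches` are assumed to come from the
/// caller):
///
/// ```ignore
/// let mut file_writer = FileWriter::try_new(writer, schema, FileWriterOptions::default())?;
/// for batch in batches {
///     file_writer.write_batch(&batch).await?;
/// }
/// file_writer.finish().await?;
/// ```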
pub struct FileWriter {
    writer: ObjectWriter,
    schema: Option<LanceSchema>,
    column_writers: Vec<Box<dyn FieldEncoder>>,
    column_metadata: Vec<pbfile::ColumnMetadata>,
    field_id_to_column_indices: Vec<(u32, u32)>,
    num_columns: u32,
    rows_written: u64,
    global_buffers: Vec<(u64, u64)>,
    schema_metadata: HashMap<String, String>,
    options: FileWriterOptions,
}

fn initial_column_metadata() -> pbfile::ColumnMetadata {
    pbfile::ColumnMetadata {
        pages: Vec::new(),
        buffer_offsets: Vec::new(),
        buffer_sizes: Vec::new(),
        encoding: None,
    }
}

impl FileWriter {
    /// Creates a new file writer with the given schema.
    pub fn try_new(
        object_writer: ObjectWriter,
        schema: LanceSchema,
        options: FileWriterOptions,
    ) -> Result<Self> {
        let mut writer = Self::new_lazy(object_writer, options);
        writer.initialize(schema)?;
        Ok(writer)
    }

    /// Creates a new file writer without a schema. The schema is inferred
    /// from the first batch written; at least one batch must be written
    /// before the file can be finished.
    pub fn new_lazy(object_writer: ObjectWriter, options: FileWriterOptions) -> Self {
        Self {
            writer: object_writer,
            schema: None,
            column_writers: Vec::new(),
            column_metadata: Vec::new(),
            num_columns: 0,
            rows_written: 0,
            field_id_to_column_indices: Vec::new(),
            global_buffers: Vec::new(),
            schema_metadata: HashMap::new(),
            options,
        }
    }

    /// Writes an entire file in one shot: creates the file at `path`,
    /// writes every batch from the iterator, and finishes the file.
    /// Returns the number of rows written.
    pub async fn create_file_with_batches(
        store: &ObjectStore,
        path: &Path,
        schema: lance_core::datatypes::Schema,
        batches: impl Iterator<Item = RecordBatch> + Send,
        options: FileWriterOptions,
    ) -> Result<usize> {
        let writer = store.create(path).await?;
        let mut writer = Self::try_new(writer, schema, options)?;
        for batch in batches {
            writer.write_batch(&batch).await?;
        }
        Ok(writer.finish().await? as usize)
    }

    /// Writes `buf` followed by enough padding to keep the writer aligned
    /// to [`PAGE_BUFFER_ALIGNMENT`].
    async fn do_write_buffer(writer: &mut ObjectWriter, buf: &[u8]) -> Result<()> {
        writer.write_all(buf).await?;
        let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
        writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
        Ok(())
    }
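
    // Worked example of the padding above (an illustration, not part of the
    // write path): with PAGE_BUFFER_ALIGNMENT = 64, a 100-byte buffer needs
    // pad_bytes::<64>(100) == 28 trailing pad bytes so the next write starts
    // at offset 128, the next multiple of 64. A buffer whose length is
    // already a multiple of 64 needs no padding at all.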

    /// The Lance file format version that will be written.
    pub fn version(&self) -> LanceFileVersion {
        self.options.format_version.unwrap_or_default()
    }

    async fn write_page(&mut self, encoded_page: EncodedPage) -> Result<()> {
        let buffers = encoded_page.data;
        let mut buffer_offsets = Vec::with_capacity(buffers.len());
        let mut buffer_sizes = Vec::with_capacity(buffers.len());
        for buffer in buffers {
            buffer_offsets.push(self.writer.tell().await? as u64);
            buffer_sizes.push(buffer.len() as u64);
            Self::do_write_buffer(&mut self.writer, &buffer).await?;
        }
        let encoded_encoding = match encoded_page.description {
            PageEncoding::Legacy(array_encoding) => {
                Any::from_msg(&array_encoding)?.encode_to_vec()
            }
            PageEncoding::Structural(page_layout) => {
                Any::from_msg(&page_layout)?.encode_to_vec()
            }
        };
        let page = pbfile::column_metadata::Page {
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                    encoding: encoded_encoding,
                })),
            }),
            length: encoded_page.num_rows,
            // The row number of the page's first row, used to order pages
            // within a column.
            priority: encoded_page.row_number,
        };
        self.column_metadata[encoded_page.column_idx as usize]
            .pages
            .push(page);
        Ok(())
    }

    #[instrument(skip_all, level = "debug")]
    async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered<EncodeTask>) -> Result<()> {
        // Drain the encoding tasks in order, writing each page as soon as
        // it is ready.
        while let Some(encoding_task) = encoding_tasks.next().await {
            let encoded_page = encoding_task?;
            self.write_page(encoded_page).await?;
        }
        self.writer.flush().await?;
        Ok(())
    }

    /// Writes a series of record batches to the file.
    pub async fn write_batches(
        &mut self,
        batches: impl Iterator<Item = &RecordBatch>,
    ) -> Result<()> {
        for batch in batches {
            self.write_batch(batch).await?;
        }
        Ok(())
    }

    fn verify_field_nullability(arr: &ArrayData, field: &Field) -> Result<()> {
        if !field.nullable && arr.null_count() > 0 {
            return Err(Error::invalid_input(
                format!(
                    "The field `{}` contained null values even though the field is marked non-null in the schema",
                    field.name
                ),
                location!(),
            ));
        }

        for (child_field, child_arr) in field.children.iter().zip(arr.child_data()) {
            Self::verify_field_nullability(child_arr, child_field)?;
        }

        Ok(())
    }

    fn verify_nullability_constraints(&self, batch: &RecordBatch) -> Result<()> {
        for (col, field) in batch
            .columns()
            .iter()
            .zip(self.schema.as_ref().unwrap().fields.iter())
        {
            Self::verify_field_nullability(&col.to_data(), field)?;
        }
        Ok(())
    }

    fn initialize(&mut self, mut schema: LanceSchema) -> Result<()> {
        // Split the data cache budget evenly across columns; without an
        // explicit budget each column gets 8MiB.
        let cache_bytes_per_column = if let Some(data_cache_bytes) = self.options.data_cache_bytes {
            data_cache_bytes / schema.fields.len() as u64
        } else {
            8 * 1024 * 1024
        };

        let max_page_bytes = self.options.max_page_bytes.unwrap_or(32 * 1024 * 1024);

        schema.validate()?;

        let keep_original_array = self.options.keep_original_array.unwrap_or(false);
        let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
            let version = self.version();
            default_encoding_strategy(version).into()
        });

        let encoding_options = EncodingOptions {
            cache_bytes_per_column,
            max_page_bytes,
            keep_original_array,
            buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64,
        };
        let encoder =
            BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?;
        self.num_columns = encoder.num_columns();

        self.column_writers = encoder.field_encoders;
        self.column_metadata = vec![initial_column_metadata(); self.num_columns as usize];
        self.field_id_to_column_indices = encoder.field_id_to_column_index;
        self.schema_metadata
            .extend(std::mem::take(&mut schema.metadata));
        self.schema = Some(schema);
        Ok(())
    }

    fn ensure_initialized(&mut self, batch: &RecordBatch) -> Result<&LanceSchema> {
        if self.schema.is_none() {
            let schema = LanceSchema::try_from(batch.schema().as_ref())?;
            self.initialize(schema)?;
        }
        Ok(self.schema.as_ref().unwrap())
    }

    #[instrument(skip_all, level = "debug")]
    fn encode_batch(
        &mut self,
        batch: &RecordBatch,
        external_buffers: &mut OutOfLineBuffers,
    ) -> Result<Vec<Vec<EncodeTask>>> {
        self.schema
            .as_ref()
            .unwrap()
            .fields
            .iter()
            .zip(self.column_writers.iter_mut())
            .map(|(field, column_writer)| {
                let array = batch
                    .column_by_name(&field.name)
                    .ok_or(Error::InvalidInput {
                        source: format!(
                            "Cannot write batch. The batch was missing the column `{}`",
                            field.name
                        )
                        .into(),
                        location: location!(),
                    })?;
                let repdef = RepDefBuilder::default();
                let num_rows = array.len() as u64;
                column_writer.maybe_encode(
                    array.clone(),
                    external_buffers,
                    repdef,
                    self.rows_written,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()
    }

    /// Schedules a batch of data to be written to the file.
    ///
    /// The data may still be buffered in memory when this returns; it is
    /// not guaranteed to be durable until `finish` completes.
    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
        debug!(
            "write_batch called with {} bytes of data",
            batch.get_array_memory_size()
        );
        self.ensure_initialized(batch)?;
        self.verify_nullability_constraints(batch)?;
        let num_rows = batch.num_rows() as u64;
        if num_rows == 0 {
            return Ok(());
        }
        if num_rows > u32::MAX as u64 {
            return Err(Error::InvalidInput {
                source: "cannot write Lance files with more than 2^32 rows".into(),
                location: location!(),
            });
        }
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self.encode_batch(batch, &mut external_buffers)?;
        // Write any out-of-line buffers produced while encoding.
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }

        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();

        self.rows_written = match self.rows_written.checked_add(batch.num_rows() as u64) {
            Some(rows_written) => rows_written,
            None => {
                return Err(Error::InvalidInput {
                    source: format!(
                        "cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows",
                        num_rows, self.rows_written
                    )
                    .into(),
                    location: location!(),
                });
            }
        };

        self.write_pages(encoding_tasks).await?;

        Ok(())
    }

    async fn write_column_metadata(
        &mut self,
        metadata: pbfile::ColumnMetadata,
    ) -> Result<(u64, u64)> {
        let metadata_bytes = metadata.encode_to_vec();
        let position = self.writer.tell().await? as u64;
        let len = metadata_bytes.len() as u64;
        self.writer.write_all(&metadata_bytes).await?;
        Ok((position, len))
    }

    async fn write_column_metadatas(&mut self) -> Result<Vec<(u64, u64)>> {
        let mut metadatas = Vec::new();
        std::mem::swap(&mut self.column_metadata, &mut metadatas);
        let mut metadata_positions = Vec::with_capacity(metadatas.len());
        for metadata in metadatas {
            metadata_positions.push(self.write_column_metadata(metadata).await?);
        }
        Ok(metadata_positions)
    }

    fn make_file_descriptor(
        schema: &lance_core::datatypes::Schema,
        num_rows: u64,
    ) -> Result<pb::FileDescriptor> {
        let fields_with_meta = FieldsWithMeta::from(schema);
        Ok(pb::FileDescriptor {
            schema: Some(pb::Schema {
                fields: fields_with_meta.fields.0,
                metadata: fields_with_meta.metadata,
            }),
            length: num_rows,
        })
    }

    async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
        let schema = self.schema.as_mut().ok_or(Error::invalid_input(
            "No schema provided on writer open and no data provided. Schema is unknown and file cannot be created",
            location!(),
        ))?;
        schema.metadata = std::mem::take(&mut self.schema_metadata);
        let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
        let file_descriptor_bytes = file_descriptor.encode_to_vec();
        let file_descriptor_len = file_descriptor_bytes.len() as u64;
        let file_descriptor_position = self.writer.tell().await? as u64;
        self.writer.write_all(&file_descriptor_bytes).await?;
        // The file descriptor always occupies global buffer index 0; any
        // user-supplied global buffers follow it.
        let mut gbo_table = Vec::with_capacity(1 + self.global_buffers.len());
        gbo_table.push((file_descriptor_position, file_descriptor_len));
        gbo_table.append(&mut self.global_buffers);
        Ok(gbo_table)
    }

    /// Adds a metadata entry to the file schema, replacing any existing
    /// value for the key. May be called at any point before `finish`.
    pub fn add_schema_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.schema_metadata.insert(key.into(), value.into());
    }

    /// Writes a global buffer to the file and returns its index.
    ///
    /// Indices start at 1 because index 0 is reserved for the file
    /// descriptor written during `finish`.
    pub async fn add_global_buffer(&mut self, buffer: Bytes) -> Result<u32> {
        let position = self.writer.tell().await? as u64;
        let len = buffer.len() as u64;
        Self::do_write_buffer(&mut self.writer, &buffer).await?;
        self.global_buffers.push((position, len));
        Ok(self.global_buffers.len() as u32)
    }

    async fn finish_writers(&mut self) -> Result<()> {
        let mut col_idx = 0;
        for mut writer in std::mem::take(&mut self.column_writers) {
            let mut external_buffers =
                OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
            let columns = writer.finish(&mut external_buffers).await?;
            for buffer in external_buffers.take_buffers() {
                // Use the padded write here: `OutOfLineBuffers` hands out
                // positions that assume PAGE_BUFFER_ALIGNMENT.
                Self::do_write_buffer(&mut self.writer, &buffer).await?;
            }
            debug_assert_eq!(
                columns.len(),
                writer.num_columns() as usize,
                "Expected {} columns from column at index {} and got {}",
                writer.num_columns(),
                col_idx,
                columns.len()
            );
            for column in columns {
                for page in column.final_pages {
                    self.write_page(page).await?;
                }
                let column_metadata = &mut self.column_metadata[col_idx];
                for buffer in column.column_buffers {
                    // `do_write_buffer` pads each buffer, so read the
                    // position before each write rather than accumulating
                    // unpadded sizes.
                    let buffer_pos = self.writer.tell().await? as u64;
                    column_metadata.buffer_offsets.push(buffer_pos);
                    column_metadata.buffer_sizes.push(buffer.len() as u64);
                    Self::do_write_buffer(&mut self.writer, &buffer).await?;
                }
                let encoded_encoding = Any::from_msg(&column.encoding)?.encode_to_vec();
                column_metadata.encoding = Some(pbfile::Encoding {
                    location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                        encoding: encoded_encoding,
                    })),
                });
                col_idx += 1;
            }
        }
        if col_idx != self.column_metadata.len() {
            panic!(
                "Column writers finished with {} columns but we expected {}",
                col_idx,
                self.column_metadata.len()
            );
        }
        Ok(())
    }

    fn version_to_numbers(&self) -> (u16, u16) {
        let version = self.options.format_version.unwrap_or_default();
        match version.resolve() {
            // For historical reasons, 2.0 files are stamped with version
            // 0.3 in the footer.
            LanceFileVersion::V2_0 => (0, 3),
            LanceFileVersion::V2_1 => (2, 1),
            _ => panic!("Unsupported version: {}", version),
        }
    }

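    /// Finishes writing the file: flushes all remaining data, writes the
    /// column metadata, global buffer table, and footer, and shuts down the
    /// underlying writer. Returns the number of rows written.
    ///
    /// The tail of the file, as written below, is laid out as:
    ///
    /// ```text
    /// [column metadata blocks]
    /// [cmo table: (u64 position, u64 size) per column metadata block]
    /// [gbo table: (u64 position, u64 size) per global buffer]
    /// u64: column_metadata_start
    /// u64: cmo_table_start
    /// u64: gbo_table_start
    /// u32: num_global_buffers
    /// u32: num_columns
    /// u16: major_version, u16: minor_version
    /// 4 bytes: MAGIC
    /// ```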
    pub async fn finish(&mut self) -> Result<u64> {
        // Flush any remaining cached data out of the column writers.
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self
            .column_writers
            .iter_mut()
            .map(|writer| writer.flush(&mut external_buffers))
            .collect::<Result<Vec<_>>>()?;
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }
        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();
        self.write_pages(encoding_tasks).await?;

        self.finish_writers().await?;

        // The file descriptor (and any user buffers) become the global
        // buffer table.
        let global_buffer_offsets = self.write_global_buffers().await?;
        let num_global_buffers = global_buffer_offsets.len() as u32;

        let column_metadata_start = self.writer.tell().await? as u64;
        let metadata_positions = self.write_column_metadatas().await?;

        let cmo_table_start = self.writer.tell().await? as u64;
        for (meta_pos, meta_len) in metadata_positions {
            self.writer.write_u64_le(meta_pos).await?;
            self.writer.write_u64_le(meta_len).await?;
        }

        let gbo_table_start = self.writer.tell().await? as u64;
        for (gbo_pos, gbo_len) in global_buffer_offsets {
            self.writer.write_u64_le(gbo_pos).await?;
            self.writer.write_u64_le(gbo_len).await?;
        }

        let (major, minor) = self.version_to_numbers();
        self.writer.write_u64_le(column_metadata_start).await?;
        self.writer.write_u64_le(cmo_table_start).await?;
        self.writer.write_u64_le(gbo_table_start).await?;
        self.writer.write_u32_le(num_global_buffers).await?;
        self.writer.write_u32_le(self.num_columns).await?;
        self.writer.write_u16_le(major).await?;
        self.writer.write_u16_le(minor).await?;
        self.writer.write_all(MAGIC).await?;

        self.writer.shutdown().await?;
        Ok(self.rows_written)
    }

    /// Returns the current position in the file.
    pub async fn tell(&mut self) -> Result<u64> {
        Ok(self.writer.tell().await? as u64)
    }

    /// Returns the (field id, column index) pairs that map each field in
    /// the schema to the column that stores it.
    pub fn field_id_to_column_indices(&self) -> &[(u32, u32)] {
        &self.field_id_to_column_indices
    }
}

/// An extension trait for serializing an [`EncodedBatch`] into the Lance
/// file format in memory.
pub trait EncodedBatchWriteExt {
    /// Serializes the batch with a full footer, including the schema, so a
    /// reader needs no out-of-band information to decode it.
    fn try_to_self_described_lance(&self) -> Result<Bytes>;
    /// Serializes the batch with a footer but without the schema; the
    /// reader must already know the schema to decode the data.
    fn try_to_mini_lance(&self) -> Result<Bytes>;
}

/// Writes the batch's data followed by a Lance v2 footer into a single
/// buffer, optionally embedding the schema as global buffer 0.
fn concat_lance_footer(batch: &EncodedBatch, write_schema: bool) -> Result<Bytes> {
    // Reserve a generous amount of extra space for the metadata so the
    // buffer is unlikely to reallocate.
    let mut data = BytesMut::with_capacity(batch.data.len() + 1024 * 1024);
    data.put(batch.data.clone());
    let global_buffers = if write_schema {
        let schema_start = data.len() as u64;
        let lance_schema = lance_core::datatypes::Schema::try_from(batch.schema.as_ref())?;
        let descriptor = FileWriter::make_file_descriptor(&lance_schema, batch.num_rows)?;
        let descriptor_bytes = descriptor.encode_to_vec();
        let descriptor_len = descriptor_bytes.len() as u64;
        data.put(descriptor_bytes.as_slice());

        vec![(schema_start, descriptor_len)]
    } else {
        vec![]
    };
    let col_metadata_start = data.len() as u64;

    let mut col_metadata_positions = Vec::new();
    for col in &batch.page_table {
        let position = data.len() as u64;
        let pages = col
            .page_infos
            .iter()
            .map(|page_info| {
                let encoded_encoding = match &page_info.encoding {
                    PageEncoding::Legacy(array_encoding) => {
                        Any::from_msg(array_encoding)?.encode_to_vec()
                    }
                    PageEncoding::Structural(page_layout) => {
                        Any::from_msg(page_layout)?.encode_to_vec()
                    }
                };
                let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info
                    .buffer_offsets_and_sizes
                    .as_ref()
                    .iter()
                    .cloned()
                    .unzip();
                Ok(pbfile::column_metadata::Page {
                    buffer_offsets,
                    buffer_sizes,
                    encoding: Some(pbfile::Encoding {
                        location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                            encoding: encoded_encoding,
                        })),
                    }),
                    length: page_info.num_rows,
                    priority: page_info.priority,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) =
            col.buffer_offsets_and_sizes.iter().cloned().unzip();
        let encoded_col_encoding = Any::from_msg(&col.encoding)?.encode_to_vec();
        let column = pbfile::ColumnMetadata {
            pages,
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                    encoding: encoded_col_encoding,
                })),
            }),
        };
        let column_bytes = column.encode_to_vec();
        col_metadata_positions.push((position, column_bytes.len() as u64));
        data.put(column_bytes.as_slice());
    }
    let cmo_table_start = data.len() as u64;
    for (meta_pos, meta_len) in col_metadata_positions {
        data.put_u64_le(meta_pos);
        data.put_u64_le(meta_len);
    }
    let gbo_table_start = data.len() as u64;
    let num_global_buffers = global_buffers.len() as u32;
    for (gbo_pos, gbo_len) in global_buffers {
        data.put_u64_le(gbo_pos);
        data.put_u64_le(gbo_len);
    }

    let (major, minor) = LanceFileVersion::default().to_numbers();

    // Same footer layout as `FileWriter::finish`.
    data.put_u64_le(col_metadata_start);
    data.put_u64_le(cmo_table_start);
    data.put_u64_le(gbo_table_start);
    data.put_u32_le(num_global_buffers);
    data.put_u32_le(batch.page_table.len() as u32);
    data.put_u16_le(major as u16);
    data.put_u16_le(minor as u16);
    data.put(MAGIC.as_slice());

    Ok(data.freeze())
}

impl EncodedBatchWriteExt for EncodedBatch {
    fn try_to_self_described_lance(&self) -> Result<Bytes> {
        concat_lance_footer(self, true)
    }

    fn try_to_mini_lance(&self) -> Result<Bytes> {
        concat_lance_footer(self, false)
    }
}
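
// A usage sketch (`encoded` is a hypothetical `EncodedBatch` produced by the
// encoder): `encoded.try_to_self_described_lance()?` yields bytes a reader
// can decode with no other context, while `encoded.try_to_mini_lance()?`
// omits the schema for callers that track it out of band.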

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{types::Float64Type, RecordBatchReader};
    use lance_datagen::{array, gen, BatchCount, RowCount};
    use lance_io::object_store::ObjectStore;
    use object_store::path::Path;

    use crate::v2::writer::{FileWriter, FileWriterOptions};

    #[tokio::test]
    async fn test_basic_write() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(1000), BatchCount::from(10));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
    }

    #[tokio::test]
    async fn test_write_empty() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(0), BatchCount::from(0));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
    }
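
    // A sketch of a test exercising `add_global_buffer`, mirroring the setup
    // of the tests above. The index assertion relies on the writer reserving
    // global buffer 0 for the file descriptor.
    #[tokio::test]
    async fn test_write_global_buffer() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(100), BatchCount::from(1));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        // User-supplied global buffers are indexed starting at 1.
        let idx = file_writer
            .add_global_buffer(bytes::Bytes::from_static(b"hello"))
            .await
            .unwrap();
        assert_eq!(idx, 1);
        file_writer.finish().await.unwrap();
    }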
}