//! Low-level writer that turns Arrow record batches into partitioned parquet
//! files in object storage and collects the corresponding [`Add`] actions for
//! the Delta log.

use std::collections::HashMap;
use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
use bytes::Bytes;
use delta_kernel::expressions::Scalar;
use futures::{StreamExt, TryStreamExt};
use indexmap::IndexMap;
use object_store::{path::Path, ObjectStore};
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use tracing::debug;
use crate::crate_version;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, PartitionsExt};
use crate::storage::ObjectStoreRef;
use crate::writer::record_batch::{divide_by_partition_values, PartitionResult};
use crate::writer::stats::create_add;
use crate::writer::utils::{
arrow_schema_without_partitions, next_data_path, record_batch_without_partitions,
ShareableBuffer,
};
/// Default target size of a data file: 100 MiB.
const DEFAULT_TARGET_FILE_SIZE: usize = 104_857_600;
/// Default number of rows written per slice of an incoming record batch.
const DEFAULT_WRITE_BATCH_SIZE: usize = 1024;
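/// Errors raised while writing data; converted into [`DeltaTableError`] before
/// being surfaced to callers.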
#[derive(thiserror::Error, Debug)]
enum WriteError {
#[error("Unexpected Arrow schema: got: {schema}, expected: {expected_schema}")]
SchemaMismatch {
schema: ArrowSchemaRef,
expected_schema: ArrowSchemaRef,
},
#[error("Error creating add action: {source}")]
CreateAdd {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
#[error("Error handling Arrow data: {source}")]
Arrow {
#[from]
source: ArrowError,
},
#[error("Error partitioning record batch: {0}")]
Partitioning(String),
}
impl From<WriteError> for DeltaTableError {
fn from(err: WriteError) -> Self {
match err {
WriteError::SchemaMismatch { .. } => DeltaTableError::SchemaMismatch {
msg: err.to_string(),
},
WriteError::Arrow { source } => DeltaTableError::Arrow { source },
_ => DeltaTableError::GenericError {
source: Box::new(err),
},
}
}
}
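/// Configuration for a [`DeltaWriter`]: the Arrow schema of the table, the
/// partition columns, parquet writer properties, and file/batch sizing.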
#[derive(Debug)]
pub struct WriterConfig {
table_schema: ArrowSchemaRef,
partition_columns: Vec<String>,
writer_properties: WriterProperties,
target_file_size: usize,
write_batch_size: usize,
num_indexed_cols: i32,
stats_columns: Option<Vec<String>>,
}
impl WriterConfig {
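    /// Create a new [`WriterConfig`], falling back to SNAPPY-compressed parquet
    /// writer properties and the default target file size and write batch size
    /// when the corresponding options are `None`.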
pub fn new(
table_schema: ArrowSchemaRef,
partition_columns: Vec<String>,
writer_properties: Option<WriterProperties>,
target_file_size: Option<usize>,
write_batch_size: Option<usize>,
num_indexed_cols: i32,
stats_columns: Option<Vec<String>>,
) -> Self {
let writer_properties = writer_properties.unwrap_or_else(|| {
WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build()
});
let target_file_size = target_file_size.unwrap_or(DEFAULT_TARGET_FILE_SIZE);
let write_batch_size = write_batch_size.unwrap_or(DEFAULT_WRITE_BATCH_SIZE);
Self {
table_schema,
partition_columns,
writer_properties,
target_file_size,
write_batch_size,
num_indexed_cols,
stats_columns,
}
}
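    /// Schema of the data files written to disk, i.e. the table schema with the
    /// partition columns removed.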
pub fn file_schema(&self) -> ArrowSchemaRef {
arrow_schema_without_partitions(&self.table_schema, &self.partition_columns)
}
}
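/// Writes record batches to partitioned parquet files in object storage,
/// buffering data per partition and rolling over to a new file once the
/// configured target file size is reached.
///
/// A minimal usage sketch (not compiled as a doc-test); the `object_store`
/// handle and the `batch` record batch are assumed to be provided by the
/// caller, and `32` stands in for the table's configured number of indexed
/// columns:
///
/// ```ignore
/// let config = WriterConfig::new(
///     batch.schema(), // table schema, including any partition columns
///     vec![],         // no partition columns in this sketch
///     None,           // default parquet writer properties (SNAPPY)
///     None,           // default target file size (100 MiB)
///     None,           // default write batch size (1024 rows)
///     32,             // number of leading columns to collect stats for
///     None,           // no explicit stats column list
/// );
/// let mut writer = DeltaWriter::new(object_store, config);
/// writer.write(&batch).await?;
/// let adds = writer.close().await?; // `Add` actions to commit to the log
/// ```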
#[derive(Debug)]
pub struct DeltaWriter {
object_store: ObjectStoreRef,
config: WriterConfig,
partition_writers: HashMap<Path, PartitionWriter>,
}
impl DeltaWriter {
pub fn new(object_store: ObjectStoreRef, config: WriterConfig) -> Self {
Self {
object_store,
config,
partition_writers: HashMap::new(),
}
}
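    /// Replace the parquet [`WriterProperties`] used for subsequently created
    /// partition writers.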
pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
self.config.writer_properties = writer_properties;
self
}
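    /// Split a record batch into per-partition batches along the configured
    /// partition columns.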
fn divide_by_partition_values(
&mut self,
values: &RecordBatch,
) -> DeltaResult<Vec<PartitionResult>> {
Ok(divide_by_partition_values(
self.config.file_schema(),
self.config.partition_columns.clone(),
values,
)
.map_err(|err| WriteError::Partitioning(err.to_string()))?)
}
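    /// Write a record batch belonging to a single partition, creating a new
    /// [`PartitionWriter`] for that partition if none exists yet.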
pub async fn write_partition(
&mut self,
record_batch: RecordBatch,
partition_values: &IndexMap<String, Scalar>,
) -> DeltaResult<()> {
let partition_key = Path::parse(partition_values.hive_partition_path())?;
let record_batch =
record_batch_without_partitions(&record_batch, &self.config.partition_columns)?;
match self.partition_writers.get_mut(&partition_key) {
Some(writer) => {
writer.write(&record_batch).await?;
}
None => {
let config = PartitionWriterConfig::try_new(
self.config.file_schema(),
partition_values.clone(),
Some(self.config.writer_properties.clone()),
Some(self.config.target_file_size),
Some(self.config.write_batch_size),
)?;
let mut writer = PartitionWriter::try_with_config(
self.object_store.clone(),
config,
self.config.num_indexed_cols,
self.config.stats_columns.clone(),
)?;
writer.write(&record_batch).await?;
let _ = self.partition_writers.insert(partition_key, writer);
}
}
Ok(())
}
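    /// Buffer a record batch: the batch is split by partition values and each
    /// slice is routed to the matching partition writer.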
pub async fn write(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
for result in self.divide_by_partition_values(batch)? {
self.write_partition(result.record_batch, &result.partition_values)
.await?;
}
Ok(())
}
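    /// Close all partition writers concurrently (bounded by the number of CPUs)
    /// and collect the [`Add`] actions describing the files that were written.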
pub async fn close(mut self) -> DeltaResult<Vec<Add>> {
let writers = std::mem::take(&mut self.partition_writers);
let actions = futures::stream::iter(writers)
.map(|(_, writer)| async move {
let writer_actions = writer.close().await?;
Ok::<_, DeltaTableError>(writer_actions)
})
.buffered(num_cpus::get())
.try_fold(Vec::new(), |mut acc, actions| {
acc.extend(actions);
futures::future::ready(Ok(acc))
})
.await?;
Ok(actions)
}
}
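/// Configuration for a single [`PartitionWriter`], scoped to one hive-style
/// partition path of the table.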
#[derive(Debug)]
pub struct PartitionWriterConfig {
file_schema: ArrowSchemaRef,
prefix: Path,
partition_values: IndexMap<String, Scalar>,
writer_properties: WriterProperties,
target_file_size: usize,
write_batch_size: usize,
}
impl PartitionWriterConfig {
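    /// Create a partition writer config; fails if the partition values cannot be
    /// rendered into a valid hive-style object-store path.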
pub fn try_new(
file_schema: ArrowSchemaRef,
partition_values: IndexMap<String, Scalar>,
writer_properties: Option<WriterProperties>,
target_file_size: Option<usize>,
write_batch_size: Option<usize>,
) -> DeltaResult<Self> {
let part_path = partition_values.hive_partition_path();
let prefix = Path::parse(part_path)?;
let writer_properties = writer_properties.unwrap_or_else(|| {
WriterProperties::builder()
.set_created_by(format!("delta-rs version {}", crate_version()))
.build()
});
let target_file_size = target_file_size.unwrap_or(DEFAULT_TARGET_FILE_SIZE);
let write_batch_size = write_batch_size.unwrap_or(DEFAULT_WRITE_BATCH_SIZE);
Ok(Self {
file_schema,
prefix,
partition_values,
writer_properties,
target_file_size,
write_batch_size,
})
}
}
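/// Writes the data of a single partition to one or more parquet files, rolling
/// over to a new file whenever the buffered data exceeds the target file size.
///
/// A minimal sketch of direct use (not compiled as a doc-test); `object_store`
/// and `batch` are assumed to exist, the batch schema carries no partition
/// columns, and `32` stands in for the number of indexed columns:
///
/// ```ignore
/// let config = PartitionWriterConfig::try_new(batch.schema(), IndexMap::new(), None, None, None)?;
/// let mut writer = PartitionWriter::try_with_config(object_store, config, 32, None)?;
/// writer.write(&batch).await?;
/// let adds = writer.close().await?;
/// ```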
#[derive(Debug)]
pub struct PartitionWriter {
object_store: ObjectStoreRef,
writer_id: uuid::Uuid,
config: PartitionWriterConfig,
buffer: ShareableBuffer,
arrow_writer: ArrowWriter<ShareableBuffer>,
part_counter: usize,
files_written: Vec<Add>,
num_indexed_cols: i32,
stats_columns: Option<Vec<String>>,
}
impl PartitionWriter {
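    /// Create a writer for a single partition, backed by an in-memory
    /// [`ShareableBuffer`] and an [`ArrowWriter`].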
pub fn try_with_config(
object_store: ObjectStoreRef,
config: PartitionWriterConfig,
num_indexed_cols: i32,
stats_columns: Option<Vec<String>>,
) -> DeltaResult<Self> {
let buffer = ShareableBuffer::default();
let arrow_writer = ArrowWriter::try_new(
buffer.clone(),
config.file_schema.clone(),
Some(config.writer_properties.clone()),
)?;
Ok(Self {
object_store,
writer_id: uuid::Uuid::new_v4(),
config,
buffer,
arrow_writer,
part_counter: 0,
files_written: Vec::new(),
num_indexed_cols,
stats_columns,
})
}
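    /// Advance the part counter and return the object-store path for the next
    /// data file of this writer.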
fn next_data_path(&mut self) -> Path {
self.part_counter += 1;
next_data_path(
&self.config.prefix,
self.part_counter,
&self.writer_id,
&self.config.writer_properties,
)
}
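    /// Swap in a fresh buffer and [`ArrowWriter`], returning the previous pair
    /// so it can be closed and uploaded.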
fn reset_writer(&mut self) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {
let new_buffer = ShareableBuffer::default();
let arrow_writer = ArrowWriter::try_new(
new_buffer.clone(),
self.config.file_schema.clone(),
Some(self.config.writer_properties.clone()),
)?;
Ok((
std::mem::replace(&mut self.arrow_writer, arrow_writer),
std::mem::replace(&mut self.buffer, new_buffer),
))
}
fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
Ok(self.arrow_writer.write(batch)?)
}
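    /// Close the current [`ArrowWriter`], upload the buffered parquet file if it
    /// contains any rows, and record the corresponding [`Add`] action.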
async fn flush_arrow_writer(&mut self) -> DeltaResult<()> {
let (writer, buffer) = self.reset_writer()?;
let metadata = writer.close()?;
if metadata.num_rows == 0 {
return Ok(());
}
        let buffer = match buffer.into_inner() {
            Some(buffer) => Bytes::from(buffer),
            None => return Ok(()),
        };
let path = self.next_data_path();
let file_size = buffer.len() as i64;
self.object_store.put(&path, buffer.into()).await?;
self.files_written.push(
create_add(
&self.config.partition_values,
path.to_string(),
file_size,
&metadata,
self.num_indexed_cols,
&self.stats_columns,
)
.map_err(|err| WriteError::CreateAdd {
source: Box::new(err),
})?,
);
Ok(())
}
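    /// Buffer a record batch, slicing it into chunks of `write_batch_size` rows
    /// and flushing to a new file whenever the estimated size of the buffered
    /// data reaches `target_file_size`.
    ///
    /// Fails if the batch schema does not match the expected file schema.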
pub async fn write(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
if batch.schema() != self.config.file_schema {
return Err(WriteError::SchemaMismatch {
schema: batch.schema(),
expected_schema: self.config.file_schema.clone(),
}
.into());
}
let max_offset = batch.num_rows();
for offset in (0..max_offset).step_by(self.config.write_batch_size) {
let length = usize::min(self.config.write_batch_size, max_offset - offset);
self.write_batch(&batch.slice(offset, length))?;
let estimated_size = self.buffer.len() + self.arrow_writer.in_progress_size();
if estimated_size >= self.config.target_file_size {
debug!(
"Writing file with estimated size {:?} to disk.",
estimated_size
);
self.flush_arrow_writer().await?;
}
}
Ok(())
}
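    /// Flush any remaining buffered data and return the [`Add`] actions for all
    /// files written by this writer.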
pub async fn close(mut self) -> DeltaResult<Vec<Add>> {
self.flush_arrow_writer().await?;
Ok(self.files_written)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::utils::flatten_list_stream as list;
use crate::table::config::DEFAULT_NUM_INDEX_COLS;
use crate::writer::test_utils::*;
use crate::DeltaTableBuilder;
use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use std::sync::Arc;
fn get_delta_writer(
object_store: ObjectStoreRef,
batch: &RecordBatch,
writer_properties: Option<WriterProperties>,
target_file_size: Option<usize>,
write_batch_size: Option<usize>,
) -> DeltaWriter {
let config = WriterConfig::new(
batch.schema(),
vec![],
writer_properties,
target_file_size,
write_batch_size,
DEFAULT_NUM_INDEX_COLS,
None,
);
DeltaWriter::new(object_store, config)
}
fn get_partition_writer(
object_store: ObjectStoreRef,
batch: &RecordBatch,
writer_properties: Option<WriterProperties>,
target_file_size: Option<usize>,
write_batch_size: Option<usize>,
) -> PartitionWriter {
let config = PartitionWriterConfig::try_new(
batch.schema(),
IndexMap::new(),
writer_properties,
target_file_size,
write_batch_size,
)
.unwrap();
PartitionWriter::try_with_config(object_store, config, DEFAULT_NUM_INDEX_COLS, None)
.unwrap()
}
#[tokio::test]
async fn test_write_partition() {
let log_store = DeltaTableBuilder::from_uri("memory://")
.build_storage()
.unwrap();
let object_store = log_store.object_store();
let batch = get_record_batch(None, false);
let mut writer = get_partition_writer(object_store.clone(), &batch, None, None, None);
writer.write(&batch).await.unwrap();
let files = list(object_store.as_ref(), None).await.unwrap();
assert_eq!(files.len(), 0);
let adds = writer.close().await.unwrap();
let files = list(object_store.as_ref(), None).await.unwrap();
assert_eq!(files.len(), 1);
assert_eq!(files.len(), adds.len());
let head = object_store
.head(&Path::from(adds[0].path.clone()))
.await
.unwrap();
assert_eq!(head.size, adds[0].size as usize)
}
#[tokio::test]
async fn test_write_partition_with_parts() {
let base_int = Arc::new(Int32Array::from((0..10000).collect::<Vec<i32>>()));
let base_str = Arc::new(StringArray::from(vec!["A"; 10000]));
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Utf8, true),
Field::new("value", DataType::Int32, true),
]));
let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap();
let object_store = DeltaTableBuilder::from_uri("memory://")
.build_storage()
.unwrap()
.object_store();
let properties = WriterProperties::builder()
.set_max_row_group_size(1024)
.build();
let mut writer =
get_partition_writer(object_store, &batch, Some(properties), Some(10_000), None);
writer.write(&batch).await.unwrap();
let adds = writer.close().await.unwrap();
assert!(adds.len() > 1);
let target_file_count = adds
.iter()
.fold(0, |acc, add| acc + (add.size > 10_000) as i32);
assert!(target_file_count >= adds.len() as i32 - 1)
}
#[tokio::test]
async fn test_unflushed_row_group_size() {
let base_int = Arc::new(Int32Array::from((0..10000).collect::<Vec<i32>>()));
let base_str = Arc::new(StringArray::from(vec!["A"; 10000]));
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Utf8, true),
Field::new("value", DataType::Int32, true),
]));
let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap();
let object_store = DeltaTableBuilder::from_uri("memory://")
.build_storage()
.unwrap()
.object_store();
let mut writer = get_partition_writer(object_store, &batch, None, Some(10_000), None);
writer.write(&batch).await.unwrap();
let adds = writer.close().await.unwrap();
assert!(adds.len() > 1);
let target_file_count = adds
.iter()
.fold(0, |acc, add| acc + (add.size > 10_000) as i32);
assert!(target_file_count >= adds.len() as i32 - 1)
}
#[tokio::test]
async fn test_do_not_write_empty_file_on_close() {
let base_int = Arc::new(Int32Array::from((0..10000_i32).collect::<Vec<i32>>()));
let base_str = Arc::new(StringArray::from(vec!["A"; 10000]));
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Utf8, true),
Field::new("value", DataType::Int32, true),
]));
let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap();
let object_store = DeltaTableBuilder::from_uri("memory://")
.build_storage()
.unwrap()
.object_store();
let mut writer = get_partition_writer(object_store, &batch, None, Some(9000), Some(10000));
writer.write(&batch).await.unwrap();
let adds = writer.close().await.unwrap();
assert!(adds.len() == 1);
}
#[tokio::test]
async fn test_write_mismatched_schema() {
let log_store = DeltaTableBuilder::from_uri("memory://")
.build_storage()
.unwrap();
let object_store = log_store.object_store();
let batch = get_record_batch(None, false);
let mut writer = get_delta_writer(object_store.clone(), &batch, None, None, None);
writer.write(&batch).await.unwrap();
let files = list(object_store.as_ref(), None).await.unwrap();
assert_eq!(files.len(), 0);
let second_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, true),
Field::new("name", DataType::Utf8, true),
]));
let second_batch = RecordBatch::try_new(
second_schema,
vec![
Arc::new(Int32Array::from(vec![Some(1), Some(2)])),
Arc::new(StringArray::from(vec![Some("will"), Some("robert")])),
],
)
.unwrap();
let result = writer.write(&second_batch).await;
assert!(result.is_err());
        match result {
            Ok(_) => panic!("Should not have successfully written"),
            Err(DeltaTableError::SchemaMismatch { .. }) => {
                // expected: the second batch has a different schema
            }
            Err(other) => panic!("Got the wrong error: {other:?}"),
        }
}
}