// deltalake_core/writer/mod.rs
//! Abstractions and implementations for writing data to delta tables.

use arrow::{datatypes::SchemaRef, error::ArrowError};
use async_trait::async_trait;
use object_store::Error as ObjectStoreError;
use parquet::errors::ParquetError;
use serde_json::Value;
use crate::errors::DeltaTableError;
use crate::kernel::{Action, Add};
use crate::operations::transaction::{CommitBuilder, CommitProperties};
use crate::protocol::{ColumnCountStat, DeltaOperation, SaveMode};
use crate::DeltaTable;
pub use json::JsonWriter;
pub use record_batch::RecordBatchWriter;
pub use stats::create_add;
pub mod json;
pub mod record_batch;
pub(crate) mod stats;
pub mod utils;
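
// Module layout (for orientation): `json` holds the buffered `JsonWriter`,
// `record_batch` the Arrow `RecordBatchWriter`, `stats` the file-statistics
// helpers behind `create_add`, and `utils` shared writer helpers.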
#[cfg(test)]
pub mod test_utils;
/// Errors raised while writing to a delta table. Internal to this module;
/// public APIs surface it as [`DeltaTableError`] via the `From` impl below.
#[derive(thiserror::Error, Debug)]
pub(crate) enum DeltaWriterError {
    #[error("Missing partition column: {0}")]
    MissingPartitionColumn(String),

    #[error("Arrow RecordBatch schema does not match: RecordBatch schema: {record_batch_schema}, expected schema: {expected_schema}")]
    SchemaMismatch {
        record_batch_schema: SchemaRef,
        expected_schema: SchemaRef,
    },
#[error("Arrow RecordBatch created from JSON buffer is a None value")]
EmptyRecordBatch,
#[error("Record {0} is not a JSON object")]
InvalidRecord(String),
#[error("Failed to write some values to parquet. Sample error: {sample_error}.")]
PartialParquetWrite {
skipped_values: Vec<(Value, ParquetError)>,
sample_error: ParquetError,
},
#[error("Failed to write statistics value {debug_value} with logical type {logical_type:?}")]
StatsParsingFailed {
debug_value: String,
logical_type: Option<parquet::basic::LogicalType>,
},
#[error("Failed to serialize data to JSON: {source}")]
JSONSerializationFailed {
#[from]
source: serde_json::Error,
},
#[error("ObjectStore interaction failed: {source}")]
ObjectStore {
#[from]
source: ObjectStoreError,
},
#[error("Arrow interaction failed: {source}")]
Arrow {
#[from]
source: ArrowError,
},
#[error("Parquet write failed: {source}")]
Parquet {
#[from]
source: ParquetError,
},
#[error("std::io::Error: {source}")]
Io {
#[from]
source: std::io::Error,
},
#[error(transparent)]
DeltaTable(#[from] DeltaTableError),
}
impl From<DeltaWriterError> for DeltaTableError {
    fn from(err: DeltaWriterError) -> Self {
        match err {
            DeltaWriterError::Arrow { source } => DeltaTableError::Arrow { source },
            DeltaWriterError::Io { source } => DeltaTableError::Io { source },
            DeltaWriterError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
            DeltaWriterError::Parquet { source } => DeltaTableError::Parquet { source },
            DeltaWriterError::DeltaTable(e) => e,
            DeltaWriterError::SchemaMismatch { .. } => DeltaTableError::SchemaMismatch {
                msg: err.to_string(),
            },
            _ => DeltaTableError::Generic(err.to_string()),
        }
    }
}
/// Configuration for how a [`DeltaWriter`] should handle schema drift in the
/// data it is asked to write.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum WriteMode {
    /// Reject writes whose schema does not match the table schema.
    Default,
    /// Merge the schema of the incoming data into the table schema on write
    /// (schema evolution).
    MergeSchema,
}
/// Trait for writing data to a delta table.
#[async_trait]
pub trait DeltaWriter<T> {
    /// Write a chunk of values into the internal write buffers.
    async fn write(&mut self, values: T) -> Result<(), DeltaTableError>;

    /// Write a chunk of values with the given schema-handling [`WriteMode`].
    async fn write_with_mode(&mut self, values: T, mode: WriteMode) -> Result<(), DeltaTableError>;

    /// Flush the internal write buffers to storage, returning the `Add`
    /// actions for the files written.
    async fn flush(&mut self) -> Result<Vec<Add>, DeltaTableError>;

    /// Flush the internal write buffers, then commit the written files to the
    /// delta log in a single transaction. Returns the new table version.
    async fn flush_and_commit(&mut self, table: &mut DeltaTable) -> Result<i64, DeltaTableError> {
        let adds: Vec<_> = self.flush().await?.drain(..).map(Action::Add).collect();
        flush_and_commit(adds, table).await
    }
}
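
// Example (sketch, not exercised here): the typical append path through this
// trait using the `RecordBatchWriter` re-exported above. Assumes `table` is an
// open `DeltaTable` and `batch` is an Arrow `RecordBatch` matching its schema.
//
//     let mut writer = RecordBatchWriter::for_table(&table)?;
//     writer.write(batch).await?;
//     // or, to evolve the table schema along with the data:
//     // writer.write_with_mode(batch, WriteMode::MergeSchema).await?;
//     let version = writer.flush_and_commit(&mut table).await?;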
/// Commit the given actions to the delta log as an `Append` write, then update
/// the table to the newly committed state. Returns the new table version.
pub(crate) async fn flush_and_commit(
    adds: Vec<Action>,
    table: &mut DeltaTable,
) -> Result<i64, DeltaTableError> {
    let snapshot = table.snapshot()?;
    let partition_cols = snapshot.metadata().partition_columns.clone();
    // Record partition columns in the operation only when the table is partitioned.
    let partition_by = if !partition_cols.is_empty() {
        Some(partition_cols)
    } else {
        None
    };
    let operation = DeltaOperation::Write {
        mode: SaveMode::Append,
        partition_by,
        predicate: None,
    };

    let version = CommitBuilder::from(CommitProperties::default())
        .with_actions(adds)
        .build(Some(snapshot), table.log_store.clone(), operation)
        .await?
        .version();
    table.update().await?;
    Ok(version)
}
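
// Example (sketch, not exercised here): the same flow for JSON rows via the
// `JsonWriter` re-exported above, which implements
// `DeltaWriter<Vec<serde_json::Value>>`.
//
//     let mut writer = JsonWriter::for_table(&table)?;
//     writer.write(vec![serde_json::json!({"id": 1})]).await?;
//     writer.flush_and_commit(&mut table).await?;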