// deltalake_core/operations/vacuum.rs
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;
use chrono::{Duration, Utc};
use futures::future::BoxFuture;
use futures::{StreamExt, TryStreamExt};
use object_store::Error;
use object_store::{path::Path, ObjectStore};
use serde::Serialize;
use super::transaction::{CommitBuilder, CommitProperties};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;
/// Errors raised while planning or executing a vacuum.
///
/// Converted into a `DeltaTableError` before leaving this module.
#[derive(thiserror::Error, Debug)]
enum VacuumError {
    /// The requested retention period is shorter than the table's minimum while
    /// retention enforcement is enabled.
    #[error(
        "Invalid retention period, minimum retention for vacuum is configured to be greater than {} hours, got {} hours", .min, .provided
    )]
    InvalidVacuumRetentionPeriod {
        /// Requested retention period, in whole hours.
        provided: i64,
        /// Minimum allowed retention period, in whole hours.
        min: i64,
    },

    /// A `DeltaTableError` forwarded without modification.
    #[error(transparent)]
    DeltaTable(#[from] DeltaTableError),

    /// A protocol error forwarded without modification.
    #[error(transparent)]
    Protocol(#[from] crate::protocol::ProtocolError),
}
impl From<VacuumError> for DeltaTableError {
fn from(err: VacuumError) -> Self {
DeltaTableError::GenericError {
source: Box::new(err),
}
}
}
/// Time source consulted when deciding which tombstones have expired.
///
/// When no clock is supplied to the builder, `Utc::now().timestamp_millis()` is
/// used instead, so implementations should report time on the same scale.
pub trait Clock: Debug + Send + Sync {
    /// Returns the current timestamp in milliseconds.
    fn current_timestamp_millis(&self) -> i64;
}
/// Builder for a vacuum run: deletes files that are referenced by expired
/// tombstones and are no longer part of the table state.
///
/// Awaiting the builder (via `IntoFuture`) performs the operation.
#[derive(Debug)]
pub struct VacuumBuilder {
    /// Table state the vacuum is planned against.
    snapshot: DeltaTableState,
    /// Log store used to reach the object store and to write commits.
    log_store: LogStoreRef,
    /// Requested retention period; when `None`, the table's configured
    /// deleted-file retention duration is used.
    retention_period: Option<Duration>,
    /// When `true`, a retention period below the table's minimum is rejected.
    enforce_retention_duration: bool,
    /// When `true`, only report the files that would be deleted.
    dry_run: bool,
    /// Optional replacement for the system clock.
    clock: Option<Arc<dyn Clock>>,
    /// Properties applied to the commits written by the vacuum.
    commit_properties: CommitProperties,
}

// Empty impl: the `Operation` trait is defined in the parent module and nothing
// is overridden here.
impl super::Operation<()> for VacuumBuilder {}
/// Outcome of a vacuum run.
#[derive(Debug)]
pub struct VacuumMetrics {
    /// `true` when the run only reported files and deleted nothing.
    pub dry_run: bool,
    /// Paths of the files that were deleted, or that would have been deleted
    /// in a dry run.
    pub files_deleted: Vec<String>,
}
/// Metrics stored under `operationMetrics` in the commit written before deletion
/// starts. Field names are serialized in camelCase.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct VacuumStartOperationMetrics {
    /// Number of files selected for deletion.
    pub num_files_to_delete: i64,
    /// Sum of the sizes, as reported by the object store listing, of the files
    /// selected for deletion.
    pub size_of_data_to_delete: i64,
}
/// Metrics stored under `operationMetrics` in the commit written after deletion
/// finishes. Field names are serialized in camelCase.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct VacuumEndOperationMetrics {
    /// Number of files reported as deleted.
    pub num_deleted_files: i64,
    /// Number of directories removed by the vacuum.
    pub num_vacuumed_directories: i64,
}
impl VacuumBuilder {
    /// Creates a builder with the defaults: no explicit retention period,
    /// retention enforcement on, dry run off, system clock, default commit
    /// properties.
    pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self {
        Self {
            snapshot,
            log_store,
            retention_period: None,
            enforce_retention_duration: true,
            dry_run: false,
            clock: None,
            commit_properties: CommitProperties::default(),
        }
    }

    /// Overrides the retention period used to decide which tombstones are expired.
    pub fn with_retention_period(self, retention_period: Duration) -> Self {
        Self {
            retention_period: Some(retention_period),
            ..self
        }
    }

    /// When `dry_run` is `true`, the files are only listed, not deleted.
    pub fn with_dry_run(self, dry_run: bool) -> Self {
        Self { dry_run, ..self }
    }

    /// Enables or disables the check that the retention period is not shorter
    /// than the table's configured minimum.
    pub fn with_enforce_retention_duration(self, enforce: bool) -> Self {
        Self {
            enforce_retention_duration: enforce,
            ..self
        }
    }

    /// Replaces the time source used to compute the expiry cut-off.
    #[doc(hidden)]
    pub fn with_clock(self, clock: Arc<dyn Clock>) -> Self {
        Self {
            clock: Some(clock),
            ..self
        }
    }

    /// Sets the properties applied to the commits written by the vacuum.
    pub fn with_commit_properties(self, commit_properties: CommitProperties) -> Self {
        Self {
            commit_properties,
            ..self
        }
    }

    /// Works out which files can be deleted, without deleting anything.
    ///
    /// # Errors
    ///
    /// Returns `VacuumError::InvalidVacuumRetentionPeriod` when enforcement is on
    /// and the retention period is below the table's configured minimum, and
    /// forwards errors from reading tombstones or listing the object store.
    async fn create_vacuum_plan(&self) -> Result<VacuumPlan, VacuumError> {
        let configured_millis = self
            .snapshot
            .table_config()
            .deleted_file_retention_duration()
            .as_millis() as i64;
        let min_retention = Duration::milliseconds(configured_millis);
        let retention_period = match self.retention_period {
            Some(period) => period,
            None => min_retention,
        };

        let retention_check_enabled = self.enforce_retention_duration;
        if retention_check_enabled && retention_period < min_retention {
            return Err(VacuumError::InvalidVacuumRetentionPeriod {
                provided: retention_period.num_hours(),
                min: min_retention.num_hours(),
            });
        }

        // An injected clock takes precedence over the system clock.
        let now_millis = if let Some(clock) = &self.clock {
            clock.current_timestamp_millis()
        } else {
            Utc::now().timestamp_millis()
        };

        let expired_tombstones = get_stale_files(
            &self.snapshot,
            retention_period,
            now_millis,
            self.log_store.object_store().clone(),
        )
        .await?;
        let valid_files: HashSet<Path> = self.snapshot.file_paths_iter().collect();
        let partition_columns = &self.snapshot.metadata().partition_columns;

        let mut files_to_delete = Vec::new();
        let mut file_sizes = Vec::new();
        let object_store = self.log_store.object_store();
        let mut listing = object_store.list(None);

        while let Some(entry) = listing.next().await {
            let meta = entry.map_err(DeltaTableError::from)?;

            // Keep anything still tracked by the table ...
            let still_tracked = valid_files.contains(&meta.location);
            // ... and anything that is not an expired tombstone.
            let expired = expired_tombstones.contains(meta.location.as_ref());
            if still_tracked
                || !expired
                || is_hidden_directory(partition_columns, &meta.location)?
            {
                continue;
            }

            files_to_delete.push(meta.location);
            file_sizes.push(meta.size as i64);
        }

        Ok(VacuumPlan {
            files_to_delete,
            file_sizes,
            retention_check_enabled,
            default_retention_millis: min_retention.num_milliseconds(),
            specified_retention_millis: Some(retention_period.num_milliseconds()),
        })
    }
}
/// Awaiting a `VacuumBuilder` plans the vacuum and, unless it is a dry run,
/// executes it.
impl std::future::IntoFuture for VacuumBuilder {
    type Output = DeltaResult<(DeltaTable, VacuumMetrics)>;
    type IntoFuture = BoxFuture<'static, Self::Output>;

    fn into_future(self) -> Self::IntoFuture {
        let builder = self;

        Box::pin(async move {
            // The plan compares listed files against the table's file paths, so
            // a snapshot loaded without files is rejected.
            let files_loaded = builder.snapshot.load_config().require_files;
            if !files_loaded {
                return Err(DeltaTableError::NotInitializedWithFiles("VACUUM".into()));
            }

            let plan = builder.create_vacuum_plan().await?;

            let metrics = if builder.dry_run {
                // Report what would be removed without touching the store.
                VacuumMetrics {
                    files_deleted: plan
                        .files_to_delete
                        .iter()
                        .map(|path| path.to_string())
                        .collect(),
                    dry_run: true,
                }
            } else {
                plan.execute(
                    builder.log_store.clone(),
                    &builder.snapshot,
                    builder.commit_properties,
                )
                .await?
            };

            Ok((
                DeltaTable::new_with_state(builder.log_store, builder.snapshot),
                metrics,
            ))
        })
    }
}
/// The files a vacuum will delete, together with the retention settings that
/// produced the selection.
struct VacuumPlan {
    /// Locations of the files to delete.
    pub files_to_delete: Vec<Path>,
    /// Size of each file in `files_to_delete`, in the same order.
    pub file_sizes: Vec<i64>,
    /// Whether the retention period was validated against the table minimum.
    pub retention_check_enabled: bool,
    /// The table's configured retention duration, in milliseconds.
    pub default_retention_millis: i64,
    /// The retention period used for the plan, in milliseconds.
    pub specified_retention_millis: Option<i64>,
}
impl VacuumPlan {
pub async fn execute(
self,
store: LogStoreRef,
snapshot: &DeltaTableState,
mut commit_properties: CommitProperties,
) -> Result<VacuumMetrics, DeltaTableError> {
if self.files_to_delete.is_empty() {
return Ok(VacuumMetrics {
dry_run: false,
files_deleted: Vec::new(),
});
}
let start_operation = DeltaOperation::VacuumStart {
retention_check_enabled: self.retention_check_enabled,
specified_retention_millis: self.specified_retention_millis,
default_retention_millis: self.default_retention_millis,
};
let end_operation = DeltaOperation::VacuumEnd {
status: String::from("COMPLETED"), };
let start_metrics = VacuumStartOperationMetrics {
num_files_to_delete: self.files_to_delete.len() as i64,
size_of_data_to_delete: self.file_sizes.iter().sum(),
};
let mut start_props = CommitProperties::default();
start_props.app_metadata = commit_properties.app_metadata.clone();
start_props.app_metadata.insert(
"operationMetrics".to_owned(),
serde_json::to_value(start_metrics)?,
);
CommitBuilder::from(start_props)
.build(Some(snapshot), store.clone(), start_operation)
.await?;
let locations = futures::stream::iter(self.files_to_delete)
.map(Result::Ok)
.boxed();
let files_deleted = store
.object_store()
.delete_stream(locations)
.map(|res| match res {
Ok(path) => Ok(path.to_string()),
Err(Error::NotFound { path, .. }) => Ok(path),
Err(err) => Err(err),
})
.try_collect::<Vec<_>>()
.await?;
let end_metrics = VacuumEndOperationMetrics {
num_deleted_files: files_deleted.len() as i64,
num_vacuumed_directories: 0, };
commit_properties.app_metadata.insert(
"operationMetrics".to_owned(),
serde_json::to_value(end_metrics)?,
);
CommitBuilder::from(commit_properties)
.build(Some(snapshot), store.clone(), end_operation)
.await?;
Ok(VacuumMetrics {
files_deleted,
dry_run: false,
})
}
}
/// Reports whether `path` lies under a hidden location that vacuum must leave
/// alone.
///
/// A path counts as hidden when its string form starts with `.` or `_`, unless it
/// starts with `_delta_index`, `_change_data`, or the name of one of the
/// `partition_columns`. The current implementation always returns `Ok`.
fn is_hidden_directory(partition_columns: &[String], path: &Path) -> Result<bool, DeltaTableError> {
    // Prefixes that begin with `_` but are nevertheless subject to vacuum.
    const EXEMPT_PREFIXES: [&str; 2] = ["_delta_index", "_change_data"];

    let path_name = path.to_string();

    let looks_hidden = path_name.starts_with('.') || path_name.starts_with('_');
    if !looks_hidden {
        return Ok(false);
    }

    let exempt = EXEMPT_PREFIXES
        .iter()
        .any(|prefix| path_name.starts_with(*prefix))
        || partition_columns
            .iter()
            .any(|column| path_name.starts_with(column.as_str()));

    Ok(!exempt)
}
/// Collects the paths of tombstones whose deletion timestamp is older than
/// `now_timestamp_millis - retention_period`.
///
/// A tombstone without a deletion timestamp is treated as deleted at time `0`.
///
/// # Errors
///
/// Forwards any error from loading the snapshot's tombstones.
async fn get_stale_files(
    snapshot: &DeltaTableState,
    retention_period: Duration,
    now_timestamp_millis: i64,
    store: Arc<dyn ObjectStore>,
) -> DeltaResult<HashSet<String>> {
    // Anything deleted strictly before this instant is considered stale.
    let cutoff_millis = now_timestamp_millis - retention_period.num_milliseconds();

    let tombstones = snapshot.all_tombstones(store).await?.collect::<Vec<_>>();

    let mut stale_paths = HashSet::new();
    for tombstone in tombstones {
        let deleted_at = tombstone.deletion_timestamp.unwrap_or(0);
        if deleted_at < cutoff_millis {
            stale_paths.insert(tombstone.path);
        }
    }
    Ok(stale_paths)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::open_table;
    use std::time::SystemTime;

    /// Dry-run vacuums against the bundled delta-0.8.0 table with several
    /// retention periods.
    #[tokio::test]
    async fn vacuum_delta_8_0_table() {
        // A one-hour retention with enforcement on is rejected.
        let table = open_table("../test/tests/data/delta-0.8.0").await.unwrap();
        let rejected = VacuumBuilder::new(table.log_store(), table.snapshot().unwrap().clone())
            .with_retention_period(Duration::hours(1))
            .with_dry_run(true)
            .await;
        assert!(rejected.is_err());

        // With enforcement off, a zero-hour retention reports the tombstoned file.
        let table = open_table("../test/tests/data/delta-0.8.0").await.unwrap();
        let (table, metrics) =
            VacuumBuilder::new(table.log_store(), table.snapshot().unwrap().clone())
                .with_retention_period(Duration::hours(0))
                .with_dry_run(true)
                .with_enforce_retention_duration(false)
                .await
                .unwrap();
        assert_eq!(
            metrics.files_deleted,
            vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"]
        );

        // A 169-hour retention passes the enforcement check and reports the same file.
        let (table, metrics) =
            VacuumBuilder::new(table.log_store(), table.snapshot().unwrap().clone())
                .with_retention_period(Duration::hours(169))
                .with_dry_run(true)
                .await
                .unwrap();
        assert_eq!(
            metrics.files_deleted,
            vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"]
        );

        // A retention period reaching back to the Unix epoch leaves nothing to delete.
        let hours_since_epoch = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs()
            / 3600;
        let nothing: Vec<String> = Vec::new();
        let (_table, metrics) =
            VacuumBuilder::new(table.log_store(), table.snapshot().unwrap().clone())
                .with_retention_period(Duration::hours(hours_since_epoch as i64))
                .with_dry_run(true)
                .await
                .unwrap();
        assert_eq!(metrics.files_deleted, nothing);
    }
}