//! Types describing operations and statistics recorded in the Delta transaction log,
//! plus helpers for locating checkpoints.

#![allow(non_camel_case_types)]
use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::mem::take;
use std::str::FromStr;
use arrow_schema::ArrowError;
use futures::StreamExt;
use lazy_static::lazy_static;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing::{debug, error};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove, StructField, TableFeatures};
use crate::logstore::LogStore;
use crate::table::CheckPoint;
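/// Checkpoint handling for the Delta transaction log.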
pub mod checkpoints;
mod parquet_read;
mod time_utils;
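/// Error raised while reading or replaying actions from the Delta transaction log.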
#[allow(missing_docs)]
#[derive(thiserror::Error, Debug)]
pub enum ProtocolError {
#[error("Table state does not contain metadata")]
NoMetaData,
#[error("Checkpoint file not found")]
CheckpointNotFound,
#[error("End of transaction log")]
EndOfLog,
#[error("Invalid action field: {0}")]
InvalidField(String),
#[error("Invalid action in parquet row: {0}")]
InvalidRow(String),
#[error("Invalid deletion vector storage type: {0}")]
InvalidDeletionVectorStorageType(String),
#[error("Generic action error: {0}")]
Generic(String),
#[error("Failed to parse parquet checkpoint: {source}")]
ParquetParseError {
#[from]
source: parquet::errors::ParquetError,
},
#[error("Failed to serialize operation: {source}")]
SerializeOperation {
#[from]
source: serde_json::Error,
},
#[error("Failed to convert into Arrow schema: {}", .source)]
Arrow {
#[from]
source: ArrowError,
},
#[error("ObjectStoreError: {source}")]
ObjectStore {
#[from]
source: ObjectStoreError,
},
#[error("Io: {source}")]
IO {
#[from]
source: std::io::Error,
},
#[error("Kernel: {source}")]
Kernel {
#[from]
source: crate::kernel::Error,
},
}
/// A min/max statistic for a single column; nested columns are represented recursively.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum ColumnValueStat {
/// Statistics for the children of a nested column, keyed by child name.
Column(HashMap<String, ColumnValueStat>),
/// JSON value of the statistic for a leaf column.
Value(Value),
}
impl ColumnValueStat {
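/// Returns the child statistics when this stat describes a nested column.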
pub fn as_column(&self) -> Option<&HashMap<String, ColumnValueStat>> {
match self {
ColumnValueStat::Column(m) => Some(m),
_ => None,
}
}
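/// Returns the JSON value when this stat describes a leaf column.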
pub fn as_value(&self) -> Option<&Value> {
match self {
ColumnValueStat::Value(v) => Some(v),
_ => None,
}
}
}
/// A null-count statistic for a single column; nested columns are represented recursively.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum ColumnCountStat {
/// Null counts for the children of a nested column, keyed by child name.
Column(HashMap<String, ColumnCountStat>),
/// Null count for a leaf column.
Value(i64),
}
impl ColumnCountStat {
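/// Returns the child null counts when this stat describes a nested column.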
pub fn as_column(&self) -> Option<&HashMap<String, ColumnCountStat>> {
match self {
ColumnCountStat::Column(m) => Some(m),
_ => None,
}
}
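/// Returns the null count when this stat describes a leaf column.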
pub fn as_value(&self) -> Option<i64> {
match self {
ColumnCountStat::Value(v) => Some(*v),
_ => None,
}
}
}
/// File-level statistics recorded in an `Add` action.
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Stats {
/// Number of records in the file.
pub num_records: i64,
/// Per-column minimum values.
pub min_values: HashMap<String, ColumnValueStat>,
/// Per-column maximum values.
pub max_values: HashMap<String, ColumnValueStat>,
/// Per-column null counts.
pub null_count: HashMap<String, ColumnCountStat>,
}
/// Stats as they appear in the log, where everything except `numRecords` may be omitted.
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
struct PartialStats {
/// Number of records in the file.
pub num_records: i64,
/// Per-column minimum values, if recorded.
pub min_values: Option<HashMap<String, ColumnValueStat>>,
/// Per-column maximum values, if recorded.
pub max_values: Option<HashMap<String, ColumnValueStat>>,
/// Per-column null counts, if recorded.
pub null_count: Option<HashMap<String, ColumnCountStat>>,
}
impl PartialStats {
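/// Converts into [`Stats`], taking the maps out of `self` and substituting empty maps for any that were not recorded.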
pub fn as_stats(&mut self) -> Stats {
let min_values = take(&mut self.min_values);
let max_values = take(&mut self.max_values);
let null_count = take(&mut self.null_count);
Stats {
num_records: self.num_records,
min_values: min_values.unwrap_or_default(),
max_values: max_values.unwrap_or_default(),
null_count: null_count.unwrap_or_default(),
}
}
}
/// File-level statistics with values parsed into typed parquet fields.
#[derive(Debug, Default)]
pub struct StatsParsed {
/// Number of records in the file.
pub num_records: i64,
/// Per-column minimum values.
pub min_values: HashMap<String, parquet::record::Field>,
/// Per-column maximum values.
pub max_values: HashMap<String, parquet::record::Field>,
/// Per-column null counts.
pub null_count: HashMap<String, i64>,
}
impl Hash for Add {
fn hash<H: Hasher>(&self, state: &mut H) {
self.path.hash(state);
}
}
impl PartialEq for Add {
fn eq(&self, other: &Self) -> bool {
self.path == other.path
&& self.size == other.size
&& self.partition_values == other.partition_values
&& self.modification_time == other.modification_time
&& self.data_change == other.data_change
&& self.stats == other.stats
&& self.tags == other.tags
&& self.deletion_vector == other.deletion_vector
}
}
impl Eq for Add {}
impl Add {
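/// Returns the file statistics, preferring the parsed parquet stats and falling back to the JSON `stats` string when they are absent or unreadable.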
pub(crate) fn get_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
match self.get_stats_parsed() {
Ok(Some(stats)) => Ok(Some(stats)),
Ok(None) => self.get_json_stats(),
Err(e) => {
error!(
"Error when reading parquet stats {:?} {e}. Attempting to read json stats",
self.stats_parsed
);
self.get_json_stats()
}
}
}
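/// Returns the file statistics parsed from the JSON `stats` field, if present.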
pub fn get_json_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
self.stats
.as_ref()
.map(|stats| serde_json::from_str(stats).map(|mut ps: PartialStats| ps.as_stats()))
.transpose()
}
}
impl Hash for Remove {
fn hash<H: Hasher>(&self, state: &mut H) {
self.path.hash(state);
}
}
impl Borrow<str> for Remove {
fn borrow(&self) -> &str {
self.path.as_ref()
}
}
impl PartialEq for Remove {
fn eq(&self, other: &Self) -> bool {
self.path == other.path
&& self.deletion_timestamp == other.deletion_timestamp
&& self.data_change == other.data_change
&& self.extended_file_metadata == other.extended_file_metadata
&& self.partition_values == other.partition_values
&& self.size == other.size
&& self.tags == other.tags
&& self.deletion_vector == other.deletion_vector
}
}
/// Predicate recorded for a single clause of a MERGE operation.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct MergePredicate {
/// The type of merge clause performed.
pub action_type: String,
/// The predicate applied by this clause, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub predicate: Option<String>,
}
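/// The operation performed on the table when a new log entry is committed; recorded as part of the `CommitInfo` action.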
#[allow(clippy::large_enum_variant)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub enum DeltaOperation {
AddColumn {
fields: Vec<StructField>,
},
Create {
mode: SaveMode,
location: String,
protocol: Protocol,
metadata: Metadata,
},
#[serde(rename_all = "camelCase")]
Write {
mode: SaveMode,
partition_by: Option<Vec<String>>,
predicate: Option<String>,
},
Delete {
predicate: Option<String>,
},
Update {
predicate: Option<String>,
},
AddConstraint {
name: String,
expr: String,
},
AddFeature {
name: Vec<TableFeatures>,
},
DropConstraint {
name: String,
},
#[serde(rename_all = "camelCase")]
Merge {
predicate: Option<String>,
merge_predicate: Option<String>,
matched_predicates: Vec<MergePredicate>,
not_matched_predicates: Vec<MergePredicate>,
not_matched_by_source_predicates: Vec<MergePredicate>,
},
#[serde(rename_all = "camelCase")]
StreamingUpdate {
output_mode: OutputMode,
query_id: String,
epoch_id: i64,
},
#[serde(rename_all = "camelCase")]
SetTableProperties {
properties: HashMap<String, String>,
},
#[serde(rename_all = "camelCase")]
Optimize {
predicate: Option<String>,
target_size: i64,
},
#[serde(rename_all = "camelCase")]
FileSystemCheck {},
Restore {
version: Option<i64>,
datetime: Option<i64>,
},
#[serde(rename_all = "camelCase")]
VacuumStart {
retention_check_enabled: bool,
specified_retention_millis: Option<i64>,
default_retention_millis: i64,
},
VacuumEnd {
status: String,
},
}
impl DeltaOperation {
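/// Human-readable name of the operation, as recorded in commit info.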
pub fn name(&self) -> &str {
match &self {
DeltaOperation::AddColumn { .. } => "ADD COLUMN",
DeltaOperation::Create {
mode: SaveMode::Overwrite,
..
} => "CREATE OR REPLACE TABLE",
DeltaOperation::Create { .. } => "CREATE TABLE",
DeltaOperation::Write { .. } => "WRITE",
DeltaOperation::Delete { .. } => "DELETE",
DeltaOperation::Update { .. } => "UPDATE",
DeltaOperation::Merge { .. } => "MERGE",
DeltaOperation::StreamingUpdate { .. } => "STREAMING UPDATE",
DeltaOperation::SetTableProperties { .. } => "SET TBLPROPERTIES",
DeltaOperation::Optimize { .. } => "OPTIMIZE",
DeltaOperation::FileSystemCheck { .. } => "FSCK",
DeltaOperation::Restore { .. } => "RESTORE",
DeltaOperation::VacuumStart { .. } => "VACUUM START",
DeltaOperation::VacuumEnd { .. } => "VACUUM END",
DeltaOperation::AddConstraint { .. } => "ADD CONSTRAINT",
DeltaOperation::DropConstraint { .. } => "DROP CONSTRAINT",
DeltaOperation::AddFeature { .. } => "ADD FEATURE",
}
}
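/// Parameters of the operation as a map suitable for commit info; null entries are dropped, string values are kept as-is, and other values are JSON-encoded.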
pub fn operation_parameters(&self) -> DeltaResult<HashMap<String, Value>> {
if let Some(Some(Some(map))) = serde_json::to_value(self)
.map_err(|err| ProtocolError::SerializeOperation { source: err })?
.as_object()
.map(|p| p.values().next().map(|q| q.as_object()))
{
Ok(map
.iter()
.filter(|item| !item.1.is_null())
.map(|(k, v)| {
(
k.to_owned(),
serde_json::Value::String(if v.is_string() {
String::from(v.as_str().unwrap())
} else {
v.to_string()
}),
)
})
.collect())
} else {
Err(ProtocolError::Generic(
"Operation parameters serialized into unexpected shape".into(),
)
.into())
}
}
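/// Whether this operation changes the data contained in the table.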
pub fn changes_data(&self) -> bool {
match self {
Self::Optimize { .. }
| Self::SetTableProperties { .. }
| Self::AddColumn { .. }
| Self::AddFeature { .. }
| Self::VacuumStart { .. }
| Self::VacuumEnd { .. }
| Self::AddConstraint { .. }
| Self::DropConstraint { .. } => false,
Self::Create { .. }
| Self::FileSystemCheck {}
| Self::StreamingUpdate { .. }
| Self::Write { .. }
| Self::Delete { .. }
| Self::Merge { .. }
| Self::Update { .. }
| Self::Restore { .. } => true,
}
}
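/// Builds a [`CommitInfo`] recording this operation's name and parameters.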
pub fn get_commit_info(&self) -> CommitInfo {
CommitInfo {
operation: Some(self.name().into()),
operation_parameters: self.operation_parameters().ok(),
..Default::default()
}
}
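/// The predicate under which data was read for this operation, if any.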
pub fn read_predicate(&self) -> Option<String> {
match self {
Self::Write { predicate, .. } => predicate.clone(),
Self::Delete { predicate, .. } => predicate.clone(),
Self::Update { predicate, .. } => predicate.clone(),
Self::Merge { predicate, .. } => predicate.clone(),
_ => None,
}
}
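/// Whether this operation reads the entire table; currently only true for a MERGE without a predicate.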
pub fn read_whole_table(&self) -> bool {
match self {
Self::Merge { predicate, .. } if predicate.is_none() => true,
_ => false,
}
}
}
/// The save mode applied when performing a [`DeltaOperation`] that writes data.
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq)]
pub enum SaveMode {
/// Append new data to any data already in the table.
Append,
/// Replace the existing contents of the table.
Overwrite,
/// Fail if the target table already exists.
ErrorIfExists,
/// Do nothing if the target table already exists.
Ignore,
}
impl FromStr for SaveMode {
type Err = DeltaTableError;
fn from_str(s: &str) -> DeltaResult<Self> {
match s.to_ascii_lowercase().as_str() {
"append" => Ok(SaveMode::Append),
"overwrite" => Ok(SaveMode::Overwrite),
"error" => Ok(SaveMode::ErrorIfExists),
"ignore" => Ok(SaveMode::Ignore),
_ => Err(DeltaTableError::Generic(format!(
"Invalid save mode provided: {}, only these are supported: ['append', 'overwrite', 'error', 'ignore']",
s
))),
}
}
}
/// The output mode used in a streaming operation.
#[derive(Serialize, Deserialize, Debug, Copy, Clone)]
pub enum OutputMode {
/// Only new rows are written when new data is available.
Append,
/// The full output is rewritten whenever new data is available.
Complete,
/// Only rows that changed are written when new or updated data is available.
Update,
}
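/// Reads the `_delta_log/_last_checkpoint` file, falling back to scanning the log for the most recent checkpoint when that file is missing.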
pub(crate) async fn get_last_checkpoint(
log_store: &dyn LogStore,
) -> Result<CheckPoint, ProtocolError> {
let last_checkpoint_path = Path::from_iter(["_delta_log", "_last_checkpoint"]);
debug!("loading checkpoint from {last_checkpoint_path}");
match log_store.object_store().get(&last_checkpoint_path).await {
Ok(data) => Ok(serde_json::from_slice(&data.bytes().await?)?),
Err(ObjectStoreError::NotFound { .. }) => {
match find_latest_check_point_for_version(log_store, i64::MAX).await {
Ok(Some(cp)) => Ok(cp),
_ => Err(ProtocolError::CheckpointNotFound),
}
}
Err(err) => Err(ProtocolError::ObjectStore { source: err }),
}
}
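/// Lists the `_delta_log` directory and returns the most recent checkpoint at or below `version`, if one exists.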
pub(crate) async fn find_latest_check_point_for_version(
log_store: &dyn LogStore,
version: i64,
) -> Result<Option<CheckPoint>, ProtocolError> {
lazy_static! {
static ref CHECKPOINT_REGEX: Regex =
Regex::new(r"^_delta_log/(\d{20})\.checkpoint\.parquet$").unwrap();
static ref CHECKPOINT_PARTS_REGEX: Regex =
Regex::new(r"^_delta_log/(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$").unwrap();
}
let mut cp: Option<CheckPoint> = None;
let object_store = log_store.object_store();
let mut stream = object_store.list(Some(log_store.log_path()));
while let Some(obj_meta) = stream.next().await {
let obj_meta = match obj_meta {
Ok(meta) => Ok(meta),
Err(ObjectStoreError::NotFound { .. }) => continue,
Err(err) => Err(err),
}?;
if let Some(captures) = CHECKPOINT_REGEX.captures(obj_meta.location.as_ref()) {
let curr_ver_str = captures.get(1).unwrap().as_str();
let curr_ver: i64 = curr_ver_str.parse().unwrap();
if curr_ver > version {
continue;
}
if cp.is_none() || curr_ver > cp.unwrap().version {
cp = Some(CheckPoint::new(curr_ver, 0, None));
}
continue;
}
if let Some(captures) = CHECKPOINT_PARTS_REGEX.captures(obj_meta.location.as_ref()) {
let curr_ver_str = captures.get(1).unwrap().as_str();
let curr_ver: i64 = curr_ver_str.parse().unwrap();
if curr_ver > version {
continue;
}
if cp.is_none() || curr_ver > cp.unwrap().version {
let parts_str = captures.get(2).unwrap().as_str();
let parts = parts_str.parse().unwrap();
cp = Some(CheckPoint::new(curr_ver, 0, Some(parts)));
}
continue;
}
}
Ok(cp)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::kernel::Action;
#[test]
fn test_load_table_stats() {
let action = Add {
stats: Some(
serde_json::json!({
"numRecords": 22,
"minValues": {"a": 1, "nested": {"b": 2, "c": "a"}},
"maxValues": {"a": 10, "nested": {"b": 20, "c": "z"}},
"nullCount": {"a": 1, "nested": {"b": 0, "c": 1}},
})
.to_string(),
),
path: Default::default(),
data_change: true,
deletion_vector: None,
partition_values: Default::default(),
stats_parsed: None,
tags: None,
size: 0,
modification_time: 0,
base_row_id: None,
default_row_commit_version: None,
clustering_provider: None,
};
let stats = action.get_stats().unwrap().unwrap();
assert_eq!(stats.num_records, 22);
assert_eq!(
stats.min_values["a"].as_value().unwrap(),
&serde_json::json!(1)
);
assert_eq!(
stats.min_values["nested"].as_column().unwrap()["b"]
.as_value()
.unwrap(),
&serde_json::json!(2)
);
assert_eq!(
stats.min_values["nested"].as_column().unwrap()["c"]
.as_value()
.unwrap(),
&serde_json::json!("a")
);
assert_eq!(
stats.max_values["a"].as_value().unwrap(),
&serde_json::json!(10)
);
assert_eq!(
stats.max_values["nested"].as_column().unwrap()["b"]
.as_value()
.unwrap(),
&serde_json::json!(20)
);
assert_eq!(
stats.max_values["nested"].as_column().unwrap()["c"]
.as_value()
.unwrap(),
&serde_json::json!("z")
);
assert_eq!(stats.null_count["a"].as_value().unwrap(), 1);
assert_eq!(
stats.null_count["nested"].as_column().unwrap()["b"]
.as_value()
.unwrap(),
0
);
assert_eq!(
stats.null_count["nested"].as_column().unwrap()["c"]
.as_value()
.unwrap(),
1
);
}
#[test]
fn test_load_table_partial_stats() {
let action = Add {
stats: Some(
serde_json::json!({
"numRecords": 22
})
.to_string(),
),
path: Default::default(),
data_change: true,
deletion_vector: None,
partition_values: Default::default(),
stats_parsed: None,
tags: None,
size: 0,
modification_time: 0,
base_row_id: None,
default_row_commit_version: None,
clustering_provider: None,
};
let stats = action.get_stats().unwrap().unwrap();
assert_eq!(stats.num_records, 22);
assert_eq!(stats.min_values.len(), 0);
assert_eq!(stats.max_values.len(), 0);
assert_eq!(stats.null_count.len(), 0);
}
#[test]
fn test_read_commit_info() {
let raw = r#"
{
"timestamp": 1670892998177,
"operation": "WRITE",
"operationParameters": {
"mode": "Append",
"partitionBy": "[\"c1\",\"c2\"]"
},
"isolationLevel": "Serializable",
"isBlindAppend": true,
"operationMetrics": {
"numFiles": "3",
"numOutputRows": "3",
"numOutputBytes": "1356"
},
"engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.2.0",
"txnId": "046a258f-45e3-4657-b0bf-abfb0f76681c"
}"#;
let info = serde_json::from_str::<CommitInfo>(raw);
assert!(info.is_ok());
let raw = "{}";
let info = serde_json::from_str::<CommitInfo>(raw);
assert!(info.is_ok());
let raw = r#"
{
"timestamp": 1670892998177,
"operation": "WRITE",
"operationParameters": {
"mode": "Append",
"partitionBy": "[\"c1\",\"c2\"]"
},
"isolationLevel": "Serializable",
"isBlindAppend": true,
"operationMetrics": {
"numFiles": "3",
"numOutputRows": "3",
"numOutputBytes": "1356"
},
"engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.2.0",
"txnId": "046a258f-45e3-4657-b0bf-abfb0f76681c",
"additionalField": "more data",
"additionalStruct": {
"key": "value",
"otherKey": 123
}
}"#;
let info = serde_json::from_str::<CommitInfo>(raw).expect("should parse");
assert!(info.info.contains_key("additionalField"));
assert!(info.info.contains_key("additionalStruct"));
}
#[test]
fn test_read_domain_metadata() {
let buf = r#"{"domainMetadata":{"domain":"delta.liquid","configuration":"{\"clusteringColumns\":[{\"physicalName\":[\"id\"]}],\"domainName\":\"delta.liquid\"}","removed":false}}"#;
let _action: Action =
serde_json::from_str(buf).expect("Expected to be able to deserialize");
}
mod arrow_tests {
use arrow::array::{self, ArrayRef, StructArray};
use arrow::compute::kernels::cast_utils::Parser;
use arrow::compute::sort_to_indices;
use arrow::datatypes::{DataType, Date32Type, Field, Fields, TimestampMicrosecondType};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
fn sort_batch_by(batch: &RecordBatch, column: &str) -> arrow::error::Result<RecordBatch> {
let sort_column = batch.column(batch.schema().column_with_name(column).unwrap().0);
let sort_indices = sort_to_indices(sort_column, None, None)?;
let schema = batch.schema();
let sorted_columns: Vec<(&String, ArrayRef)> = schema
.fields()
.iter()
.zip(batch.columns().iter())
.map(|(field, column)| {
Ok((
field.name(),
arrow::compute::take(column, &sort_indices, None)?,
))
})
.collect::<arrow::error::Result<_>>()?;
RecordBatch::try_from_iter(sorted_columns)
}
#[tokio::test]
async fn test_with_partitions() {
let path = "../test/tests/data/delta-0.8.0-null-partition";
let table = crate::open_table(path).await.unwrap();
let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
let mut expected_columns: Vec<(&str, ArrayRef)> = vec![
("path", Arc::new(array::StringArray::from(vec![
"k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet",
"k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet"
]))),
("size_bytes", Arc::new(array::Int64Array::from(vec![460, 460]))),
("modification_time", Arc::new(arrow::array::TimestampMillisecondArray::from(vec![
1627990384000, 1627990384000
]))),
("data_change", Arc::new(array::BooleanArray::from(vec![true, true]))),
("partition.k", Arc::new(array::StringArray::from(vec![Some("A"), None]))),
];
let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();
assert_eq!(expected, actions);
let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
expected_columns[4] = (
"partition_values",
Arc::new(array::StructArray::new(
Fields::from(vec![Field::new("k", DataType::Utf8, true)]),
vec![Arc::new(array::StringArray::from(vec![Some("A"), None])) as ArrayRef],
None,
)),
);
let expected = RecordBatch::try_from_iter(expected_columns).unwrap();
assert_eq!(expected, actions);
}
#[tokio::test]
async fn test_with_deletion_vector() {
let path = "../test/tests/data/table_with_deletion_logs";
let table = crate::open_table(path).await.unwrap();
let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
let actions = actions
.project(&[
actions.schema().index_of("path").unwrap(),
actions.schema().index_of("size_bytes").unwrap(),
actions
.schema()
.index_of("deletionVector.storageType")
.unwrap(),
actions
.schema()
.index_of("deletionVector.pathOrInlineDiv")
.unwrap(),
actions.schema().index_of("deletionVector.offset").unwrap(),
actions
.schema()
.index_of("deletionVector.sizeInBytes")
.unwrap(),
actions
.schema()
.index_of("deletionVector.cardinality")
.unwrap(),
])
.unwrap();
let expected_columns: Vec<(&str, ArrayRef)> = vec![
(
"path",
Arc::new(array::StringArray::from(vec![
"part-00000-cb251d5e-b665-437a-a9a7-fbfc5137c77d.c000.snappy.parquet",
])),
),
("size_bytes", Arc::new(array::Int64Array::from(vec![10499]))),
(
"deletionVector.storageType",
Arc::new(array::StringArray::from(vec!["u"])),
),
(
"deletionVector.pathOrInlineDiv",
Arc::new(array::StringArray::from(vec!["Q6Kt3y1b)0MgZSWwPunr"])),
),
(
"deletionVector.offset",
Arc::new(array::Int32Array::from(vec![1])),
),
(
"deletionVector.sizeInBytes",
Arc::new(array::Int32Array::from(vec![36])),
),
(
"deletionVector.cardinality",
Arc::new(array::Int64Array::from(vec![2])),
),
];
let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();
assert_eq!(expected, actions);
let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
let actions = actions
.project(&[
actions.schema().index_of("path").unwrap(),
actions.schema().index_of("size_bytes").unwrap(),
actions.schema().index_of("deletionVector").unwrap(),
])
.unwrap();
let expected_columns: Vec<(&str, ArrayRef)> = vec![
(
"path",
Arc::new(array::StringArray::from(vec![
"part-00000-cb251d5e-b665-437a-a9a7-fbfc5137c77d.c000.snappy.parquet",
])),
),
("size_bytes", Arc::new(array::Int64Array::from(vec![10499]))),
(
"deletionVector",
Arc::new(array::StructArray::new(
Fields::from(vec![
Field::new("storageType", DataType::Utf8, false),
Field::new("pathOrInlineDiv", DataType::Utf8, false),
Field::new("offset", DataType::Int32, true),
Field::new("sizeInBytes", DataType::Int32, false),
Field::new("cardinality", DataType::Int64, false),
]),
vec![
Arc::new(array::StringArray::from(vec!["u"])) as ArrayRef,
Arc::new(array::StringArray::from(vec!["Q6Kt3y1b)0MgZSWwPunr"]))
as ArrayRef,
Arc::new(array::Int32Array::from(vec![1])) as ArrayRef,
Arc::new(array::Int32Array::from(vec![36])) as ArrayRef,
Arc::new(array::Int64Array::from(vec![2])) as ArrayRef,
],
None,
)),
),
];
let expected = RecordBatch::try_from_iter(expected_columns).unwrap();
assert_eq!(expected, actions);
}
#[tokio::test]
async fn test_without_partitions() {
let path = "../test/tests/data/simple_table";
let table = crate::open_table(path).await.unwrap();
let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
let expected_columns: Vec<(&str, ArrayRef)> = vec![
(
"path",
Arc::new(array::StringArray::from(vec![
"part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet",
"part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet",
"part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet",
"part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet",
"part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet",
])),
),
(
"size_bytes",
Arc::new(array::Int64Array::from(vec![262, 262, 429, 429, 429])),
),
(
"modification_time",
Arc::new(arrow::array::TimestampMillisecondArray::from(vec![
1587968626000,
1587968602000,
1587968602000,
1587968602000,
1587968602000,
])),
),
(
"data_change",
Arc::new(array::BooleanArray::from(vec![
true, true, true, true, true,
])),
),
];
let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();
assert_eq!(expected, actions);
let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();
assert_eq!(expected, actions);
}
#[tokio::test]
#[ignore = "column mapping not yet supported."]
async fn test_with_column_mapping() {
let path = "../test/tests/data/table_with_column_mapping";
let table = crate::open_table(path).await.unwrap();
let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
let expected_columns: Vec<(&str, ArrayRef)> = vec![
(
"path",
Arc::new(array::StringArray::from(vec![
"BH/part-00000-4d6e745c-8e04-48d9-aa60-438228358f1a.c000.zstd.parquet",
"8v/part-00001-69b4a452-aeac-4ffa-bf5c-a0c2833d05eb.c000.zstd.parquet",
])),
),
(
"size_bytes",
Arc::new(array::Int64Array::from(vec![890, 810])),
),
(
"modification_time",
Arc::new(arrow::array::TimestampMillisecondArray::from(vec![
1699946088000,
1699946088000,
])),
),
(
"data_change",
Arc::new(array::BooleanArray::from(vec![true, true])),
),
(
"partition.Company Very Short",
Arc::new(array::StringArray::from(vec!["BMS", "BME"])),
),
("num_records", Arc::new(array::Int64Array::from(vec![4, 1]))),
(
"null_count.Company Very Short",
Arc::new(array::NullArray::new(2)),
),
("min.Company Very Short", Arc::new(array::NullArray::new(2))),
("max.Company Very Short", Arc::new(array::NullArray::new(2))),
("null_count.Super Name", Arc::new(array::NullArray::new(2))),
("min.Super Name", Arc::new(array::NullArray::new(2))),
("max.Super Name", Arc::new(array::NullArray::new(2))),
(
"tags.INSERTION_TIME",
Arc::new(array::StringArray::from(vec![
"1699946088000000",
"1699946088000001",
])),
),
(
"tags.MAX_INSERTION_TIME",
Arc::new(array::StringArray::from(vec![
"1699946088000000",
"1699946088000001",
])),
),
(
"tags.MIN_INSERTION_TIME",
Arc::new(array::StringArray::from(vec![
"1699946088000000",
"1699946088000001",
])),
),
(
"tags.OPTIMIZE_TARGET_SIZE",
Arc::new(array::StringArray::from(vec!["33554432", "33554432"])),
),
];
let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();
assert_eq!(expected, actions);
}
#[tokio::test]
async fn test_with_stats() {
let path = "../test/tests/data/delta-0.8.0";
let table = crate::open_table(path).await.unwrap();
let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
let expected_columns: Vec<(&str, ArrayRef)> = vec![
(
"path",
Arc::new(array::StringArray::from(vec![
"part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet",
"part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet",
])),
),
(
"size_bytes",
Arc::new(array::Int64Array::from(vec![440, 440])),
),
(
"modification_time",
Arc::new(arrow::array::TimestampMillisecondArray::from(vec![
1615043776000,
1615043767000,
])),
),
(
"data_change",
Arc::new(array::BooleanArray::from(vec![true, true])),
),
("num_records", Arc::new(array::Int64Array::from(vec![2, 2]))),
(
"null_count.value",
Arc::new(array::Int64Array::from(vec![0, 0])),
),
("min.value", Arc::new(array::Int32Array::from(vec![2, 0]))),
("max.value", Arc::new(array::Int32Array::from(vec![4, 2]))),
];
let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();
assert_eq!(expected, actions);
}
#[tokio::test]
async fn test_table_not_always_with_stats() {
let path = "../test/tests/data/delta-stats-optional";
let mut table = crate::open_table(path).await.unwrap();
table.load().await.unwrap();
let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
let expected_path: ArrayRef = Arc::new(array::StringArray::from(vec![
"part-00000-28925d3a-bdf2-411e-bca9-b067444cbcb0-c000.snappy.parquet",
"part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet",
]));
let expected_num_records: ArrayRef =
Arc::new(array::Int64Array::from(vec![None, Some(1)]));
let expected_null_count: ArrayRef =
Arc::new(array::Int64Array::from(vec![None, Some(0)]));
let path_column = actions.column(0);
let num_records_column = actions.column(4);
let null_count_column = actions.column(5);
assert_eq!(&expected_path, path_column);
assert_eq!(&expected_num_records, num_records_column);
assert_eq!(&expected_null_count, null_count_column);
}
#[tokio::test]
async fn test_table_checkpoint_not_always_with_stats() {
let path = "../test/tests/data/delta-checkpoint-stats-optional";
let mut table = crate::open_table(path).await.unwrap();
table.load().await.unwrap();
assert_eq!(2, table.snapshot().unwrap().file_actions().unwrap().len());
}
#[tokio::test]
async fn test_only_struct_stats() {
let path = "../test/tests/data/delta-1.2.1-only-struct-stats";
let mut table = crate::open_table(path).await.unwrap();
table.load_version(1).await.unwrap();
let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
let expected_columns: Vec<(&str, ArrayRef)> = vec![
(
"path",
Arc::new(array::StringArray::from(vec![
"part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet",
])),
),
("size_bytes", Arc::new(array::Int64Array::from(vec![5489]))),
(
"modification_time",
Arc::new(arrow::array::TimestampMillisecondArray::from(vec![
1666652373000,
])),
),
(
"data_change",
Arc::new(array::BooleanArray::from(vec![true])),
),
("num_records", Arc::new(array::Int64Array::from(vec![1]))),
(
"null_count.integer",
Arc::new(array::Int64Array::from(vec![0])),
),
("min.integer", Arc::new(array::Int32Array::from(vec![0]))),
("max.integer", Arc::new(array::Int32Array::from(vec![0]))),
(
"null_count.null",
Arc::new(array::Int64Array::from(vec![1])),
),
("min.null", Arc::new(array::NullArray::new(1))),
("max.null", Arc::new(array::NullArray::new(1))),
(
"null_count.boolean",
Arc::new(array::Int64Array::from(vec![0])),
),
("min.boolean", Arc::new(array::NullArray::new(1))),
("max.boolean", Arc::new(array::NullArray::new(1))),
(
"null_count.double",
Arc::new(array::Int64Array::from(vec![0])),
),
(
"min.double",
Arc::new(array::Float64Array::from(vec![1.234])),
),
(
"max.double",
Arc::new(array::Float64Array::from(vec![1.234])),
),
(
"null_count.decimal",
Arc::new(array::Int64Array::from(vec![0])),
),
(
"min.decimal",
Arc::new(
array::Decimal128Array::from_iter_values([-567800])
.with_precision_and_scale(8, 5)
.unwrap(),
),
),
(
"max.decimal",
Arc::new(
array::Decimal128Array::from_iter_values([-567800])
.with_precision_and_scale(8, 5)
.unwrap(),
),
),
(
"null_count.string",
Arc::new(array::Int64Array::from(vec![0])),
),
(
"min.string",
Arc::new(array::StringArray::from(vec!["string"])),
),
(
"max.string",
Arc::new(array::StringArray::from(vec!["string"])),
),
(
"null_count.binary",
Arc::new(array::Int64Array::from(vec![0])),
),
("min.binary", Arc::new(array::NullArray::new(1))),
("max.binary", Arc::new(array::NullArray::new(1))),
(
"null_count.date",
Arc::new(array::Int64Array::from(vec![0])),
),
(
"min.date",
Arc::new(array::Date32Array::from(vec![Date32Type::parse(
"2022-10-24",
)])),
),
(
"max.date",
Arc::new(array::Date32Array::from(vec![Date32Type::parse(
"2022-10-24",
)])),
),
(
"null_count.timestamp",
Arc::new(array::Int64Array::from(vec![0])),
),
(
"min.timestamp",
Arc::new(
array::TimestampMicrosecondArray::from(vec![
TimestampMicrosecondType::parse("2022-10-24T22:59:32.846Z"),
])
.with_timezone("UTC"),
),
),
(
"max.timestamp",
Arc::new(
array::TimestampMicrosecondArray::from(vec![
TimestampMicrosecondType::parse("2022-10-24T22:59:32.846Z"),
])
.with_timezone("UTC"),
),
),
(
"null_count.struct.struct_element",
Arc::new(array::Int64Array::from(vec![0])),
),
(
"min.struct.struct_element",
Arc::new(array::StringArray::from(vec!["struct_value"])),
),
(
"max.struct.struct_element",
Arc::new(array::StringArray::from(vec!["struct_value"])),
),
("null_count.map", Arc::new(array::Int64Array::from(vec![0]))),
(
"null_count.array",
Arc::new(array::Int64Array::from(vec![0])),
),
(
"null_count.nested_struct.struct_element.nested_struct_element",
Arc::new(array::Int64Array::from(vec![0])),
),
(
"min.nested_struct.struct_element.nested_struct_element",
Arc::new(array::StringArray::from(vec!["nested_struct_value"])),
),
(
"max.nested_struct.struct_element.nested_struct_element",
Arc::new(array::StringArray::from(vec!["nested_struct_value"])),
),
(
"null_count.struct_of_array_of_map.struct_element",
Arc::new(array::Int64Array::from(vec![0])),
),
(
"tags.INSERTION_TIME",
Arc::new(array::StringArray::from(vec!["1666652373000000"])),
),
(
"tags.OPTIMIZE_TARGET_SIZE",
Arc::new(array::StringArray::from(vec!["268435456"])),
),
];
let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();
assert_eq!(
expected
.schema()
.fields()
.iter()
.map(|field| field.name().as_str())
.collect::<Vec<&str>>(),
actions
.schema()
.fields()
.iter()
.map(|field| field.name().as_str())
.collect::<Vec<&str>>()
);
assert_eq!(expected, actions);
let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
assert_eq!(
actions
.get_field_at_path(&[
"null_count",
"nested_struct",
"struct_element",
"nested_struct_element"
])
.unwrap()
.as_any()
.downcast_ref::<array::Int64Array>()
.unwrap(),
&array::Int64Array::from(vec![0]),
);
assert_eq!(
actions
.get_field_at_path(&[
"min",
"nested_struct",
"struct_element",
"nested_struct_element"
])
.unwrap()
.as_any()
.downcast_ref::<array::StringArray>()
.unwrap(),
&array::StringArray::from(vec!["nested_struct_value"]),
);
assert_eq!(
actions
.get_field_at_path(&[
"max",
"nested_struct",
"struct_element",
"nested_struct_element"
])
.unwrap()
.as_any()
.downcast_ref::<array::StringArray>()
.unwrap(),
&array::StringArray::from(vec!["nested_struct_value"]),
);
assert_eq!(
actions
.get_field_at_path(&["null_count", "struct_of_array_of_map", "struct_element"])
.unwrap()
.as_any()
.downcast_ref::<array::Int64Array>()
.unwrap(),
&array::Int64Array::from(vec![0])
);
assert_eq!(
actions
.get_field_at_path(&["tags", "OPTIMIZE_TARGET_SIZE"])
.unwrap()
.as_any()
.downcast_ref::<array::StringArray>()
.unwrap(),
&array::StringArray::from(vec!["268435456"])
);
}
trait NestedTabular {
fn get_field_at_path(&self, path: &[&str]) -> Option<ArrayRef>;
}
impl NestedTabular for RecordBatch {
fn get_field_at_path(&self, path: &[&str]) -> Option<ArrayRef> {
let (first_key, remainder) = path.split_at(1);
let mut col = self.column(self.schema().column_with_name(first_key[0])?.0);
if remainder.is_empty() {
return Some(Arc::clone(col));
}
for segment in remainder {
col = col
.as_any()
.downcast_ref::<StructArray>()?
.column_by_name(segment)?;
}
Some(Arc::clone(col))
}
}
}
}