// deltalake_core/kernel/models/schema.rs
use std::sync::Arc;
pub use delta_kernel::schema::{
ArrayType, ColumnMetadataKey, DataType, MapType, MetadataValue, PrimitiveType, StructField,
StructType,
};
use serde_json::Value;
use crate::kernel::error::Error;
use crate::kernel::DataCheck;
/// Alias for [`StructType`], used as the schema type in this module.
pub type Schema = StructType;
/// Shared, reference-counted ([`Arc`]) handle to a [`StructType`].
pub type SchemaRef = Arc<StructType>;
/// A SQL invariant attached to a (possibly nested) schema field.
///
/// Instances are produced by `StructTypeExt::get_invariants` and exposed to
/// checking code through the `DataCheck` trait.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Invariant {
    /// Dot-separated path of the field the invariant was declared on.
    pub field_name: String,
    /// The SQL expression string of the invariant.
    pub invariant_sql: String,
}
impl Invariant {
    /// Builds an invariant from borrowed strings, copying both into owned
    /// storage.
    ///
    /// * `field_name` - path of the field the invariant applies to.
    /// * `invariant_sql` - the SQL expression of the invariant.
    pub fn new(field_name: &str, invariant_sql: &str) -> Self {
        let field_name = String::from(field_name);
        let invariant_sql = String::from(invariant_sql);
        Self {
            field_name,
            invariant_sql,
        }
    }
}
impl DataCheck for Invariant {
    /// The check's name is the path of the field the invariant belongs to.
    fn get_name(&self) -> &str {
        self.field_name.as_str()
    }

    /// The check's expression is the invariant's SQL string.
    fn get_expression(&self) -> &str {
        self.invariant_sql.as_str()
    }
}
/// Extension trait adding invariant extraction; implemented in this file for
/// [`StructType`].
pub trait StructTypeExt {
/// Returns the [`Invariant`]s found on the implementing type, or an
/// [`Error`] if extraction fails.
fn get_invariants(&self) -> Result<Vec<Invariant>, Error>;
}
impl StructTypeExt for StructType {
    /// Collects every invariant declared in the field metadata of this schema,
    /// descending into nested struct, array and map types.
    ///
    /// Each returned [`Invariant`] is named by the dot-separated path of the
    /// field it was found on. Array elements contribute an `element` segment,
    /// map keys and values contribute `key` and `value` segments
    /// (e.g. `a_map.value.element.d`).
    ///
    /// Fields are visited from a stack (last in, first out), so the order of
    /// the returned invariants is not the declaration order of the schema.
    ///
    /// Metadata that is valid JSON but does not have the shape
    /// `{"expression": {"expression": "<sql>"}}` is skipped without error, as
    /// is an invariants metadata entry that is not a string.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidInvariantJson`] when a field's invariants
    /// metadata string cannot be parsed as JSON.
    fn get_invariants(&self) -> Result<Vec<Invariant>, Error> {
        /// Appends `segment` to `prefix` with a `.` separator; an empty prefix
        /// yields the bare segment.
        fn join_path(prefix: &str, segment: &str) -> String {
            if prefix.is_empty() {
                segment.to_owned()
            } else {
                format!("{prefix}.{segment}")
            }
        }

        // Work stack of (path, field) pairs still to be inspected.
        let mut pending: Vec<(String, StructField)> = self
            .fields()
            .map(|field| (field.name.clone(), field.clone()))
            .collect();
        let mut invariants: Vec<Invariant> = Vec::new();

        while let Some((path, field)) = pending.pop() {
            match field.data_type() {
                DataType::Struct(inner) => {
                    // Feed the iterator straight into `extend`; no temporary
                    // `Vec` is needed because it only borrows `path`/`field`.
                    pending.extend(
                        inner
                            .fields()
                            .map(|child| (join_path(&path, &child.name), child.clone())),
                    );
                }
                DataType::Array(inner) => {
                    // Element/key/value types have no field of their own, so a
                    // nameless, metadata-free placeholder field carries the
                    // type through the traversal.
                    pending.push((
                        join_path(&path, "element"),
                        StructField::new(String::new(), inner.element_type.clone(), false),
                    ));
                }
                DataType::Map(inner) => {
                    pending.push((
                        join_path(&path, "key"),
                        StructField::new(String::new(), inner.key_type.clone(), false),
                    ));
                    pending.push((
                        join_path(&path, "value"),
                        StructField::new(String::new(), inner.value_type.clone(), false),
                    ));
                }
                _ => {}
            }

            if let Some(MetadataValue::String(invariant_json)) =
                field.metadata.get(ColumnMetadataKey::Invariants.as_ref())
            {
                let json: Value = serde_json::from_str(invariant_json).map_err(|e| {
                    Error::InvalidInvariantJson {
                        json_err: e,
                        line: invariant_json.to_string(),
                    }
                })?;

                // `Value::get` with a string key yields `None` for anything
                // that is not an object, so this chain only succeeds for
                // `{"expression": {"expression": "<string>"}}`.
                let sql = json
                    .get("expression")
                    .and_then(|outer| outer.get("expression"))
                    .and_then(Value::as_str);
                if let Some(sql) = sql {
                    invariants.push(Invariant::new(&path, sql));
                }
            }
        }

        Ok(invariants)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json;
    use serde_json::json;

    /// Deserializes a JSON value into a schema, panicking if it is malformed.
    fn schema_from(value: serde_json::Value) -> StructType {
        serde_json::from_value(value).unwrap()
    }

    /// Covers three schemas: no invariants, two top-level invariants, and one
    /// invariant nested inside map -> array -> struct.
    #[test]
    fn test_get_invariants() {
        // A field with empty metadata yields no invariants.
        let plain = schema_from(json!({
            "type": "struct",
            "fields": [{"name": "x", "type": "string", "nullable": true, "metadata": {}}]
        }));
        let found = plain.get_invariants().unwrap();
        assert_eq!(found.len(), 0);

        // Two top-level fields, each carrying its own invariant.
        let flat = schema_from(json!({
            "type": "struct",
            "fields": [
                {"name": "x", "type": "integer", "nullable": true, "metadata": {
                    "delta.invariants": "{\"expression\": { \"expression\": \"x > 2\"} }"
                }},
                {"name": "y", "type": "integer", "nullable": true, "metadata": {
                    "delta.invariants": "{\"expression\": { \"expression\": \"y < 4\"} }"
                }}
            ]
        }));
        let found = flat.get_invariants().unwrap();
        assert_eq!(found.len(), 2);
        assert!(found.contains(&Invariant::new("x", "x > 2")));
        assert!(found.contains(&Invariant::new("y", "y < 4")));

        // An invariant buried under map value -> array element -> struct field.
        let nested = schema_from(json!({
            "type": "struct",
            "fields": [{
                "name": "a_map",
                "type": {
                    "type": "map",
                    "keyType": "string",
                    "valueType": {
                        "type": "array",
                        "elementType": {
                            "type": "struct",
                            "fields": [{
                                "name": "d",
                                "type": "integer",
                                "metadata": {
                                    "delta.invariants": "{\"expression\": { \"expression\": \"a_map.value.element.d < 4\"} }"
                                },
                                "nullable": false
                            }]
                        },
                        "containsNull": false
                    },
                    "valueContainsNull": false
                },
                "nullable": false,
                "metadata": {}
            }]
        }));
        let found = nested.get_invariants().unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(
            found[0],
            Invariant::new("a_map.value.element.d", "a_map.value.element.d < 4")
        );
    }

    /// A schema whose field metadata holds non-string values (identity column
    /// settings) must still deserialize.
    #[test]
    fn test_identity_columns() {
        let raw = r#"{"type":"struct","fields":[{"name":"ID_D_DATE","type":"long","nullable":true,"metadata":{"delta.identity.start":1,"delta.identity.step":1,"delta.identity.allowExplicitInsert":false}},{"name":"TXT_DateKey","type":"string","nullable":true,"metadata":{}}]}"#;
        serde_json::from_str::<StructType>(raw).expect("Failed to load");
    }
}