deltalake_core/delta_datafusion/cdf/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
//! Logical operators and physical execution plans for Change Data Feed (CDF) reads.
use std::collections::HashMap;

use arrow_schema::{DataType, Field, TimeUnit};
use lazy_static::lazy_static;

pub(crate) use self::scan::*;
pub(crate) use self::scan_utils::*;
use crate::kernel::{Add, AddCDCFile, Remove};
use crate::DeltaResult;

mod scan;
mod scan_utils;

/// Name of the CDF output column that holds the kind of change a row represents.
pub const CHANGE_TYPE_COL: &str = "_change_type";
/// Name of the CDF output column that holds the table version of the originating commit.
pub const COMMIT_VERSION_COL: &str = "_commit_version";
/// Name of the CDF output column that holds the timestamp of the originating commit.
pub const COMMIT_TIMESTAMP_COL: &str = "_commit_timestamp";

lazy_static! {
    /// Commit-metadata columns for CDC file scans: the commit version (`Int64`) followed by
    /// the commit timestamp (`Timestamp(Millisecond)`, no time zone). Both are nullable.
    pub(crate) static ref CDC_PARTITION_SCHEMA: Vec<Field> = {
        let version = Field::new(COMMIT_VERSION_COL, DataType::Int64, true);
        let timestamp = Field::new(
            COMMIT_TIMESTAMP_COL,
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        );
        vec![version, timestamp]
    };
    /// The nullable `Utf8` change-type column followed by every column of
    /// `CDC_PARTITION_SCHEMA`. Built from that schema so the commit columns are defined once.
    /// NOTE(review): presumably used for add/remove scans where the change type is not stored
    /// in the data file — confirm against the scan code.
    pub(crate) static ref ADD_PARTITION_SCHEMA: Vec<Field> =
        std::iter::once(Field::new(CHANGE_TYPE_COL, DataType::Utf8, true))
            .chain(CDC_PARTITION_SCHEMA.iter().cloned())
            .collect();
}

/// Groups a list of file actions with the commit version and timestamp they are associated with.
#[derive(Debug)]
pub(crate) struct CdcDataSpec<F: FileAction> {
    /// Table version the actions are associated with.
    version: i64,
    /// Commit timestamp. NOTE(review): unit not visible here — presumably epoch milliseconds,
    /// matching the `Millisecond` timestamp column; confirm.
    timestamp: i64,
    /// The file actions themselves (e.g. `Add`, `AddCDCFile` or `Remove`).
    actions: Vec<F>,
}

impl<F: FileAction> CdcDataSpec<F> {
    /// Bundles `actions` with the `version` and `timestamp` they belong to.
    pub fn new(version: i64, timestamp: i64, actions: Vec<F>) -> Self {
        Self { actions, timestamp, version }
    }
}

/// Accessors the CDF reader needs from the file actions it scans.
///
/// Implemented in this module for [`Add`], [`AddCDCFile`] and [`Remove`].
pub trait FileAction {
    /// Returns the partition values recorded on the action, keyed by partition column name.
    ///
    /// # Errors
    /// Implementations may return an error when the action does not carry partition values.
    fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>>;
    /// Returns the physical path of the data file the action refers to.
    fn path(&self) -> String;
    /// Returns the size of the referenced data file in bytes.
    ///
    /// # Errors
    /// Implementations may return an error when the size is unavailable or invalid.
    fn size(&self) -> DeltaResult<usize>;
}

impl FileAction for Add {
    fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
        Ok(&self.partition_values)
    }

    fn path(&self) -> String {
        self.path.clone()
    }

    fn size(&self) -> DeltaResult<usize> {
        Ok(self.size as usize)
    }
}

impl FileAction for AddCDCFile {
    fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
        Ok(&self.partition_values)
    }

    fn path(&self) -> String {
        self.path.clone()
    }

    fn size(&self) -> DeltaResult<usize> {
        Ok(self.size as usize)
    }
}

impl FileAction for Remove {
    /// Returns the partition values recorded on the remove action.
    ///
    /// The field is optional on remove actions. When `extended_file_metadata` is true it is
    /// expected to be filled in, but a missing value is reported as a protocol error in every
    /// case (previously that branch panicked via `unwrap`), so a malformed log cannot abort
    /// the reader.
    ///
    /// # Errors
    /// `ProtocolError::InvalidField("partition_values")` when the field is absent.
    fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
        self.partition_values
            .as_ref()
            .ok_or_else(|| crate::DeltaTableError::Protocol {
                source: crate::protocol::ProtocolError::InvalidField(
                    "partition_values".to_string(),
                ),
            })
    }

    fn path(&self) -> String {
        self.path.clone()
    }

    /// Returns the size of the removed file in bytes.
    ///
    /// As with `partition_values`, `extended_file_metadata` only states that the size should
    /// be present. A missing or unrepresentable size yields an error instead of a panic or a
    /// silently wrapped `as` cast.
    ///
    /// # Errors
    /// `ProtocolError::InvalidField("size")` when the size is absent or does not fit in `usize`.
    fn size(&self) -> DeltaResult<usize> {
        let invalid_size = || crate::DeltaTableError::Protocol {
            source: crate::protocol::ProtocolError::InvalidField("size".to_string()),
        };
        let size = self.size.ok_or_else(invalid_size)?;
        usize::try_from(size).map_err(|_| invalid_size())
    }
}