deltalake_core/
errors.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
//! Error types for the deltalake crate
use object_store::Error as ObjectStoreError;

use crate::operations::transaction::{CommitBuilderError, TransactionError};
use crate::protocol::ProtocolError;

/// A result returned by delta-rs: shorthand for [`Result`] with [`DeltaTableError`] as the
/// error type.
pub type DeltaResult<T> = Result<T, DeltaTableError>;

/// Delta Table specific error
///
/// Variants marked `#[from]` get a `From` conversion generated by `thiserror`, so the
/// corresponding source errors can be propagated with `?`.
#[allow(missing_docs)]
#[derive(thiserror::Error, Debug)]
pub enum DeltaTableError {
    /// Error propagated from the `delta_kernel` crate.
    #[error("Kernel error: {0}")]
    KernelError(#[from] delta_kernel::error::Error),

    /// Delta protocol violation, wrapping the underlying [`ProtocolError`].
    #[error("Delta protocol violation: {source}")]
    Protocol { source: ProtocolError },

    /// Error returned when reading the delta log object failed.
    #[error("Failed to read delta log object: {}", .source)]
    ObjectStore {
        /// Storage error details when reading the delta log object failed.
        #[from]
        source: ObjectStoreError,
    },

    /// Error returned when parsing checkpoint parquet.
    #[error("Failed to parse parquet: {}", .source)]
    Parquet {
        /// Parquet error details returned when reading the checkpoint failed.
        #[from]
        source: parquet::errors::ParquetError,
    },

    /// Error returned when converting the schema in Arrow format failed.
    #[error("Failed to convert into Arrow schema: {}", .source)]
    Arrow {
        /// Arrow error details returned when converting the schema in Arrow format failed
        #[from]
        source: arrow::error::ArrowError,
    },

    /// Error returned when the log record has an invalid JSON.
    #[error("Invalid JSON in log record, version={}, line=`{}`, err=`{}`", .version, .line, .json_err)]
    InvalidJsonLog {
        /// JSON error details returned when parsing the record JSON.
        json_err: serde_json::error::Error,
        /// invalid log entry content.
        line: String,
        /// corresponding table version for the log file.
        version: i64,
    },

    /// Error returned when the log contains invalid stats JSON.
    #[error("Invalid JSON in file stats: {}", .json_err)]
    InvalidStatsJson {
        /// JSON error details returned when parsing the stats JSON.
        json_err: serde_json::error::Error,
    },

    /// Error returned when an invariant expression contains invalid JSON.
    #[error("Invalid JSON in invariant expression, line=`{line}`, err=`{json_err}`")]
    InvalidInvariantJson {
        /// JSON error details returned when parsing the invariant expression JSON.
        json_err: serde_json::error::Error,
        /// Invariant expression.
        line: String,
    },

    /// Error returned when the DeltaTable has an invalid version.
    #[error("Invalid table version: {0}")]
    InvalidVersion(i64),

    /// Error returned when a data file of the DeltaTable cannot be read.
    #[error("Corrupted table, cannot read data file {}: {}", .path, .source)]
    MissingDataFile {
        /// I/O error details returned when reading the data file failed.
        source: std::io::Error,
        /// Path of the data file that could not be read.
        path: String,
    },

    /// Error returned when the datetime string is invalid for a conversion.
    #[error("Invalid datetime string: {}", .source)]
    InvalidDateTimeString {
        /// Parse error details returned of the datetime string parse error.
        #[from]
        source: chrono::ParseError,
    },

    /// Error returned when attempting to write bad data to the table
    #[error("Attempted to write invalid data to the table: {:#?}", violations)]
    InvalidData {
        /// The violations found in the data; they are pretty-printed into the error message.
        violations: Vec<String>,
    },

    /// Error returned when it is not a DeltaTable.
    #[error("Not a Delta table: {0}")]
    NotATable(String),

    /// Error returned when no metadata was found in the DeltaTable.
    #[error("No metadata found, please make sure table is loaded.")]
    NoMetadata,

    /// Error returned when no schema was found in the DeltaTable.
    #[error("No schema found, please make sure table is loaded.")]
    NoSchema,

    /// Error returned when no partition was found in the DeltaTable.
    #[error("No partitions found, please make sure table is partitioned.")]
    LoadPartitions,

    /// Error returned when writes are attempted with data that doesn't match the schema of the
    /// table
    #[error("Data does not match the schema or partitions of the table: {}", msg)]
    SchemaMismatch {
        /// Information about the mismatch
        msg: String,
    },

    /// Error returned when a partition is not formatted as a Hive Partition.
    #[error("This partition is not formatted with key=value: {}", .partition)]
    PartitionError {
        /// The malformed partition used.
        partition: String,
    },

    /// Error returned when a invalid partition filter was found.
    #[error("Invalid partition filter found: {}.", .partition_filter)]
    InvalidPartitionFilter {
        /// The invalid partition filter used.
        partition_filter: String,
    },

    /// Error returned when a partition filter uses a nonpartitioned column.
    #[error("Tried to filter partitions on non-partitioned columns: {:#?}", .nonpartitioned_columns)]
    ColumnsNotPartitioned {
        /// The columns used in the partition filter that is not partitioned
        nonpartitioned_columns: Vec<String>,
    },

    /// Error returned when a line from log record is invalid.
    ///
    /// The message does not include the underlying I/O error; it is carried in `source`.
    #[error("Failed to read line from log record")]
    Io {
        /// Source error details returned while reading the log record.
        #[from]
        source: std::io::Error,
    },

    /// Error raised while preparing a commit
    #[error("Commit actions are unsound: {source}")]
    CommitValidation {
        /// The source error
        source: CommitBuilderError,
    },

    /// Error raised while committing transaction
    #[error("Transaction failed: {source}")]
    Transaction {
        /// The source error
        source: TransactionError,
    },

    /// Error returned when transaction is failed to be committed because given version already exists.
    #[error("Delta transaction failed, version {0} already exists.")]
    VersionAlreadyExists(i64),

    /// Error returned when user attempts to commit actions that don't belong to the next version.
    #[error("Delta transaction failed, version {0} does not follow {1}")]
    VersionMismatch(i64, i64),

    /// Error returned when delta-rs was built without the feature needed to load from a location.
    #[error("Delta-rs must be build with feature '{feature}' to support loading from: {url}.")]
    MissingFeature {
        /// Name of the missing feature
        feature: &'static str,
        /// Storage location url
        url: String,
    },

    /// Error returned when the storage location cannot be inferred from the given string.
    #[error("Cannot infer storage location from: {0}")]
    InvalidTableLocation(String),

    /// Error reported as a log JSON serialization failure.
    #[error("Log JSON serialization error: {json_err}")]
    SerializeLogJson {
        /// JSON serialization error
        json_err: serde_json::error::Error,
    },

    /// Error reported as a schema JSON serialization failure.
    #[error("Schema JSON serialization error: {json_err}")]
    SerializeSchemaJson {
        /// JSON serialization error
        json_err: serde_json::error::Error,
    },

    /// Generic Delta Table error carrying only a message.
    #[error("Generic DeltaTable error: {0}")]
    Generic(String),

    /// Generic error wrapping an arbitrary boxed source error.
    #[error("Generic error: {source}")]
    GenericError {
        /// Source error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error propagated from this crate's `kernel` module.
    #[error("Kernel: {source}")]
    Kernel {
        #[from]
        source: crate::kernel::Error,
    },

    /// Error returned when the table metadata is invalid; the string describes the problem.
    #[error("Table metadata is invalid: {0}")]
    MetadataError(String),

    /// Error returned when the table has not yet been initialized.
    #[error("Table has not yet been initialized")]
    NotInitialized,

    /// Error returned when the table has not been initialized with files.
    /// The string names what is unsupported in that state (presumably an operation — confirm
    /// against callers).
    #[error("Table has not yet been initialized with files, therefore {0} is not supported")]
    NotInitializedWithFiles(String),

    /// Error returned when change data is not enabled for `version`; `start` and `end` are
    /// echoed in the message (presumably the requested version range — confirm against callers).
    #[error("Change Data not enabled for version: {version}, Start: {start}, End: {end}")]
    ChangeDataNotRecorded { version: i64, start: i64, end: i64 },

    /// Error returned when reading a table version that does not have change data enabled.
    #[error("Reading a table version: {version} that does not have change data enabled")]
    ChangeDataNotEnabled { version: i64 },

    /// Error returned when the start version of a range is greater than its end version.
    #[error("Invalid version start version {start} is greater than version {end}")]
    ChangeDataInvalidVersionRange { start: i64, end: i64 },
}

impl From<object_store::path::Error> for DeltaTableError {
    /// Boxes an object store path error and wraps it as [`DeltaTableError::GenericError`].
    fn from(err: object_store::path::Error) -> Self {
        // Coerce to the trait object type stored by the `GenericError` variant.
        let source: Box<dyn std::error::Error + Send + Sync + 'static> = Box::new(err);
        Self::GenericError { source }
    }
}

impl From<ProtocolError> for DeltaTableError {
    fn from(value: ProtocolError) -> Self {
        match value {
            ProtocolError::Arrow { source } => DeltaTableError::Arrow { source },
            ProtocolError::IO { source } => DeltaTableError::Io { source },
            ProtocolError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
            ProtocolError::ParquetParseError { source } => DeltaTableError::Parquet { source },
            _ => DeltaTableError::Protocol { source: value },
        }
    }
}

impl From<serde_json::Error> for DeltaTableError {
    fn from(value: serde_json::Error) -> Self {
        DeltaTableError::InvalidStatsJson { json_err: value }
    }
}

impl DeltaTableError {
    /// Create a [NotATable](DeltaTableError::NotATable) error whose message names the given
    /// path and suggests it may be an empty directory.
    pub fn not_a_table(path: impl AsRef<str>) -> Self {
        let location = path.as_ref();
        Self::NotATable(format!(
            "No snapshot or version 0 found, perhaps {} is an empty dir?",
            location
        ))
    }

    /// Create a [Generic](DeltaTableError::Generic) error from anything that can be turned
    /// into a string.
    pub fn generic(msg: impl ToString) -> Self {
        let text = msg.to_string();
        Self::Generic(text)
    }
}