deltalake_core/errors.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
//! Exceptions for the deltalake crate
use object_store::Error as ObjectStoreError;
use crate::operations::transaction::{CommitBuilderError, TransactionError};
use crate::protocol::ProtocolError;
/// A result returned by delta-rs
pub type DeltaResult<T> = Result<T, DeltaTableError>;
/// Delta Table specific error
#[allow(missing_docs)]
#[derive(thiserror::Error, Debug)]
pub enum DeltaTableError {
#[error("Kernel error: {0}")]
KernelError(#[from] delta_kernel::error::Error),
#[error("Delta protocol violation: {source}")]
Protocol { source: ProtocolError },
/// Error returned when reading the delta log object failed.
#[error("Failed to read delta log object: {}", .source)]
ObjectStore {
/// Storage error details when reading the delta log object failed.
#[from]
source: ObjectStoreError,
},
/// Error returned when parsing checkpoint parquet.
#[error("Failed to parse parquet: {}", .source)]
Parquet {
/// Parquet error details returned when reading the checkpoint failed.
#[from]
source: parquet::errors::ParquetError,
},
/// Error returned when converting the schema in Arrow format failed.
#[error("Failed to convert into Arrow schema: {}", .source)]
Arrow {
/// Arrow error details returned when converting the schema in Arrow format failed
#[from]
source: arrow::error::ArrowError,
},
/// Error returned when the log record has an invalid JSON.
#[error("Invalid JSON in log record, version={}, line=`{}`, err=`{}`", .version, .line, .json_err)]
InvalidJsonLog {
/// JSON error details returned when parsing the record JSON.
json_err: serde_json::error::Error,
/// invalid log entry content.
line: String,
/// corresponding table version for the log file.
version: i64,
},
/// Error returned when the log contains invalid stats JSON.
#[error("Invalid JSON in file stats: {}", .json_err)]
InvalidStatsJson {
/// JSON error details returned when parsing the stats JSON.
json_err: serde_json::error::Error,
},
/// Error returned when the log contains invalid stats JSON.
#[error("Invalid JSON in invariant expression, line=`{line}`, err=`{json_err}`")]
InvalidInvariantJson {
/// JSON error details returned when parsing the invariant expression JSON.
json_err: serde_json::error::Error,
/// Invariant expression.
line: String,
},
/// Error returned when the DeltaTable has an invalid version.
#[error("Invalid table version: {0}")]
InvalidVersion(i64),
/// Error returned when the DeltaTable has no data files.
#[error("Corrupted table, cannot read data file {}: {}", .path, .source)]
MissingDataFile {
/// Source error details returned when the DeltaTable has no data files.
source: std::io::Error,
/// The Path used of the DeltaTable
path: String,
},
/// Error returned when the datetime string is invalid for a conversion.
#[error("Invalid datetime string: {}", .source)]
InvalidDateTimeString {
/// Parse error details returned of the datetime string parse error.
#[from]
source: chrono::ParseError,
},
/// Error returned when attempting to write bad data to the table
#[error("Attempted to write invalid data to the table: {:#?}", violations)]
InvalidData {
/// Action error details returned of the invalid action.
violations: Vec<String>,
},
/// Error returned when it is not a DeltaTable.
#[error("Not a Delta table: {0}")]
NotATable(String),
/// Error returned when no metadata was found in the DeltaTable.
#[error("No metadata found, please make sure table is loaded.")]
NoMetadata,
/// Error returned when no schema was found in the DeltaTable.
#[error("No schema found, please make sure table is loaded.")]
NoSchema,
/// Error returned when no partition was found in the DeltaTable.
#[error("No partitions found, please make sure table is partitioned.")]
LoadPartitions,
/// Error returned when writes are attempted with data that doesn't match the schema of the
/// table
#[error("Data does not match the schema or partitions of the table: {}", msg)]
SchemaMismatch {
/// Information about the mismatch
msg: String,
},
/// Error returned when a partition is not formatted as a Hive Partition.
#[error("This partition is not formatted with key=value: {}", .partition)]
PartitionError {
/// The malformed partition used.
partition: String,
},
/// Error returned when a invalid partition filter was found.
#[error("Invalid partition filter found: {}.", .partition_filter)]
InvalidPartitionFilter {
/// The invalid partition filter used.
partition_filter: String,
},
/// Error returned when a partition filter uses a nonpartitioned column.
#[error("Tried to filter partitions on non-partitioned columns: {:#?}", .nonpartitioned_columns)]
ColumnsNotPartitioned {
/// The columns used in the partition filter that is not partitioned
nonpartitioned_columns: Vec<String>,
},
/// Error returned when a line from log record is invalid.
#[error("Failed to read line from log record")]
Io {
/// Source error details returned while reading the log record.
#[from]
source: std::io::Error,
},
/// Error raised while preparing a commit
#[error("Commit actions are unsound: {source}")]
CommitValidation {
/// The source error
source: CommitBuilderError,
},
/// Error raised while commititng transaction
#[error("Transaction failed: {source}")]
Transaction {
/// The source error
source: TransactionError,
},
/// Error returned when transaction is failed to be committed because given version already exists.
#[error("Delta transaction failed, version {0} already exists.")]
VersionAlreadyExists(i64),
/// Error returned when user attempts to commit actions that don't belong to the next version.
#[error("Delta transaction failed, version {0} does not follow {1}")]
VersionMismatch(i64, i64),
/// A Feature is missing to perform operation
#[error("Delta-rs must be build with feature '{feature}' to support loading from: {url}.")]
MissingFeature {
/// Name of the missing feature
feature: &'static str,
/// Storage location url
url: String,
},
/// A Feature is missing to perform operation
#[error("Cannot infer storage location from: {0}")]
InvalidTableLocation(String),
/// Generic Delta Table error
#[error("Log JSON serialization error: {json_err}")]
SerializeLogJson {
/// JSON serialization error
json_err: serde_json::error::Error,
},
/// Generic Delta Table error
#[error("Schema JSON serialization error: {json_err}")]
SerializeSchemaJson {
/// JSON serialization error
json_err: serde_json::error::Error,
},
/// Generic Delta Table error
#[error("Generic DeltaTable error: {0}")]
Generic(String),
/// Generic Delta Table error
#[error("Generic error: {source}")]
GenericError {
/// Source error
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
#[error("Kernel: {source}")]
Kernel {
#[from]
source: crate::kernel::Error,
},
#[error("Table metadata is invalid: {0}")]
MetadataError(String),
#[error("Table has not yet been initialized")]
NotInitialized,
#[error("Table has not yet been initialized with files, therefore {0} is not supported")]
NotInitializedWithFiles(String),
#[error("Change Data not enabled for version: {version}, Start: {start}, End: {end}")]
ChangeDataNotRecorded { version: i64, start: i64, end: i64 },
#[error("Reading a table version: {version} that does not have change data enabled")]
ChangeDataNotEnabled { version: i64 },
#[error("Invalid version start version {start} is greater than version {end}")]
ChangeDataInvalidVersionRange { start: i64, end: i64 },
}
impl From<object_store::path::Error> for DeltaTableError {
fn from(err: object_store::path::Error) -> Self {
Self::GenericError {
source: Box::new(err),
}
}
}
impl From<ProtocolError> for DeltaTableError {
fn from(value: ProtocolError) -> Self {
match value {
ProtocolError::Arrow { source } => DeltaTableError::Arrow { source },
ProtocolError::IO { source } => DeltaTableError::Io { source },
ProtocolError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
ProtocolError::ParquetParseError { source } => DeltaTableError::Parquet { source },
_ => DeltaTableError::Protocol { source: value },
}
}
}
impl From<serde_json::Error> for DeltaTableError {
fn from(value: serde_json::Error) -> Self {
DeltaTableError::InvalidStatsJson { json_err: value }
}
}
impl DeltaTableError {
/// Crate a NotATable Error with message for given path.
pub fn not_a_table(path: impl AsRef<str>) -> Self {
let msg = format!(
"No snapshot or version 0 found, perhaps {} is an empty dir?",
path.as_ref()
);
Self::NotATable(msg)
}
/// Create a [Generic](DeltaTableError::Generic) error with the given message.
pub fn generic(msg: impl ToString) -> Self {
Self::Generic(msg.to_string())
}
}