lance_core/
error.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_schema::ArrowError;
5use snafu::{Location, Snafu};
6
7type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;
8
9/// Allocates error on the heap and then places `e` into it.
10#[inline]
11pub fn box_error(e: impl std::error::Error + Send + Sync + 'static) -> BoxedError {
12    Box::new(e)
13}
14
15#[derive(Debug, Snafu)]
16#[snafu(visibility(pub))]
17pub enum Error {
18    #[snafu(display("Invalid user input: {source}, {location}"))]
19    InvalidInput {
20        source: BoxedError,
21        location: Location,
22    },
23    #[snafu(display("Dataset already exists: {uri}, {location}"))]
24    DatasetAlreadyExists { uri: String, location: Location },
25    #[snafu(display("Append with different schema: {difference}, location: {location}"))]
26    SchemaMismatch {
27        difference: String,
28        location: Location,
29    },
30    #[snafu(display("Dataset at path {path} was not found: {source}, {location}"))]
31    DatasetNotFound {
32        path: String,
33        source: BoxedError,
34        location: Location,
35    },
36    #[snafu(display("Encountered corrupt file {path}: {source}, {location}"))]
37    CorruptFile {
38        path: object_store::path::Path,
39        source: BoxedError,
40        location: Location,
41        // TODO: add backtrace?
42    },
43    #[snafu(display("Not supported: {source}, {location}"))]
44    NotSupported {
45        source: BoxedError,
46        location: Location,
47    },
48    #[snafu(display("Commit conflict for version {version}: {source}, {location}"))]
49    CommitConflict {
50        version: u64,
51        source: BoxedError,
52        location: Location,
53    },
54    #[snafu(display("Encountered internal error. Please file a bug report at https://github.com/lancedb/lance/issues. {message}, {location}"))]
55    Internal { message: String, location: Location },
56    #[snafu(display("A prerequisite task failed: {message}, {location}"))]
57    PrerequisiteFailed { message: String, location: Location },
58    #[snafu(display("LanceError(Arrow): {message}, {location}"))]
59    Arrow { message: String, location: Location },
60    #[snafu(display("LanceError(Schema): {message}, {location}"))]
61    Schema { message: String, location: Location },
62    #[snafu(display("Not found: {uri}, {location}"))]
63    NotFound { uri: String, location: Location },
64    #[snafu(display("LanceError(IO): {source}, {location}"))]
65    IO {
66        source: BoxedError,
67        location: Location,
68    },
69    #[snafu(display("LanceError(Index): {message}, {location}"))]
70    Index { message: String, location: Location },
71    #[snafu(display("Lance index not found: {identity}, {location}"))]
72    IndexNotFound {
73        identity: String,
74        location: Location,
75    },
76    #[snafu(display("Cannot infer storage location from: {message}"))]
77    InvalidTableLocation { message: String },
78    /// Stream early stop
79    Stop,
80    #[snafu(display("Wrapped error: {error}, {location}"))]
81    Wrapped {
82        error: BoxedError,
83        location: Location,
84    },
85    #[snafu(display("Cloned error: {message}, {location}"))]
86    Cloned { message: String, location: Location },
87    #[snafu(display("Query Execution error: {message}, {location}"))]
88    Execution { message: String, location: Location },
89    #[snafu(display("Ref is invalid: {message}"))]
90    InvalidRef { message: String },
91    #[snafu(display("Ref conflict error: {message}"))]
92    RefConflict { message: String },
93    #[snafu(display("Ref not found error: {message}"))]
94    RefNotFound { message: String },
95    #[snafu(display("Cleanup error: {message}"))]
96    Cleanup { message: String },
97    #[snafu(display("Version not found error: {message}"))]
98    VersionNotFound { message: String },
99    #[snafu(display("Version conflict error: {message}"))]
100    VersionConflict {
101        message: String,
102        major_version: u16,
103        minor_version: u16,
104        location: Location,
105    },
106}
107
108impl Error {
109    pub fn corrupt_file(
110        path: object_store::path::Path,
111        message: impl Into<String>,
112        location: Location,
113    ) -> Self {
114        let message: String = message.into();
115        Self::CorruptFile {
116            path,
117            source: message.into(),
118            location,
119        }
120    }
121
122    pub fn invalid_input(message: impl Into<String>, location: Location) -> Self {
123        let message: String = message.into();
124        Self::InvalidInput {
125            source: message.into(),
126            location,
127        }
128    }
129
130    pub fn io(message: impl Into<String>, location: Location) -> Self {
131        let message: String = message.into();
132        Self::IO {
133            source: message.into(),
134            location,
135        }
136    }
137
138    pub fn version_conflict(
139        message: impl Into<String>,
140        major_version: u16,
141        minor_version: u16,
142        location: Location,
143    ) -> Self {
144        let message: String = message.into();
145        Self::VersionConflict {
146            message,
147            major_version,
148            minor_version,
149            location,
150        }
151    }
152}
153
154trait ToSnafuLocation {
155    fn to_snafu_location(&'static self) -> snafu::Location;
156}
157
158impl ToSnafuLocation for std::panic::Location<'static> {
159    fn to_snafu_location(&'static self) -> snafu::Location {
160        snafu::Location::new(self.file(), self.line(), self.column())
161    }
162}
163
164pub type Result<T> = std::result::Result<T, Error>;
165pub type ArrowResult<T> = std::result::Result<T, ArrowError>;
166#[cfg(feature = "datafusion")]
167pub type DataFusionResult<T> = std::result::Result<T, datafusion_common::DataFusionError>;
168
169impl From<ArrowError> for Error {
170    #[track_caller]
171    fn from(e: ArrowError) -> Self {
172        Self::Arrow {
173            message: e.to_string(),
174            location: std::panic::Location::caller().to_snafu_location(),
175        }
176    }
177}
178
179impl From<&ArrowError> for Error {
180    #[track_caller]
181    fn from(e: &ArrowError) -> Self {
182        Self::Arrow {
183            message: e.to_string(),
184            location: std::panic::Location::caller().to_snafu_location(),
185        }
186    }
187}
188
189impl From<std::io::Error> for Error {
190    #[track_caller]
191    fn from(e: std::io::Error) -> Self {
192        Self::IO {
193            source: box_error(e),
194            location: std::panic::Location::caller().to_snafu_location(),
195        }
196    }
197}
198
199impl From<object_store::Error> for Error {
200    #[track_caller]
201    fn from(e: object_store::Error) -> Self {
202        Self::IO {
203            source: box_error(e),
204            location: std::panic::Location::caller().to_snafu_location(),
205        }
206    }
207}
208
209impl From<prost::DecodeError> for Error {
210    #[track_caller]
211    fn from(e: prost::DecodeError) -> Self {
212        Self::IO {
213            source: box_error(e),
214            location: std::panic::Location::caller().to_snafu_location(),
215        }
216    }
217}
218
219impl From<prost::EncodeError> for Error {
220    #[track_caller]
221    fn from(e: prost::EncodeError) -> Self {
222        Self::IO {
223            source: box_error(e),
224            location: std::panic::Location::caller().to_snafu_location(),
225        }
226    }
227}
228
229impl From<prost::UnknownEnumValue> for Error {
230    #[track_caller]
231    fn from(e: prost::UnknownEnumValue) -> Self {
232        Self::IO {
233            source: box_error(e),
234            location: std::panic::Location::caller().to_snafu_location(),
235        }
236    }
237}
238
239impl From<tokio::task::JoinError> for Error {
240    #[track_caller]
241    fn from(e: tokio::task::JoinError) -> Self {
242        Self::IO {
243            source: box_error(e),
244            location: std::panic::Location::caller().to_snafu_location(),
245        }
246    }
247}
248
249impl From<object_store::path::Error> for Error {
250    #[track_caller]
251    fn from(e: object_store::path::Error) -> Self {
252        Self::IO {
253            source: box_error(e),
254            location: std::panic::Location::caller().to_snafu_location(),
255        }
256    }
257}
258
259impl From<url::ParseError> for Error {
260    #[track_caller]
261    fn from(e: url::ParseError) -> Self {
262        Self::IO {
263            source: box_error(e),
264            location: std::panic::Location::caller().to_snafu_location(),
265        }
266    }
267}
268
269impl From<serde_json::Error> for Error {
270    #[track_caller]
271    fn from(e: serde_json::Error) -> Self {
272        Self::Arrow {
273            message: e.to_string(),
274            location: std::panic::Location::caller().to_snafu_location(),
275        }
276    }
277}
278
279#[track_caller]
280fn arrow_io_error_from_msg(message: String) -> ArrowError {
281    ArrowError::IoError(
282        message.clone(),
283        std::io::Error::new(std::io::ErrorKind::Other, message),
284    )
285}
286
287impl From<Error> for ArrowError {
288    fn from(value: Error) -> Self {
289        match value {
290            Error::Arrow { message, .. } => arrow_io_error_from_msg(message), // we lose the error type converting to LanceError
291            Error::IO { source, .. } => arrow_io_error_from_msg(source.to_string()),
292            Error::Schema { message, .. } => Self::SchemaError(message),
293            Error::Index { message, .. } => arrow_io_error_from_msg(message),
294            Error::Stop => arrow_io_error_from_msg("early stop".to_string()),
295            e => arrow_io_error_from_msg(e.to_string()), // Find a more scalable way of doing this
296        }
297    }
298}
299
300#[cfg(feature = "datafusion")]
301impl From<datafusion_sql::sqlparser::parser::ParserError> for Error {
302    #[track_caller]
303    fn from(e: datafusion_sql::sqlparser::parser::ParserError) -> Self {
304        Self::IO {
305            source: box_error(e),
306            location: std::panic::Location::caller().to_snafu_location(),
307        }
308    }
309}
310
311#[cfg(feature = "datafusion")]
312impl From<datafusion_sql::sqlparser::tokenizer::TokenizerError> for Error {
313    #[track_caller]
314    fn from(e: datafusion_sql::sqlparser::tokenizer::TokenizerError) -> Self {
315        Self::IO {
316            source: box_error(e),
317            location: std::panic::Location::caller().to_snafu_location(),
318        }
319    }
320}
321
322#[cfg(feature = "datafusion")]
323impl From<Error> for datafusion_common::DataFusionError {
324    #[track_caller]
325    fn from(e: Error) -> Self {
326        Self::Execution(e.to_string())
327    }
328}
329
330#[cfg(feature = "datafusion")]
331impl From<datafusion_common::DataFusionError> for Error {
332    #[track_caller]
333    fn from(e: datafusion_common::DataFusionError) -> Self {
334        let location = std::panic::Location::caller().to_snafu_location();
335        match e {
336            datafusion_common::DataFusionError::SQL(..)
337            | datafusion_common::DataFusionError::Plan(..)
338            | datafusion_common::DataFusionError::Configuration(..) => Self::InvalidInput {
339                source: box_error(e),
340                location,
341            },
342            datafusion_common::DataFusionError::SchemaError(..) => Self::Schema {
343                message: e.to_string(),
344                location,
345            },
346            datafusion_common::DataFusionError::ArrowError(..) => Self::Arrow {
347                message: e.to_string(),
348                location,
349            },
350            datafusion_common::DataFusionError::NotImplemented(..) => Self::NotSupported {
351                source: box_error(e),
352                location,
353            },
354            datafusion_common::DataFusionError::Execution(..) => Self::Execution {
355                message: e.to_string(),
356                location,
357            },
358            _ => Self::IO {
359                source: box_error(e),
360                location,
361            },
362        }
363    }
364}
365
366// This is a bit odd but some object_store functions only accept
367// Stream<Result<T, ObjectStoreError>> and so we need to convert
368// to ObjectStoreError to call the methods.
369impl From<Error> for object_store::Error {
370    fn from(err: Error) -> Self {
371        Self::Generic {
372            store: "N/A",
373            source: Box::new(err),
374        }
375    }
376}
377
378#[track_caller]
379pub fn get_caller_location() -> &'static std::panic::Location<'static> {
380    std::panic::Location::caller()
381}
382
383/// Wrap an error in a new error type that implements Clone
384///
385/// This is useful when two threads/streams share a common fallible source
386/// The base error will always have the full error.  Any cloned results will
387/// only have Error::Cloned with the to_string of the base error.
388pub struct CloneableError(pub Error);
389
390impl Clone for CloneableError {
391    #[track_caller]
392    fn clone(&self) -> Self {
393        Self(Error::Cloned {
394            message: self.0.to_string(),
395            location: std::panic::Location::caller().to_snafu_location(),
396        })
397    }
398}
399
400#[derive(Clone)]
401pub struct CloneableResult<T: Clone>(pub std::result::Result<T, CloneableError>);
402
403impl<T: Clone> From<Result<T>> for CloneableResult<T> {
404    fn from(result: Result<T>) -> Self {
405        Self(result.map_err(CloneableError))
406    }
407}
408
409#[cfg(test)]
410mod test {
411    use super::*;
412
413    #[test]
414    fn test_caller_location_capture() {
415        let current_fn = get_caller_location();
416        // make sure ? captures the correct location
417        // .into() WILL NOT capture the correct location
418        let f: Box<dyn Fn() -> Result<()>> = Box::new(|| {
419            Err(object_store::Error::Generic {
420                store: "",
421                source: "".into(),
422            })?;
423            Ok(())
424        });
425        match f().unwrap_err() {
426            Error::IO { location, .. } => {
427                // +4 is the beginning of object_store::Error::Generic...
428                assert_eq!(location.line, current_fn.line() + 4, "{}", location)
429            }
430            #[allow(unreachable_patterns)]
431            _ => panic!("expected ObjectStore error"),
432        }
433    }
434}