use std::backtrace::{Backtrace, BacktraceStatus};
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::io;
use std::result;
use std::sync::Arc;
use crate::utils::quote_identifier;
use crate::{Column, DFSchema, OwnedTableReference};
#[cfg(feature = "avro")]
use apache_avro::Error as AvroError;
use arrow::error::ArrowError;
#[cfg(feature = "parquet")]
use parquet::errors::ParquetError;
use sqlparser::parser::ParserError;
/// Result alias whose error type defaults to [`DataFusionError`] but can be
/// overridden through the second type parameter.
pub type Result<T, E = DataFusionError> = result::Result<T, E>;
/// Result alias whose error is a [`DataFusionError`] held behind an [`Arc`].
pub type SharedResult<T> = result::Result<T, Arc<DataFusionError>>;
/// Boxed error trait object that is `Send + Sync`; this is the payload type of
/// [`DataFusionError::External`].
pub type GenericError = Box<dyn Error + Send + Sync>;
/// Error type of this crate.
///
/// Variants either wrap an error value from another source or carry a plain
/// message. The `Display` impl prefixes each with a fixed label, quoted per
/// variant below.
#[derive(Debug)]
pub enum DataFusionError {
/// Wraps an `ArrowError`; displayed as `Arrow error: ...`.
ArrowError(ArrowError),
/// Wraps a `ParquetError`; displayed as `Parquet error: ...`.
/// Only present with the `parquet` feature.
#[cfg(feature = "parquet")]
ParquetError(ParquetError),
/// Wraps an `AvroError`; displayed as `Avro error: ...`.
/// Only present with the `avro` feature.
#[cfg(feature = "avro")]
AvroError(AvroError),
/// Wraps an `object_store::Error`; displayed as `Object Store error: ...`.
/// Only present with the `object_store` feature.
#[cfg(feature = "object_store")]
ObjectStore(object_store::Error),
/// Wraps an `io::Error`; displayed as `IO error: ...`.
IoError(io::Error),
/// Wraps a SQL `ParserError`; displayed as `SQL error: ...` using the
/// parser error's `Debug` representation.
SQL(ParserError),
/// Message-only; displayed as `This feature is not implemented: ...`.
NotImplemented(String),
/// Message-only; displayed as `Internal error: ...` followed by a note that
/// the failure was likely caused by a bug in DataFusion.
Internal(String),
/// Message-only; displayed as `Error during planning: ...`.
Plan(String),
/// Message-only; displayed as `Invalid or Unsupported Configuration: ...`.
Configuration(String),
/// Wraps a [`SchemaError`]; displayed as `Schema error: ...`.
SchemaError(SchemaError),
/// Message-only; displayed as `Execution error: ...`.
Execution(String),
/// Message-only; displayed as `Resources exhausted: ...`.
ResourcesExhausted(String),
/// Wraps an arbitrary boxed error ([`GenericError`]); displayed as
/// `External error: ...`.
External(GenericError),
/// A description paired with the error it was attached to. Displayed as the
/// description, a `caused by` line, then the wrapped error.
Context(String, Box<DataFusionError>),
/// Message-only; displayed as `Substrait error: ...`.
Substrait(String),
}
/// Calls `.context(..)` on the second argument, passing the first argument
/// followed by ` at <file>:<line>` of the macro invocation.
#[macro_export]
macro_rules! context {
    ($description:expr, $error:expr) => {
        $error.context(format!(
            "{} at {}:{}",
            $description,
            file!(),
            line!()
        ))
    };
}
/// Schema-related errors, carried by [`DataFusionError::SchemaError`].
#[derive(Debug)]
pub enum SchemaError {
/// A reference to `field` is ambiguous. The message differs depending on
/// whether `field` carries a relation (qualifier) or not.
AmbiguousReference { field: Column },
/// The schema contains a duplicate field with the given qualifier and name.
DuplicateQualifiedField {
qualifier: Box<OwnedTableReference>,
name: String,
},
/// The schema contains a duplicate unqualified field with the given name.
DuplicateUnqualifiedField { name: String },
/// No field matching `field` exists. `valid_fields` lists the columns that
/// are reported as valid alternatives (omitted from the message when empty).
FieldNotFound {
field: Box<Column>,
valid_fields: Vec<Column>,
},
}
pub fn field_not_found<R: Into<OwnedTableReference>>(
qualifier: Option<R>,
name: &str,
schema: &DFSchema,
) -> DataFusionError {
DataFusionError::SchemaError(SchemaError::FieldNotFound {
field: Box::new(Column::new(qualifier, name)),
valid_fields: schema
.fields()
.iter()
.map(|f| f.qualified_column())
.collect(),
})
}
pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> DataFusionError {
DataFusionError::SchemaError(SchemaError::FieldNotFound {
field: Box::new(Column::new_unqualified(name)),
valid_fields: schema
.fields()
.iter()
.map(|f| f.qualified_column())
.collect(),
})
}
impl Display for SchemaError {
    /// Writes the human-readable message for each schema error variant.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::AmbiguousReference { field } => {
                if field.relation.is_none() {
                    write!(
                        f,
                        "Ambiguous reference to unqualified field {}",
                        field.quoted_flat_name()
                    )
                } else {
                    write!(
                        f,
                        "Schema contains qualified field name {} and unqualified field name {} which would be ambiguous",
                        field.quoted_flat_name(),
                        quote_identifier(&field.name)
                    )
                }
            }
            Self::DuplicateQualifiedField { qualifier, name } => write!(
                f,
                "Schema contains duplicate qualified field name {}.{}",
                qualifier.to_quoted_string(),
                quote_identifier(name)
            ),
            Self::DuplicateUnqualifiedField { name } => write!(
                f,
                "Schema contains duplicate unqualified field name {}",
                quote_identifier(name)
            ),
            Self::FieldNotFound {
                field,
                valid_fields,
            } => {
                write!(f, "No field named {}", field.quoted_flat_name())?;
                // The list of alternatives is only printed when there is one.
                if !valid_fields.is_empty() {
                    let candidates: Vec<String> = valid_fields
                        .iter()
                        .map(|candidate| candidate.quoted_flat_name())
                        .collect();
                    write!(f, ". Valid fields are {}", candidates.join(", "))?;
                }
                write!(f, ".")
            }
        }
    }
}
// Marker impl: uses the default `Error` methods, so `source()` returns `None`.
impl Error for SchemaError {}
impl From<std::fmt::Error> for DataFusionError {
fn from(_e: std::fmt::Error) -> Self {
DataFusionError::Execution("Fail to format".to_string())
}
}
impl From<io::Error> for DataFusionError {
fn from(e: io::Error) -> Self {
DataFusionError::IoError(e)
}
}
impl From<ArrowError> for DataFusionError {
fn from(e: ArrowError) -> Self {
DataFusionError::ArrowError(e)
}
}
impl From<DataFusionError> for ArrowError {
    /// Converts a `DataFusionError` into an `ArrowError` without adding
    /// needless layers of wrapping.
    fn from(err: DataFusionError) -> Self {
        match err {
            // Already an Arrow error: hand it back unchanged.
            DataFusionError::ArrowError(inner) => inner,
            // Already boxed: reuse the box instead of boxing the wrapper.
            DataFusionError::External(boxed) => Self::ExternalError(boxed),
            // Anything else travels as a boxed external error.
            wrapped => Self::ExternalError(Box::new(wrapped)),
        }
    }
}
#[cfg(feature = "parquet")]
impl From<ParquetError> for DataFusionError {
    /// Wraps a Parquet error in the `ParquetError` variant.
    fn from(err: ParquetError) -> Self {
        Self::ParquetError(err)
    }
}
#[cfg(feature = "avro")]
impl From<AvroError> for DataFusionError {
    /// Wraps an Avro error in the `AvroError` variant.
    fn from(err: AvroError) -> Self {
        Self::AvroError(err)
    }
}
#[cfg(feature = "object_store")]
impl From<object_store::Error> for DataFusionError {
    /// Wraps an object store error in the `ObjectStore` variant.
    fn from(err: object_store::Error) -> Self {
        Self::ObjectStore(err)
    }
}
#[cfg(feature = "object_store")]
impl From<object_store::path::Error> for DataFusionError {
    /// Converts an object store path error into an `object_store::Error` and
    /// wraps the result in the `ObjectStore` variant.
    fn from(err: object_store::path::Error) -> Self {
        let store_error: object_store::Error = err.into();
        Self::ObjectStore(store_error)
    }
}
impl From<ParserError> for DataFusionError {
fn from(e: ParserError) -> Self {
DataFusionError::SQL(e)
}
}
impl From<GenericError> for DataFusionError {
fn from(err: GenericError) -> Self {
DataFusionError::External(err)
}
}
impl Display for DataFusionError {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match *self {
DataFusionError::ArrowError(ref desc) => {
write!(f, "Arrow error: {desc}")
}
#[cfg(feature = "parquet")]
DataFusionError::ParquetError(ref desc) => {
write!(f, "Parquet error: {desc}")
}
#[cfg(feature = "avro")]
DataFusionError::AvroError(ref desc) => {
write!(f, "Avro error: {desc}")
}
DataFusionError::IoError(ref desc) => {
write!(f, "IO error: {desc}")
}
DataFusionError::SQL(ref desc) => {
write!(f, "SQL error: {desc:?}")
}
DataFusionError::Configuration(ref desc) => {
write!(f, "Invalid or Unsupported Configuration: {desc}")
}
DataFusionError::NotImplemented(ref desc) => {
write!(f, "This feature is not implemented: {desc}")
}
DataFusionError::Internal(ref desc) => {
write!(f, "Internal error: {desc}.\nThis was likely caused by a bug in DataFusion's \
code and we would welcome that you file an bug report in our issue tracker")
}
DataFusionError::Plan(ref desc) => {
write!(f, "Error during planning: {desc}")
}
DataFusionError::SchemaError(ref desc) => {
write!(f, "Schema error: {desc}")
}
DataFusionError::Execution(ref desc) => {
write!(f, "Execution error: {desc}")
}
DataFusionError::ResourcesExhausted(ref desc) => {
write!(f, "Resources exhausted: {desc}")
}
DataFusionError::External(ref desc) => {
write!(f, "External error: {desc}")
}
#[cfg(feature = "object_store")]
DataFusionError::ObjectStore(ref desc) => {
write!(f, "Object Store error: {desc}")
}
DataFusionError::Context(ref desc, ref err) => {
write!(f, "{}\ncaused by\n{}", desc, *err)
}
DataFusionError::Substrait(ref desc) => {
write!(f, "Substrait error: {desc}")
}
}
}
}
impl Error for DataFusionError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
DataFusionError::ArrowError(e) => Some(e),
#[cfg(feature = "parquet")]
DataFusionError::ParquetError(e) => Some(e),
#[cfg(feature = "avro")]
DataFusionError::AvroError(e) => Some(e),
#[cfg(feature = "object_store")]
DataFusionError::ObjectStore(e) => Some(e),
DataFusionError::IoError(e) => Some(e),
DataFusionError::SQL(e) => Some(e),
DataFusionError::NotImplemented(_) => None,
DataFusionError::Internal(_) => None,
DataFusionError::Configuration(_) => None,
DataFusionError::Plan(_) => None,
DataFusionError::SchemaError(e) => Some(e),
DataFusionError::Execution(_) => None,
DataFusionError::ResourcesExhausted(_) => None,
DataFusionError::External(e) => Some(e.as_ref()),
DataFusionError::Context(_, e) => Some(e.as_ref()),
DataFusionError::Substrait(_) => None,
}
}
}
impl From<DataFusionError> for io::Error {
    /// Wraps the error in an `io::Error` of kind `Other`, keeping the
    /// original error as its payload.
    fn from(err: DataFusionError) -> Self {
        Self::new(io::ErrorKind::Other, err)
    }
}
impl DataFusionError {
    /// Text written between an error message and its captured backtrace.
    /// Shared by [`Self::get_back_trace`] (producer) and
    /// [`Self::strip_backtrace`] (consumer) so the two cannot drift apart.
    const BACKTRACE_MARKER: &'static str = "\n\nbacktrace: ";

    /// Walks the [`Error::source`] chain starting at `self` and returns the
    /// last `DataFusionError` found on it.
    ///
    /// Links that are a `DataFusionError` or an `Arc<DataFusionError>` are
    /// recognised; any other error type is walked through but never returned.
    /// If no nested `DataFusionError` exists, `self` is returned.
    pub fn find_root(&self) -> &Self {
        let mut last_datafusion_error = self;
        let mut root_error: &dyn Error = self;
        while let Some(source) = root_error.source() {
            // Keep descending even through foreign error types.
            root_error = source;
            if let Some(e) = root_error.downcast_ref::<DataFusionError>() {
                last_datafusion_error = e;
            } else if let Some(e) = root_error.downcast_ref::<Arc<DataFusionError>>() {
                // Shared errors are stored as `Arc<DataFusionError>`.
                last_datafusion_error = e.as_ref();
            }
        }
        last_datafusion_error
    }

    /// Wraps `self` in [`DataFusionError::Context`] with the given
    /// description.
    pub fn context(self, description: impl Into<String>) -> Self {
        Self::Context(description.into(), Box::new(self))
    }

    /// Returns the `Display` text of this error with everything from the
    /// first backtrace marker onwards removed.
    ///
    /// When the message contains no marker the full text is returned.
    pub fn strip_backtrace(&self) -> String {
        let mut rendered = self.to_string();
        // Cut in place at the first marker; no intermediate `Vec` of segments
        // and no second `String` allocation.
        if let Some(marker_start) = rendered.find(Self::BACKTRACE_MARKER) {
            rendered.truncate(marker_start);
        }
        rendered
    }

    /// Captures a backtrace and returns it prefixed with the backtrace marker,
    /// or an empty string when no backtrace was captured.
    pub fn get_back_trace() -> String {
        let back_trace = Backtrace::capture();
        if back_trace.status() == BacktraceStatus::Captured {
            format!("{}{}", Self::BACKTRACE_MARKER, back_trace)
        } else {
            String::new()
        }
    }
}
/// Unwraps the `Option` bound to the given identifier. On `None` it
/// propagates (via `?`) a `DataFusionError::Internal` whose message is
/// `<identifier> should not be None`.
///
/// `DataFusionError` must be in scope at the call site.
#[macro_export]
macro_rules! unwrap_or_internal_err {
    ($value:ident) => {
        $value.ok_or_else(|| {
            let message = format!("{} should not be None", stringify!($value));
            DataFusionError::Internal(message)
        })?
    };
}
// Helper for writing macros that generate other macros.
//
// The tokens passed in are used as the rules of a throwaway macro
// `__with_dollar_sign`, which is then invoked with a single `$` token. The
// caller's rule can bind that token (e.g. as `$d:tt`) and use it wherever the
// generated macro needs a literal `$`.
macro_rules! with_dollar_sign {
($($body:tt)*) => {
// Define the throwaway macro from the caller's rules, then feed it `$`.
macro_rules! __with_dollar_sign { $($body)* }
__with_dollar_sign!($);
}
}
// Generates an exported macro named `$NAME` that builds
// `Err(DataFusionError::$ERR(..))`.
//
// The generated macro takes `format!`-style arguments (comma separated
// expressions). The formatted message has the output of
// `DataFusionError::get_back_trace()` appended before it is converted (via
// `.into()`) into the variant's payload.
//
// `DataFusionError` must be in scope wherever the generated macro is used.
macro_rules! make_error {
($NAME:ident, $ERR:ident) => {
with_dollar_sign! {
// `$d` is bound to a literal `$`, so `$d args` below becomes `$args`
// in the generated macro.
($d:tt) => {
#[macro_export]
macro_rules! $NAME {
($d($d args:expr),*) => {
Err(DataFusionError::$ERR(format!("{}{}", format!($d($d args),*), DataFusionError::get_back_trace()).into()))
}
}
}
}
};
}
// `plan_err!(..)` builds `Err(DataFusionError::Plan(..))`.
make_error!(plan_err, Plan);
// `internal_err!(..)` builds `Err(DataFusionError::Internal(..))`.
make_error!(internal_err, Internal);
// `not_impl_err!(..)` builds `Err(DataFusionError::NotImplemented(..))`.
make_error!(not_impl_err, NotImplemented);
// `exec_err!(..)` builds `Err(DataFusionError::Execution(..))`.
make_error!(exec_err, Execution);
/// Builds `Err(DataFusionError::SQL(..))` from the given expression.
///
/// Unlike the macros generated by `make_error!`, the argument is passed
/// through as-is: no formatting and no backtrace suffix. `DataFusionError`
/// must be in scope at the call site.
#[macro_export]
macro_rules! sql_err {
    ($parser_error:expr) => {
        Err(DataFusionError::SQL($parser_error))
    };
}
// Re-export three of the generated error macros under underscore-prefixed
// aliases.
// NOTE(review): presumably these exist so the macros can be referred to by
// path from inside this crate — confirm. `plan_err` has no alias here; verify
// whether that is intentional.
pub use exec_err as _exec_err;
pub use internal_err as _internal_err;
pub use not_impl_err as _not_impl_err;
// Unit tests for the error conversions, `find_root`, and the generated
// error macros.
#[cfg(test)]
mod test {
use std::sync::Arc;
use crate::error::DataFusionError;
use arrow::error::ArrowError;
// A `DataFusionError::Plan` converted into an `ArrowError` is carried as an
// external error, so its message gains the `External error: ` prefix.
#[test]
fn datafusion_error_to_arrow() {
let res = return_arrow_error().unwrap_err();
assert!(res
.to_string()
.starts_with("External error: Error during planning: foo"));
}
// An `ArrowError` converted into a `DataFusionError` is displayed with the
// `Arrow error: ` prefix.
#[test]
fn arrow_error_to_datafusion() {
let res = return_datafusion_error().unwrap_err();
assert_eq!(res.strip_backtrace(), "Arrow error: Schema error: bar");
}
// `find_root` must reach the innermost `DataFusionError` through every
// supported kind of wrapping.
#[test]
fn test_find_root_error() {
// Wrapped in `Context`.
do_root_test(
DataFusionError::Context(
"it happened!".to_string(),
Box::new(DataFusionError::ResourcesExhausted("foo".to_string())),
),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
// Wrapped in an `ArrowError::ExternalError` inside `ArrowError`.
do_root_test(
DataFusionError::ArrowError(ArrowError::ExternalError(Box::new(
DataFusionError::ResourcesExhausted("foo".to_string()),
))),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
// Wrapped directly in `External`.
do_root_test(
DataFusionError::External(Box::new(DataFusionError::ResourcesExhausted(
"foo".to_string(),
))),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
// `External` around an `ArrowError::ExternalError`.
do_root_test(
DataFusionError::External(Box::new(ArrowError::ExternalError(Box::new(
DataFusionError::ResourcesExhausted("foo".to_string()),
)))),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
// Two nested layers of `ArrowError::ExternalError`.
do_root_test(
DataFusionError::ArrowError(ArrowError::ExternalError(Box::new(
ArrowError::ExternalError(Box::new(DataFusionError::ResourcesExhausted(
"foo".to_string(),
))),
))),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
// `External` around an `Arc<DataFusionError>`.
do_root_test(
DataFusionError::External(Box::new(Arc::new(
DataFusionError::ResourcesExhausted("foo".to_string()),
))),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
// `External` around an `Arc<ArrowError>` that itself wraps the root.
do_root_test(
DataFusionError::External(Box::new(Arc::new(ArrowError::ExternalError(
Box::new(DataFusionError::ResourcesExhausted("foo".to_string())),
)))),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
}
// The generated `plan_err!` macro must accept every `format!` argument style.
#[test]
#[allow(clippy::unnecessary_literal_unwrap)]
fn test_make_error_parse_input() {
// Literal only.
let res: Result<(), DataFusionError> = plan_err!("Err");
let res = res.unwrap_err();
assert_eq!(res.strip_backtrace(), "Error during planning: Err");
let extra1 = "extra1";
let extra2 = "extra2";
// Positional `Display` arguments.
let res: Result<(), DataFusionError> = plan_err!("Err {} {}", extra1, extra2);
let res = res.unwrap_err();
assert_eq!(
res.strip_backtrace(),
"Error during planning: Err extra1 extra2"
);
// Positional `Debug` / pretty-`Debug` arguments.
let res: Result<(), DataFusionError> =
plan_err!("Err {:?} {:#?}", extra1, extra2);
let res = res.unwrap_err();
assert_eq!(
res.strip_backtrace(),
"Error during planning: Err \"extra1\" \"extra2\""
);
// Inline captured identifiers.
let res: Result<(), DataFusionError> = plan_err!("Err {extra1} {extra2}");
let res = res.unwrap_err();
assert_eq!(
res.strip_backtrace(),
"Error during planning: Err extra1 extra2"
);
// Inline captured identifiers with `Debug` formatting.
let res: Result<(), DataFusionError> = plan_err!("Err {extra1:?} {extra2:#?}");
let res = res.unwrap_err();
assert_eq!(
res.strip_backtrace(),
"Error during planning: Err \"extra1\" \"extra2\""
);
}
// Returns a `DataFusionError::Plan` converted into an Arrow result.
fn return_arrow_error() -> arrow::error::Result<()> {
Err(DataFusionError::Plan("foo".to_string()).into())
}
// Returns an `ArrowError::SchemaError` converted into a DataFusion result.
fn return_datafusion_error() -> crate::error::Result<()> {
Err(ArrowError::SchemaError("bar".to_string()).into())
}
// Asserts that `e.find_root()` matches `exp` in both its message (backtrace
// stripped) and its variant.
fn do_root_test(e: DataFusionError, exp: DataFusionError) {
let e = e.find_root();
assert_eq!(e.strip_backtrace(), exp.strip_backtrace());
assert_eq!(std::mem::discriminant(e), std::mem::discriminant(&exp),)
}
}