use std::collections::HashMap;
use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::{not_impl_err, substrait_err};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use async_recursion::async_recursion;
use chrono::DateTime;
use object_store::ObjectMeta;
use substrait::proto::r#type::{Kind, Nullability};
use substrait::proto::read_rel::local_files::file_or_files::PathType;
use substrait::proto::Type;
use substrait::proto::{
expression::MaskExpression, read_rel::ReadType, rel::RelType, Rel,
};
#[async_recursion]
pub async fn from_substrait_rel(
_ctx: &SessionContext,
rel: &Rel,
_extensions: &HashMap<u32, &String>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut base_config;
match &rel.rel_type {
Some(RelType::Read(read)) => {
if read.filter.is_some() || read.best_effort_filter.is_some() {
return not_impl_err!("Read with filter is not supported");
}
if read.advanced_extension.is_some() {
return not_impl_err!("Read with AdvancedExtension is not supported");
}
let Some(schema) = read.base_schema.as_ref() else {
return substrait_err!("Missing base schema in the read");
};
let Some(r#struct) = schema.r#struct.as_ref() else {
return substrait_err!("Missing struct in the schema");
};
match schema
.names
.iter()
.zip(r#struct.types.iter())
.map(|(name, r#type)| to_field(name, r#type))
.collect::<Result<Vec<Field>>>()
{
Ok(fields) => {
base_config = FileScanConfig::new(
ObjectStoreUrl::local_filesystem(),
Arc::new(Schema::new(fields)),
);
}
Err(e) => return Err(e),
};
match &read.as_ref().read_type {
Some(ReadType::LocalFiles(files)) => {
let mut file_groups = vec![];
for file in &files.items {
let path = if let Some(path_type) = &file.path_type {
match path_type {
PathType::UriPath(path) => Ok(path.clone()),
PathType::UriPathGlob(path) => Ok(path.clone()),
PathType::UriFile(path) => Ok(path.clone()),
PathType::UriFolder(path) => Ok(path.clone()),
}
} else {
Err(DataFusionError::Substrait(
"Missing PathType".to_string(),
))
}?;
let last_modified = DateTime::parse_from_str(
"1970 Jan 1 00:00:00.000 +0000",
"%Y %b %d %H:%M:%S%.3f %z",
)
.unwrap();
let size = 0;
let partitioned_file = PartitionedFile {
object_meta: ObjectMeta {
last_modified: last_modified.into(),
location: path.into(),
size,
e_tag: None,
version: None,
},
partition_values: vec![],
range: None,
statistics: None,
extensions: None,
};
let part_index = file.partition_index as usize;
while part_index >= file_groups.len() {
file_groups.push(vec![]);
}
file_groups[part_index].push(partitioned_file)
}
base_config = base_config.with_file_groups(file_groups);
if let Some(MaskExpression { select, .. }) = &read.projection {
if let Some(projection) = &select.as_ref() {
let column_indices: Vec<usize> = projection
.struct_items
.iter()
.map(|item| item.field as usize)
.collect();
base_config.projection = Some(column_indices);
}
}
Ok(ParquetExec::builder(base_config).build_arc()
as Arc<dyn ExecutionPlan>)
}
_ => not_impl_err!(
"Only LocalFile reads are supported when parsing physical"
),
}
}
_ => not_impl_err!("Unsupported RelType: {:?}", rel.rel_type),
}
}
/// Builds an Arrow [`Field`] named `name` from a Substrait [`Type`].
///
/// Supported kinds and their Arrow counterparts:
/// `Bool` -> `Boolean`, `I64` -> `Int64`, `Fp64` -> `Float64`,
/// `String` -> `Utf8`. The field's nullability is derived from the kind's
/// `nullability` value via `is_nullable`.
///
/// # Errors
///
/// Returns a Substrait error when the type has no `kind`, or when the kind is
/// not one of the supported ones listed above.
fn to_field(name: &str, r#type: &Type) -> Result<Field> {
    let Some(kind) = r#type.kind.as_ref() else {
        return substrait_err!("Missing kind in the type with name {}", name);
    };

    // Each supported kind yields its Arrow data type together with the raw
    // Substrait nullability value, which is decoded once below.
    let (data_type, nullability) = match kind {
        Kind::Bool(boolean) => (DataType::Boolean, boolean.nullability),
        Kind::I64(int64) => (DataType::Int64, int64.nullability),
        Kind::Fp64(fp64) => (DataType::Float64, fp64.nullability),
        Kind::String(string) => (DataType::Utf8, string.nullability),
        _ => {
            return substrait_err!(
                "Unsupported kind: {:?} in the type with name {}",
                kind,
                name
            );
        }
    };

    Ok(Field::new(name, data_type, is_nullable(nullability)))
}
/// Decodes a raw Substrait nullability value into a nullable flag.
///
/// Only a value that decodes to `Nullability::Required` is treated as
/// non-nullable. `Nullable`, `Unspecified`, and any value that fails to decode
/// are all reported as nullable.
fn is_nullable(nullability: i32) -> bool {
    match Nullability::try_from(nullability) {
        Ok(Nullability::Required) => false,
        Ok(Nullability::Nullable | Nullability::Unspecified) | Err(_) => true,
    }
}