Trait SubstraitConsumer

Source
pub trait SubstraitConsumer:
    Send
    + Sync
    + Sized {
Show 33 methods // Required methods fn resolve_table_ref<'life0, 'life1, 'async_trait>( &'life0 self, table_ref: &'life1 TableReference, ) -> Pin<Box<dyn Future<Output = Result<Option<Arc<dyn TableProvider>>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn get_extensions(&self) -> &Extensions; fn get_function_registry(&self) -> &impl FunctionRegistry; // Provided methods fn consume_rel<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 Rel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn consume_read<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 ReadRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn consume_filter<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 FilterRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn consume_fetch<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 FetchRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn consume_aggregate<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 AggregateRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn consume_sort<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 SortRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn consume_join<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 JoinRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn consume_project<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 ProjectRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn consume_set<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 SetRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn consume_cross<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 CrossRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn consume_consistent_partition_window<'life0, 'life1, 'async_trait>( &'life0 self, _rel: &'life1 ConsistentPartitionWindowRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn consume_exchange<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 ExchangeRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn consume_expression<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, expr: &'life1 Expression, input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn consume_literal<'life0, 'life1, 'async_trait>( &'life0 self, expr: &'life1 Literal, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn consume_field_reference<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, expr: &'life1 FieldReference, input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn consume_scalar_function<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, expr: &'life1 ScalarFunction, input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn consume_window_function<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, expr: &'life1 WindowFunction, input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn consume_if_then<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, expr: &'life1 IfThen, input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn consume_switch<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _expr: &'life1 SwitchExpression, _input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn consume_singular_or_list<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, expr: &'life1 SingularOrList, input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn consume_multi_or_list<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _expr: &'life1 MultiOrList, _input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn consume_cast<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, expr: &'life1 Cast, input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn consume_subquery<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, expr: &'life1 Subquery, input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn consume_nested<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _expr: &'life1 Nested, _input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn consume_enum<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _expr: &'life1 Enum, _input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn consume_extension_leaf<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 ExtensionLeafRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn consume_extension_single<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 ExtensionSingleRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn consume_extension_multi<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 ExtensionMultiRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn consume_user_defined_type( &self, user_defined_type: &UserDefined, ) -> Result<DataType> { ... } fn consume_user_defined_literal( &self, user_defined_literal: &UserDefined, ) -> Result<ScalarValue> { ... }
}
Expand description

This trait is used to consume Substrait plans, converting them into DataFusion Logical Plans. It can be implemented by users to allow for custom handling of relations, expressions, etc.

Combined with the crate::logical_plan::producer::SubstraitProducer this allows for fully customizable Substrait serde.

§Example Usage


struct CustomSubstraitConsumer {
    extensions: Arc<Extensions>,
    state: Arc<SessionState>,
}

#[async_trait]
impl SubstraitConsumer for CustomSubstraitConsumer {
    async fn resolve_table_ref(
        &self,
        table_ref: &TableReference,
    ) -> Result<Option<Arc<dyn TableProvider>>> {
        let table = table_ref.table().to_string();
        let schema = self.state.schema_for_ref(table_ref.clone())?;
        let table_provider = schema.table(&table).await?;
        Ok(table_provider)
    }

    fn get_extensions(&self) -> &Extensions {
        self.extensions.as_ref()
    }

    fn get_function_registry(&self) -> &impl FunctionRegistry {
        self.state.as_ref()
    }

    // You can reuse existing consumer code to assist in handling advanced extensions
    async fn consume_project(&self, rel: &ProjectRel) -> Result<LogicalPlan> {
        let df_plan = from_project_rel(self, rel).await?;
        if let Some(advanced_extension) = rel.advanced_extension.as_ref() {
            not_impl_err!(
                "decode and handle an advanced extension: {:?}",
                advanced_extension
            )
        } else {
            Ok(df_plan)
        }
    }

    // You can implement a fully custom consumer method if you need special handling
    async fn consume_filter(&self, rel: &FilterRel) -> Result<LogicalPlan> {
        let input = self.consume_rel(rel.input.as_ref().unwrap()).await?;
        let expression =
            self.consume_expression(rel.condition.as_ref().unwrap(), input.schema())
                .await?;
        // though this one is quite boring
        LogicalPlanBuilder::from(input).filter(expression)?.build()
    }

    // You can add handlers for extension relations
    async fn consume_extension_leaf(
        &self,
        rel: &ExtensionLeafRel,
    ) -> Result<LogicalPlan> {
        not_impl_err!(
            "handle protobuf Any {} as you need",
            rel.detail.as_ref().unwrap().type_url
        )
    }

    // and handlers for user-define types
    fn consume_user_defined_type(&self, typ: &proto::r#type::UserDefined) -> Result<DataType> {
        let type_string = self.extensions.types.get(&typ.type_reference).unwrap();
        match type_string.as_str() {
            "u!foo" => not_impl_err!("handle foo conversion"),
            "u!bar" => not_impl_err!("handle bar conversion"),
            _ => substrait_err!("unexpected type")
        }
    }

    // and user-defined literals
    fn consume_user_defined_literal(&self, literal: &proto::expression::literal::UserDefined) -> Result<ScalarValue> {
        let type_string = self.extensions.types.get(&literal.type_reference).unwrap();
        match type_string.as_str() {
            "u!foo" => not_impl_err!("handle foo conversion"),
            "u!bar" => not_impl_err!("handle bar conversion"),
            _ => substrait_err!("unexpected type")
        }
    }
}

Required Methods§

Source

fn resolve_table_ref<'life0, 'life1, 'async_trait>( &'life0 self, table_ref: &'life1 TableReference, ) -> Pin<Box<dyn Future<Output = Result<Option<Arc<dyn TableProvider>>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source

fn get_extensions(&self) -> &Extensions

Source

fn get_function_registry(&self) -> &impl FunctionRegistry

Provided Methods§

Source

fn consume_rel<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 Rel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

All Rels to be converted pass through this method. You can provide your own implementation if you wish to customize the conversion behaviour.

Source

fn consume_read<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 ReadRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source

fn consume_filter<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 FilterRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source

fn consume_fetch<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 FetchRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source

fn consume_aggregate<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 AggregateRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source

fn consume_sort<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 SortRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source

fn consume_join<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 JoinRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source

fn consume_project<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 ProjectRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source

fn consume_set<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 SetRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source

fn consume_cross<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 CrossRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source

fn consume_consistent_partition_window<'life0, 'life1, 'async_trait>( &'life0 self, _rel: &'life1 ConsistentPartitionWindowRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source

fn consume_exchange<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 ExchangeRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source

fn consume_expression<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, expr: &'life1 Expression, input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

All Expressions to be converted pass through this method. You can provide your own implementation if you wish to customize the conversion behaviour.

Source

fn consume_literal<'life0, 'life1, 'async_trait>( &'life0 self, expr: &'life1 Literal, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source

fn consume_field_reference<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, expr: &'life1 FieldReference, input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Source

fn consume_scalar_function<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, expr: &'life1 ScalarFunction, input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Source

fn consume_window_function<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, expr: &'life1 WindowFunction, input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Source

fn consume_if_then<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, expr: &'life1 IfThen, input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Source

fn consume_switch<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _expr: &'life1 SwitchExpression, _input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Source

fn consume_singular_or_list<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, expr: &'life1 SingularOrList, input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Source

fn consume_multi_or_list<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _expr: &'life1 MultiOrList, _input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Source

fn consume_cast<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, expr: &'life1 Cast, input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Source

fn consume_subquery<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, expr: &'life1 Subquery, input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Source

fn consume_nested<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _expr: &'life1 Nested, _input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Source

fn consume_enum<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _expr: &'life1 Enum, _input_schema: &'life2 DFSchema, ) -> Pin<Box<dyn Future<Output = Result<Expr>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Source

fn consume_extension_leaf<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 ExtensionLeafRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source

fn consume_extension_single<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 ExtensionSingleRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source

fn consume_extension_multi<'life0, 'life1, 'async_trait>( &'life0 self, rel: &'life1 ExtensionMultiRel, ) -> Pin<Box<dyn Future<Output = Result<LogicalPlan>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source

fn consume_user_defined_type( &self, user_defined_type: &UserDefined, ) -> Result<DataType>

Source

fn consume_user_defined_literal( &self, user_defined_literal: &UserDefined, ) -> Result<ScalarValue>

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§