deltalake_core/delta_datafusion/
planner.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
//! Custom planners for datafusion so that you can convert custom nodes, can be used
//! to trace custom metrics in an operation
//!
//! # Example
//!
//! #[derive(Clone)]
//! struct MergeMetricExtensionPlanner {}
//!
//! #[async_trait]
//! impl ExtensionPlanner for MergeMetricExtensionPlanner {
//!     async fn plan_extension(
//!         &self,
//!         planner: &dyn PhysicalPlanner,
//!         node: &dyn UserDefinedLogicalNode,
//!         _logical_inputs: &[&LogicalPlan],
//!         physical_inputs: &[Arc<dyn ExecutionPlan>],
//!         session_state: &SessionState,
//!     ) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> {}
//!
//! let merge_planner = DeltaPlanner::<MergeMetricExtensionPlanner> {
//!     extension_planner: MergeMetricExtensionPlanner {}
//! };
//!
//! let state = state.with_query_planner(Arc::new(merge_planner));
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::physical_planner::PhysicalPlanner;
use datafusion::{
    execution::{context::QueryPlanner, session_state::SessionState},
    physical_plan::ExecutionPlan,
    physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner},
};
use datafusion_expr::LogicalPlan;

use crate::delta_datafusion::DataFusionResult;

/// Deltaplanner
#[derive(Debug)]
pub struct DeltaPlanner<T: ExtensionPlanner> {
    /// custom extension planner
    pub extension_planner: T,
}

#[async_trait]
impl<T: ExtensionPlanner + Send + Sync + 'static + Clone + std::fmt::Debug> QueryPlanner
    for DeltaPlanner<T>
{
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners(
            vec![Arc::new(self.extension_planner.clone())],
        )));
        planner
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}