// datafusion_optimizer/analyzer/function_rewrite.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ApplyFunctionRewrites`] to replace `Expr`s with function calls (e.g. `||` to `array_concat`)

20use super::AnalyzerRule;
21use datafusion_common::config::ConfigOptions;
22use datafusion_common::tree_node::{Transformed, TreeNode};
23use datafusion_common::{DFSchema, Result};
24
25use crate::utils::NamePreserver;
26use datafusion_expr::expr_rewriter::FunctionRewrite;
27use datafusion_expr::utils::merge_schema;
28use datafusion_expr::LogicalPlan;
29use std::sync::Arc;
30
31/// Analyzer rule that invokes [`FunctionRewrite`]s on expressions
32#[derive(Default, Debug)]
33pub struct ApplyFunctionRewrites {
34    /// Expr --> Function writes to apply
35    function_rewrites: Vec<Arc<dyn FunctionRewrite + Send + Sync>>,
36}
37
38impl ApplyFunctionRewrites {
39    pub fn new(function_rewrites: Vec<Arc<dyn FunctionRewrite + Send + Sync>>) -> Self {
40        Self { function_rewrites }
41    }
42
43    /// Rewrite a single plan, and all its expressions using the provided rewriters
44    fn rewrite_plan(
45        &self,
46        plan: LogicalPlan,
47        options: &ConfigOptions,
48    ) -> Result<Transformed<LogicalPlan>> {
49        // get schema representing all available input fields. This is used for data type
50        // resolution only, so order does not matter here
51        let mut schema = merge_schema(&plan.inputs());
52
53        if let LogicalPlan::TableScan(ts) = &plan {
54            let source_schema = DFSchema::try_from_qualified_schema(
55                ts.table_name.clone(),
56                &ts.source.schema(),
57            )?;
58            schema.merge(&source_schema);
59        }
60
61        let name_preserver = NamePreserver::new(&plan);
62
63        plan.map_expressions(|expr| {
64            let original_name = name_preserver.save(&expr);
65
66            // recursively transform the expression, applying the rewrites at each step
67            let transformed_expr = expr.transform_up(|expr| {
68                let mut result = Transformed::no(expr);
69                for rewriter in self.function_rewrites.iter() {
70                    result = result.transform_data(|expr| {
71                        rewriter.rewrite(expr, &schema, options)
72                    })?;
73                }
74                Ok(result)
75            })?;
76
77            Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
78        })
79    }
80}
81
82impl AnalyzerRule for ApplyFunctionRewrites {
83    fn name(&self) -> &str {
84        "apply_function_rewrites"
85    }
86
87    fn analyze(&self, plan: LogicalPlan, options: &ConfigOptions) -> Result<LogicalPlan> {
88        plan.transform_up_with_subqueries(|plan| self.rewrite_plan(plan, options))
89            .map(|res| res.data)
90    }
91}