datafusion_expr/logical_plan/
tree_node.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! [`TreeNode`]-based visiting and rewriting for [`LogicalPlan`]s
19//!
20//! Visiting (read only) APIs
21//! * [`LogicalPlan::visit`]: recursively visit the node and all of its inputs
22//! * [`LogicalPlan::visit_with_subqueries`]: recursively visit the node and all of its inputs, including subqueries
23//! * [`LogicalPlan::apply_children`]: recursively visit all inputs of this node
24//! * [`LogicalPlan::apply_expressions`]: (non recursively) visit all expressions of this node
25//! * [`LogicalPlan::apply_subqueries`]: (non recursively) visit all subqueries of this node
26//! * [`LogicalPlan::apply_with_subqueries`]: recursively visit all inputs and embedded subqueries.
27//!
28//! Rewriting (update) APIs:
29//! * [`LogicalPlan::exists`]: search for an expression in a plan
30//! * [`LogicalPlan::rewrite`]: recursively rewrite the node and all of its inputs
31//! * [`LogicalPlan::map_children`]: recursively rewrite all inputs of this node
//! * [`LogicalPlan::map_expressions`]: (non recursively) rewrite all expressions of this node
33//! * [`LogicalPlan::map_subqueries`]: (non recursively) rewrite all subqueries of this node
34//! * [`LogicalPlan::rewrite_with_subqueries`]: recursively rewrite the node and all of its inputs, including subqueries
35//!
36//! (Re)creation APIs (these require substantial cloning and thus are slow):
37//! * [`LogicalPlan::with_new_exprs`]: Create a new plan with different expressions
38//! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions
39
40use crate::{
41    dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement,
42    Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join,
43    Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition,
44    Sort, Statement, Subquery, SubqueryAlias, TableScan, Union, Unnest,
45    UserDefinedLogicalNode, Values, Window,
46};
47use datafusion_common::tree_node::TreeNodeRefContainer;
48
49use crate::expr::{Exists, InSubquery};
50use datafusion_common::tree_node::{
51    Transformed, TreeNode, TreeNodeContainer, TreeNodeIterator, TreeNodeRecursion,
52    TreeNodeRewriter, TreeNodeVisitor,
53};
54use datafusion_common::{internal_err, Result};
55
56impl TreeNode for LogicalPlan {
57    fn apply_children<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
58        &'n self,
59        f: F,
60    ) -> Result<TreeNodeRecursion> {
61        self.inputs().apply_ref_elements(f)
62    }
63
64    /// Applies `f` to each child (input) of this plan node, rewriting them *in place.*
65    ///
66    /// # Notes
67    ///
68    /// Inputs include ONLY direct children, not embedded `LogicalPlan`s for
69    /// subqueries, for example such as are in [`Expr::Exists`].
70    ///
71    /// [`Expr::Exists`]: crate::Expr::Exists
72    fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
73        self,
74        f: F,
75    ) -> Result<Transformed<Self>> {
76        Ok(match self {
77            LogicalPlan::Projection(Projection {
78                expr,
79                input,
80                schema,
81            }) => input.map_elements(f)?.update_data(|input| {
82                LogicalPlan::Projection(Projection {
83                    expr,
84                    input,
85                    schema,
86                })
87            }),
88            LogicalPlan::Filter(Filter {
89                predicate,
90                input,
91                having,
92            }) => input.map_elements(f)?.update_data(|input| {
93                LogicalPlan::Filter(Filter {
94                    predicate,
95                    input,
96                    having,
97                })
98            }),
99            LogicalPlan::Repartition(Repartition {
100                input,
101                partitioning_scheme,
102            }) => input.map_elements(f)?.update_data(|input| {
103                LogicalPlan::Repartition(Repartition {
104                    input,
105                    partitioning_scheme,
106                })
107            }),
108            LogicalPlan::Window(Window {
109                input,
110                window_expr,
111                schema,
112            }) => input.map_elements(f)?.update_data(|input| {
113                LogicalPlan::Window(Window {
114                    input,
115                    window_expr,
116                    schema,
117                })
118            }),
119            LogicalPlan::Aggregate(Aggregate {
120                input,
121                group_expr,
122                aggr_expr,
123                schema,
124            }) => input.map_elements(f)?.update_data(|input| {
125                LogicalPlan::Aggregate(Aggregate {
126                    input,
127                    group_expr,
128                    aggr_expr,
129                    schema,
130                })
131            }),
132            LogicalPlan::Sort(Sort { expr, input, fetch }) => input
133                .map_elements(f)?
134                .update_data(|input| LogicalPlan::Sort(Sort { expr, input, fetch })),
135            LogicalPlan::Join(Join {
136                left,
137                right,
138                on,
139                filter,
140                join_type,
141                join_constraint,
142                schema,
143                null_equals_null,
144            }) => (left, right).map_elements(f)?.update_data(|(left, right)| {
145                LogicalPlan::Join(Join {
146                    left,
147                    right,
148                    on,
149                    filter,
150                    join_type,
151                    join_constraint,
152                    schema,
153                    null_equals_null,
154                })
155            }),
156            LogicalPlan::Limit(Limit { skip, fetch, input }) => input
157                .map_elements(f)?
158                .update_data(|input| LogicalPlan::Limit(Limit { skip, fetch, input })),
159            LogicalPlan::Subquery(Subquery {
160                subquery,
161                outer_ref_columns,
162            }) => subquery.map_elements(f)?.update_data(|subquery| {
163                LogicalPlan::Subquery(Subquery {
164                    subquery,
165                    outer_ref_columns,
166                })
167            }),
168            LogicalPlan::SubqueryAlias(SubqueryAlias {
169                input,
170                alias,
171                schema,
172            }) => input.map_elements(f)?.update_data(|input| {
173                LogicalPlan::SubqueryAlias(SubqueryAlias {
174                    input,
175                    alias,
176                    schema,
177                })
178            }),
179            LogicalPlan::Extension(extension) => rewrite_extension_inputs(extension, f)?
180                .update_data(LogicalPlan::Extension),
181            LogicalPlan::Union(Union { inputs, schema }) => inputs
182                .map_elements(f)?
183                .update_data(|inputs| LogicalPlan::Union(Union { inputs, schema })),
184            LogicalPlan::Distinct(distinct) => match distinct {
185                Distinct::All(input) => input.map_elements(f)?.update_data(Distinct::All),
186                Distinct::On(DistinctOn {
187                    on_expr,
188                    select_expr,
189                    sort_expr,
190                    input,
191                    schema,
192                }) => input.map_elements(f)?.update_data(|input| {
193                    Distinct::On(DistinctOn {
194                        on_expr,
195                        select_expr,
196                        sort_expr,
197                        input,
198                        schema,
199                    })
200                }),
201            }
202            .update_data(LogicalPlan::Distinct),
203            LogicalPlan::Explain(Explain {
204                verbose,
205                plan,
206                stringified_plans,
207                schema,
208                logical_optimization_succeeded,
209            }) => plan.map_elements(f)?.update_data(|plan| {
210                LogicalPlan::Explain(Explain {
211                    verbose,
212                    plan,
213                    stringified_plans,
214                    schema,
215                    logical_optimization_succeeded,
216                })
217            }),
218            LogicalPlan::Analyze(Analyze {
219                verbose,
220                input,
221                schema,
222            }) => input.map_elements(f)?.update_data(|input| {
223                LogicalPlan::Analyze(Analyze {
224                    verbose,
225                    input,
226                    schema,
227                })
228            }),
229            LogicalPlan::Dml(DmlStatement {
230                table_name,
231                target,
232                op,
233                input,
234                output_schema,
235            }) => input.map_elements(f)?.update_data(|input| {
236                LogicalPlan::Dml(DmlStatement {
237                    table_name,
238                    target,
239                    op,
240                    input,
241                    output_schema,
242                })
243            }),
244            LogicalPlan::Copy(CopyTo {
245                input,
246                output_url,
247                partition_by,
248                file_type,
249                options,
250            }) => input.map_elements(f)?.update_data(|input| {
251                LogicalPlan::Copy(CopyTo {
252                    input,
253                    output_url,
254                    partition_by,
255                    file_type,
256                    options,
257                })
258            }),
259            LogicalPlan::Ddl(ddl) => {
260                match ddl {
261                    DdlStatement::CreateMemoryTable(CreateMemoryTable {
262                        name,
263                        constraints,
264                        input,
265                        if_not_exists,
266                        or_replace,
267                        column_defaults,
268                        temporary,
269                    }) => input.map_elements(f)?.update_data(|input| {
270                        DdlStatement::CreateMemoryTable(CreateMemoryTable {
271                            name,
272                            constraints,
273                            input,
274                            if_not_exists,
275                            or_replace,
276                            column_defaults,
277                            temporary,
278                        })
279                    }),
280                    DdlStatement::CreateView(CreateView {
281                        name,
282                        input,
283                        or_replace,
284                        definition,
285                        temporary,
286                    }) => input.map_elements(f)?.update_data(|input| {
287                        DdlStatement::CreateView(CreateView {
288                            name,
289                            input,
290                            or_replace,
291                            definition,
292                            temporary,
293                        })
294                    }),
295                    // no inputs in these statements
296                    DdlStatement::CreateExternalTable(_)
297                    | DdlStatement::CreateCatalogSchema(_)
298                    | DdlStatement::CreateCatalog(_)
299                    | DdlStatement::CreateIndex(_)
300                    | DdlStatement::DropTable(_)
301                    | DdlStatement::DropView(_)
302                    | DdlStatement::DropCatalogSchema(_)
303                    | DdlStatement::CreateFunction(_)
304                    | DdlStatement::DropFunction(_) => Transformed::no(ddl),
305                }
306                .update_data(LogicalPlan::Ddl)
307            }
308            LogicalPlan::Unnest(Unnest {
309                input,
310                exec_columns: input_columns,
311                list_type_columns,
312                struct_type_columns,
313                dependency_indices,
314                schema,
315                options,
316            }) => input.map_elements(f)?.update_data(|input| {
317                LogicalPlan::Unnest(Unnest {
318                    input,
319                    exec_columns: input_columns,
320                    dependency_indices,
321                    list_type_columns,
322                    struct_type_columns,
323                    schema,
324                    options,
325                })
326            }),
327            LogicalPlan::RecursiveQuery(RecursiveQuery {
328                name,
329                static_term,
330                recursive_term,
331                is_distinct,
332            }) => (static_term, recursive_term).map_elements(f)?.update_data(
333                |(static_term, recursive_term)| {
334                    LogicalPlan::RecursiveQuery(RecursiveQuery {
335                        name,
336                        static_term,
337                        recursive_term,
338                        is_distinct,
339                    })
340                },
341            ),
342            LogicalPlan::Statement(stmt) => match stmt {
343                Statement::Prepare(p) => p
344                    .input
345                    .map_elements(f)?
346                    .update_data(|input| Statement::Prepare(Prepare { input, ..p })),
347                _ => Transformed::no(stmt),
348            }
349            .update_data(LogicalPlan::Statement),
350            // plans without inputs
351            LogicalPlan::TableScan { .. }
352            | LogicalPlan::EmptyRelation { .. }
353            | LogicalPlan::Values { .. }
354            | LogicalPlan::DescribeTable(_) => Transformed::no(self),
355        })
356    }
357}
358
359/// Rewrites all inputs for an Extension node "in place"
360/// (it currently has to copy values because there are no APIs for in place modification)
361///
362/// Should be removed when we have an API for in place modifications of the
363/// extension to avoid these copies
364fn rewrite_extension_inputs<F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>>(
365    extension: Extension,
366    f: F,
367) -> Result<Transformed<Extension>> {
368    let Extension { node } = extension;
369
370    node.inputs()
371        .into_iter()
372        .cloned()
373        .map_until_stop_and_collect(f)?
374        .map_data(|new_inputs| {
375            let exprs = node.expressions();
376            Ok(Extension {
377                node: node.with_exprs_and_inputs(exprs, new_inputs)?,
378            })
379        })
380}
381
/// Chains the steps of a combined (top-down and bottom-up) rewriting
/// traversal of a single plan node, including its subqueries.
///
/// The macro takes three expressions:
/// * `$F_DOWN`: evaluates to the result of the top-down step for the node
/// * `$F_CHILD`: closure applied first to the node's subqueries and then to
///   its children (the expression is expanded once for each use)
/// * `$F_UP`: closure for the bottom-up step
///
/// Whether each later step runs is decided by `transform_children`,
/// `transform_sibling` and `transform_parent`; errors from the down and
/// children steps are propagated with `?` from the enclosing function.
macro_rules! handle_transform_recursion {
    ($F_DOWN:expr, $F_CHILD:expr, $F_UP:expr) => {{
        let after_down = $F_DOWN?;
        let after_children = after_down.transform_children(|plan| {
            let after_subqueries = plan.map_subqueries($F_CHILD)?;
            after_subqueries.transform_sibling(|plan| plan.map_children($F_CHILD))
        })?;
        after_children.transform_parent($F_UP)
    }};
}
394
395impl LogicalPlan {
396    /// Calls `f` on all expressions in the current `LogicalPlan` node.
397    ///
398    /// # Notes
399    /// * Similar to [`TreeNode::apply`] but for this node's expressions.
400    /// * Does not include expressions in input `LogicalPlan` nodes
401    /// * Visits only the top level expressions (Does not recurse into each expression)
402    pub fn apply_expressions<F: FnMut(&Expr) -> Result<TreeNodeRecursion>>(
403        &self,
404        mut f: F,
405    ) -> Result<TreeNodeRecursion> {
406        match self {
407            LogicalPlan::Projection(Projection { expr, .. }) => expr.apply_elements(f),
408            LogicalPlan::Values(Values { values, .. }) => values.apply_elements(f),
409            LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate),
410            LogicalPlan::Repartition(Repartition {
411                partitioning_scheme,
412                ..
413            }) => match partitioning_scheme {
414                Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) => {
415                    expr.apply_elements(f)
416                }
417                Partitioning::RoundRobinBatch(_) => Ok(TreeNodeRecursion::Continue),
418            },
419            LogicalPlan::Window(Window { window_expr, .. }) => {
420                window_expr.apply_elements(f)
421            }
422            LogicalPlan::Aggregate(Aggregate {
423                group_expr,
424                aggr_expr,
425                ..
426            }) => (group_expr, aggr_expr).apply_ref_elements(f),
427            // There are two part of expression for join, equijoin(on) and non-equijoin(filter).
428            // 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`.
429            // 2. the second part is non-equijoin(filter).
430            LogicalPlan::Join(Join { on, filter, .. }) => {
431                (on, filter).apply_ref_elements(f)
432            }
433            LogicalPlan::Sort(Sort { expr, .. }) => expr.apply_elements(f),
434            LogicalPlan::Extension(extension) => {
435                // would be nice to avoid this copy -- maybe can
436                // update extension to just observer Exprs
437                extension.node.expressions().apply_elements(f)
438            }
439            LogicalPlan::TableScan(TableScan { filters, .. }) => {
440                filters.apply_elements(f)
441            }
442            LogicalPlan::Unnest(unnest) => {
443                let columns = unnest.exec_columns.clone();
444
445                let exprs = columns
446                    .iter()
447                    .map(|c| Expr::Column(c.clone()))
448                    .collect::<Vec<_>>();
449                exprs.apply_elements(f)
450            }
451            LogicalPlan::Distinct(Distinct::On(DistinctOn {
452                on_expr,
453                select_expr,
454                sort_expr,
455                ..
456            })) => (on_expr, select_expr, sort_expr).apply_ref_elements(f),
457            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
458                (skip, fetch).apply_ref_elements(f)
459            }
460            LogicalPlan::Statement(stmt) => match stmt {
461                Statement::Execute(Execute { parameters, .. }) => {
462                    parameters.apply_elements(f)
463                }
464                _ => Ok(TreeNodeRecursion::Continue),
465            },
466            // plans without expressions
467            LogicalPlan::EmptyRelation(_)
468            | LogicalPlan::RecursiveQuery(_)
469            | LogicalPlan::Subquery(_)
470            | LogicalPlan::SubqueryAlias(_)
471            | LogicalPlan::Analyze(_)
472            | LogicalPlan::Explain(_)
473            | LogicalPlan::Union(_)
474            | LogicalPlan::Distinct(Distinct::All(_))
475            | LogicalPlan::Dml(_)
476            | LogicalPlan::Ddl(_)
477            | LogicalPlan::Copy(_)
478            | LogicalPlan::DescribeTable(_) => Ok(TreeNodeRecursion::Continue),
479        }
480    }
481
482    /// Rewrites all expressions in the current `LogicalPlan` node using `f`.
483    ///
484    /// Returns the current node.
485    ///
486    /// # Notes
487    /// * Similar to [`TreeNode::map_children`] but for this node's expressions.
488    /// * Visits only the top level expressions (Does not recurse into each expression)
489    pub fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
490        self,
491        mut f: F,
492    ) -> Result<Transformed<Self>> {
493        Ok(match self {
494            LogicalPlan::Projection(Projection {
495                expr,
496                input,
497                schema,
498            }) => expr.map_elements(f)?.update_data(|expr| {
499                LogicalPlan::Projection(Projection {
500                    expr,
501                    input,
502                    schema,
503                })
504            }),
505            LogicalPlan::Values(Values { schema, values }) => values
506                .map_elements(f)?
507                .update_data(|values| LogicalPlan::Values(Values { schema, values })),
508            LogicalPlan::Filter(Filter {
509                predicate,
510                input,
511                having,
512            }) => f(predicate)?.update_data(|predicate| {
513                LogicalPlan::Filter(Filter {
514                    predicate,
515                    input,
516                    having,
517                })
518            }),
519            LogicalPlan::Repartition(Repartition {
520                input,
521                partitioning_scheme,
522            }) => match partitioning_scheme {
523                Partitioning::Hash(expr, usize) => expr
524                    .map_elements(f)?
525                    .update_data(|expr| Partitioning::Hash(expr, usize)),
526                Partitioning::DistributeBy(expr) => expr
527                    .map_elements(f)?
528                    .update_data(Partitioning::DistributeBy),
529                Partitioning::RoundRobinBatch(_) => Transformed::no(partitioning_scheme),
530            }
531            .update_data(|partitioning_scheme| {
532                LogicalPlan::Repartition(Repartition {
533                    input,
534                    partitioning_scheme,
535                })
536            }),
537            LogicalPlan::Window(Window {
538                input,
539                window_expr,
540                schema,
541            }) => window_expr.map_elements(f)?.update_data(|window_expr| {
542                LogicalPlan::Window(Window {
543                    input,
544                    window_expr,
545                    schema,
546                })
547            }),
548            LogicalPlan::Aggregate(Aggregate {
549                input,
550                group_expr,
551                aggr_expr,
552                schema,
553            }) => (group_expr, aggr_expr).map_elements(f)?.update_data(
554                |(group_expr, aggr_expr)| {
555                    LogicalPlan::Aggregate(Aggregate {
556                        input,
557                        group_expr,
558                        aggr_expr,
559                        schema,
560                    })
561                },
562            ),
563
564            // There are two part of expression for join, equijoin(on) and non-equijoin(filter).
565            // 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`.
566            // 2. the second part is non-equijoin(filter).
567            LogicalPlan::Join(Join {
568                left,
569                right,
570                on,
571                filter,
572                join_type,
573                join_constraint,
574                schema,
575                null_equals_null,
576            }) => (on, filter).map_elements(f)?.update_data(|(on, filter)| {
577                LogicalPlan::Join(Join {
578                    left,
579                    right,
580                    on,
581                    filter,
582                    join_type,
583                    join_constraint,
584                    schema,
585                    null_equals_null,
586                })
587            }),
588            LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
589                .map_elements(f)?
590                .update_data(|expr| LogicalPlan::Sort(Sort { expr, input, fetch })),
591            LogicalPlan::Extension(Extension { node }) => {
592                // would be nice to avoid this copy -- maybe can
593                // update extension to just observer Exprs
594                let exprs = node.expressions().map_elements(f)?;
595                let plan = LogicalPlan::Extension(Extension {
596                    node: UserDefinedLogicalNode::with_exprs_and_inputs(
597                        node.as_ref(),
598                        exprs.data,
599                        node.inputs().into_iter().cloned().collect::<Vec<_>>(),
600                    )?,
601                });
602                Transformed::new(plan, exprs.transformed, exprs.tnr)
603            }
604            LogicalPlan::TableScan(TableScan {
605                table_name,
606                source,
607                projection,
608                projected_schema,
609                filters,
610                fetch,
611            }) => filters.map_elements(f)?.update_data(|filters| {
612                LogicalPlan::TableScan(TableScan {
613                    table_name,
614                    source,
615                    projection,
616                    projected_schema,
617                    filters,
618                    fetch,
619                })
620            }),
621            LogicalPlan::Distinct(Distinct::On(DistinctOn {
622                on_expr,
623                select_expr,
624                sort_expr,
625                input,
626                schema,
627            })) => (on_expr, select_expr, sort_expr)
628                .map_elements(f)?
629                .update_data(|(on_expr, select_expr, sort_expr)| {
630                    LogicalPlan::Distinct(Distinct::On(DistinctOn {
631                        on_expr,
632                        select_expr,
633                        sort_expr,
634                        input,
635                        schema,
636                    }))
637                }),
638            LogicalPlan::Limit(Limit { skip, fetch, input }) => {
639                (skip, fetch).map_elements(f)?.update_data(|(skip, fetch)| {
640                    LogicalPlan::Limit(Limit { skip, fetch, input })
641                })
642            }
643            LogicalPlan::Statement(stmt) => match stmt {
644                Statement::Execute(e) => {
645                    e.parameters.map_elements(f)?.update_data(|parameters| {
646                        Statement::Execute(Execute { parameters, ..e })
647                    })
648                }
649                _ => Transformed::no(stmt),
650            }
651            .update_data(LogicalPlan::Statement),
652            // plans without expressions
653            LogicalPlan::EmptyRelation(_)
654            | LogicalPlan::Unnest(_)
655            | LogicalPlan::RecursiveQuery(_)
656            | LogicalPlan::Subquery(_)
657            | LogicalPlan::SubqueryAlias(_)
658            | LogicalPlan::Analyze(_)
659            | LogicalPlan::Explain(_)
660            | LogicalPlan::Union(_)
661            | LogicalPlan::Distinct(Distinct::All(_))
662            | LogicalPlan::Dml(_)
663            | LogicalPlan::Ddl(_)
664            | LogicalPlan::Copy(_)
665            | LogicalPlan::DescribeTable(_) => Transformed::no(self),
666        })
667    }
668
669    /// Visits a plan similarly to [`Self::visit`], including subqueries that
670    /// may appear in expressions such as `IN (SELECT ...)`.
671    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
672    pub fn visit_with_subqueries<V: for<'n> TreeNodeVisitor<'n, Node = Self>>(
673        &self,
674        visitor: &mut V,
675    ) -> Result<TreeNodeRecursion> {
676        visitor
677            .f_down(self)?
678            .visit_children(|| {
679                self.apply_subqueries(|c| c.visit_with_subqueries(visitor))?
680                    .visit_sibling(|| {
681                        self.apply_children(|c| c.visit_with_subqueries(visitor))
682                    })
683            })?
684            .visit_parent(|| visitor.f_up(self))
685    }
686
    /// Similarly to [`Self::rewrite`], rewrites this node and its inputs using
    /// `rewriter`, including subqueries that may appear in expressions such as
    /// `IN (SELECT ...)`.
    ///
    /// For each node, `rewriter.f_down` is applied first, then this method
    /// recurses into the node's subqueries and children, and finally
    /// `rewriter.f_up` is applied.
    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
    pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
        self,
        rewriter: &mut R,
    ) -> Result<Transformed<Self>> {
        // Arguments, in order: the top-down step, the recursive step used for
        // both subqueries and children, and the bottom-up step.
        handle_transform_recursion!(
            rewriter.f_down(self),
            |c| c.rewrite_with_subqueries(rewriter),
            |n| rewriter.f_up(n)
        )
    }
701
702    /// Similarly to [`Self::apply`], calls `f` on this node and all its inputs,
703    /// including subqueries that may appear in expressions such as `IN (SELECT
704    /// ...)`.
705    pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
706        &self,
707        mut f: F,
708    ) -> Result<TreeNodeRecursion> {
709        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
710        fn apply_with_subqueries_impl<
711            F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
712        >(
713            node: &LogicalPlan,
714            f: &mut F,
715        ) -> Result<TreeNodeRecursion> {
716            f(node)?.visit_children(|| {
717                node.apply_subqueries(|c| apply_with_subqueries_impl(c, f))?
718                    .visit_sibling(|| {
719                        node.apply_children(|c| apply_with_subqueries_impl(c, f))
720                    })
721            })
722        }
723
724        apply_with_subqueries_impl(self, &mut f)
725    }
726
727    /// Similarly to [`Self::transform`], rewrites this node and its inputs using `f`,
728    /// including subqueries that may appear in expressions such as `IN (SELECT
729    /// ...)`.
730    pub fn transform_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
731        self,
732        f: F,
733    ) -> Result<Transformed<Self>> {
734        self.transform_up_with_subqueries(f)
735    }
736
737    /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`,
738    /// including subqueries that may appear in expressions such as `IN (SELECT
739    /// ...)`.
740    pub fn transform_down_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
741        self,
742        mut f: F,
743    ) -> Result<Transformed<Self>> {
744        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
745        fn transform_down_with_subqueries_impl<
746            F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
747        >(
748            node: LogicalPlan,
749            f: &mut F,
750        ) -> Result<Transformed<LogicalPlan>> {
751            f(node)?.transform_children(|n| {
752                n.map_subqueries(|c| transform_down_with_subqueries_impl(c, f))?
753                    .transform_sibling(|n| {
754                        n.map_children(|c| transform_down_with_subqueries_impl(c, f))
755                    })
756            })
757        }
758
759        transform_down_with_subqueries_impl(self, &mut f)
760    }
761
762    /// Similarly to [`Self::transform_up`], rewrites this node and its inputs using `f`,
763    /// including subqueries that may appear in expressions such as `IN (SELECT
764    /// ...)`.
765    pub fn transform_up_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
766        self,
767        mut f: F,
768    ) -> Result<Transformed<Self>> {
769        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
770        fn transform_up_with_subqueries_impl<
771            F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
772        >(
773            node: LogicalPlan,
774            f: &mut F,
775        ) -> Result<Transformed<LogicalPlan>> {
776            node.map_subqueries(|c| transform_up_with_subqueries_impl(c, f))?
777                .transform_sibling(|n| {
778                    n.map_children(|c| transform_up_with_subqueries_impl(c, f))
779                })?
780                .transform_parent(f)
781        }
782
783        transform_up_with_subqueries_impl(self, &mut f)
784    }
785
786    /// Similarly to [`Self::transform_down`], rewrites this node and its inputs using `f`,
787    /// including subqueries that may appear in expressions such as `IN (SELECT
788    /// ...)`.
789    pub fn transform_down_up_with_subqueries<
790        FD: FnMut(Self) -> Result<Transformed<Self>>,
791        FU: FnMut(Self) -> Result<Transformed<Self>>,
792    >(
793        self,
794        mut f_down: FD,
795        mut f_up: FU,
796    ) -> Result<Transformed<Self>> {
797        #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
798        fn transform_down_up_with_subqueries_impl<
799            FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
800            FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
801        >(
802            node: LogicalPlan,
803            f_down: &mut FD,
804            f_up: &mut FU,
805        ) -> Result<Transformed<LogicalPlan>> {
806            handle_transform_recursion!(
807                f_down(node),
808                |c| transform_down_up_with_subqueries_impl(c, f_down, f_up),
809                f_up
810            )
811        }
812
813        transform_down_up_with_subqueries_impl(self, &mut f_down, &mut f_up)
814    }
815
816    /// Similarly to [`Self::apply`], calls `f` on  this node and its inputs
817    /// including subqueries that may appear in expressions such as `IN (SELECT
818    /// ...)`.
819    pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
820        &self,
821        mut f: F,
822    ) -> Result<TreeNodeRecursion> {
823        self.apply_expressions(|expr| {
824            expr.apply(|expr| match expr {
825                Expr::Exists(Exists { subquery, .. })
826                | Expr::InSubquery(InSubquery { subquery, .. })
827                | Expr::ScalarSubquery(subquery) => {
828                    // use a synthetic plan so the collector sees a
829                    // LogicalPlan::Subquery (even though it is
830                    // actually a Subquery alias)
831                    f(&LogicalPlan::Subquery(subquery.clone()))
832                }
833                _ => Ok(TreeNodeRecursion::Continue),
834            })
835        })
836    }
837
838    /// Similarly to [`Self::map_children`], rewrites all subqueries that may
839    /// appear in expressions such as `IN (SELECT ...)` using `f`.
840    ///
841    /// Returns the current node.
842    pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
843        self,
844        mut f: F,
845    ) -> Result<Transformed<Self>> {
846        self.map_expressions(|expr| {
847            expr.transform_down(|expr| match expr {
848                Expr::Exists(Exists { subquery, negated }) => {
849                    f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
850                        LogicalPlan::Subquery(subquery) => {
851                            Ok(Expr::Exists(Exists { subquery, negated }))
852                        }
853                        _ => internal_err!("Transformation should return Subquery"),
854                    })
855                }
856                Expr::InSubquery(InSubquery {
857                    expr,
858                    subquery,
859                    negated,
860                }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
861                    LogicalPlan::Subquery(subquery) => Ok(Expr::InSubquery(InSubquery {
862                        expr,
863                        subquery,
864                        negated,
865                    })),
866                    _ => internal_err!("Transformation should return Subquery"),
867                }),
868                Expr::ScalarSubquery(subquery) => f(LogicalPlan::Subquery(subquery))?
869                    .map_data(|s| match s {
870                        LogicalPlan::Subquery(subquery) => {
871                            Ok(Expr::ScalarSubquery(subquery))
872                        }
873                        _ => internal_err!("Transformation should return Subquery"),
874                    }),
875                _ => Ok(Transformed::no(expr)),
876            })
877        })
878    }
879}