datafusion_expr/logical_plan/
plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logical plan types
19
20use std::cmp::Ordering;
21use std::collections::{BTreeMap, HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::sync::{Arc, LazyLock};
25
26use super::dml::CopyTo;
27use super::invariants::{
28    assert_always_invariants_at_current_node, assert_executable_invariants,
29    InvariantLevel,
30};
31use super::DdlStatement;
32use crate::builder::{change_redundant_column, unnest_with_options};
33use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams};
34use crate::expr_rewriter::{
35    create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver,
36};
37use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
38use crate::logical_plan::extension::UserDefinedLogicalNode;
39use crate::logical_plan::{DmlStatement, Statement};
40use crate::utils::{
41    enumerate_grouping_sets, exprlist_len, exprlist_to_fields, find_base_plan,
42    find_out_reference_exprs, grouping_set_expr_count, grouping_set_to_exprlist,
43    split_conjunction,
44};
45use crate::{
46    build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Execute,
47    Expr, ExprSchemable, LogicalPlanBuilder, Operator, Prepare,
48    TableProviderFilterPushDown, TableSource, WindowFunctionDefinition,
49};
50
51use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
52use datafusion_common::cse::{NormalizeEq, Normalizeable};
53use datafusion_common::tree_node::{
54    Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
55};
56use datafusion_common::{
57    aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
58    DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
59    FunctionalDependencies, ParamValues, Result, ScalarValue, TableReference,
60    UnnestOptions,
61};
62use indexmap::IndexSet;
63
64// backwards compatibility
65use crate::display::PgJsonVisitor;
66pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
67pub use datafusion_common::{JoinConstraint, JoinType};
68
/// A `LogicalPlan` is a node in a tree of relational operators (such as
/// Projection or Filter).
///
/// Represents transforming an input relation (table) to an output relation
/// (table) with a potentially different schema. Plans form a dataflow tree
/// where data flows from leaves up to the root to produce the query result.
///
/// `LogicalPlan`s can be created by the SQL query planner, the DataFrame API,
/// or programmatically (for example custom query languages).
///
/// # See also:
/// * [`Expr`]: For the expressions that are evaluated by the plan
/// * [`LogicalPlanBuilder`]: For building `LogicalPlan`s
/// * [`tree_node`]: To inspect and rewrite `LogicalPlan`s
///
/// [`tree_node`]: crate::logical_plan::tree_node
///
/// # Examples
///
/// ## Creating a LogicalPlan from SQL:
///
/// See [`SessionContext::sql`](https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql)
///
/// ## Creating a LogicalPlan from the DataFrame API:
///
/// See [`DataFrame::logical_plan`](https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.logical_plan)
///
/// ## Creating a LogicalPlan programmatically:
///
/// See [`LogicalPlanBuilder`]
///
/// # Visiting and Rewriting `LogicalPlan`s
///
/// Using the [`tree_node`] API, you can recursively walk all nodes in a
/// `LogicalPlan`. For example, to find all column references in a plan:
///
/// ```
/// # use std::collections::HashSet;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
/// # use datafusion_common::{Column, Result};
/// # fn employee_schema() -> Schema {
/// #    Schema::new(vec![
/// #           Field::new("name", DataType::Utf8, false),
/// #           Field::new("salary", DataType::Int32, false),
/// #       ])
/// #   }
/// // Projection(name)
/// //   Filter(salary > 1000)
/// //     TableScan(employee)
/// # fn main() -> Result<()> {
/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
///  .filter(col("salary").gt(lit(1000)))?
///  .project(vec![col("name")])?
///  .build()?;
///
/// // use apply to walk the plan and collect all expressions
/// let mut expressions = HashSet::new();
/// plan.apply(|node| {
///   // collect all expressions in the plan
///   node.apply_expressions(|expr| {
///    expressions.insert(expr.clone());
///    Ok(TreeNodeRecursion::Continue) // control walk of expressions
///   })?;
///   Ok(TreeNodeRecursion::Continue) // control walk of plan nodes
/// }).unwrap();
///
/// // we found one expression in the Projection and one in the Filter
/// assert_eq!(expressions.len(), 2);
/// println!("Found expressions: {:?}", expressions);
/// // found predicate in the Filter: employee.salary > 1000
/// let salary = Expr::Column(Column::new(Some("employee"), "salary"));
/// assert!(expressions.contains(&salary.gt(lit(1000))));
/// // found projection in the Projection: employee.name
/// let name = Expr::Column(Column::new(Some("employee"), "name"));
/// assert!(expressions.contains(&name));
/// # Ok(())
/// # }
/// ```
///
/// You can also rewrite plans using the [`tree_node`] API. For example, to
/// replace the filter predicate in a plan:
///
/// ```
/// # use std::collections::HashSet;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_expr::{Expr, col, lit, LogicalPlan, LogicalPlanBuilder, table_scan};
/// # use datafusion_common::tree_node::{TreeNodeRecursion, TreeNode};
/// # use datafusion_common::{Column, Result};
/// # fn employee_schema() -> Schema {
/// #    Schema::new(vec![
/// #           Field::new("name", DataType::Utf8, false),
/// #           Field::new("salary", DataType::Int32, false),
/// #       ])
/// #   }
/// // Projection(name)
/// //   Filter(salary > 1000)
/// //     TableScan(employee)
/// # fn main() -> Result<()> {
/// use datafusion_common::tree_node::Transformed;
/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
///  .filter(col("salary").gt(lit(1000)))?
///  .project(vec![col("name")])?
///  .build()?;
///
/// // use transform to rewrite the plan
/// let transformed_result = plan.transform(|node| {
///   // when we see the filter node
///   if let LogicalPlan::Filter(mut filter) = node {
///     // replace predicate with salary < 2000
///     filter.predicate = Expr::Column(Column::new(Some("employee"), "salary")).lt(lit(2000));
///     let new_plan = LogicalPlan::Filter(filter);
///     return Ok(Transformed::yes(new_plan)); // communicate the node was changed
///   }
///   // return the node unchanged
///   Ok(Transformed::no(node))
/// }).unwrap();
///
/// // Transformed result contains rewritten plan and information about
/// // whether the plan was changed
/// assert!(transformed_result.transformed);
/// let rewritten_plan = transformed_result.data;
///
/// // the filter predicate has been replaced in the rewritten plan
/// assert_eq!(rewritten_plan.display_indent().to_string(),
/// "Projection: employee.name\
/// \n  Filter: employee.salary < Int32(2000)\
/// \n    TableScan: employee");
/// # Ok(())
/// # }
/// ```
///
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates an arbitrary list of expressions (essentially a
    /// SELECT with an expression list) on its input.
    Projection(Projection),
    /// Filters rows from its input that do not match an
    /// expression (essentially a WHERE clause with a predicate
    /// expression).
    ///
    /// Semantically, `<predicate>` is evaluated for each row of the
    /// input; if the value of `<predicate>` is true, the input row is
    /// passed to the output. If the value of `<predicate>` is false
    /// (or null), the row is discarded.
    Filter(Filter),
    /// Windows input based on a set of window spec and window
    /// function (e.g. SUM or RANK).  This is used to implement SQL
    /// window functions, and the `OVER` clause.
    ///
    /// See [`Window`] for more details
    Window(Window),
    /// Aggregates its input based on a set of grouping and aggregate
    /// expressions (e.g. SUM). This is used to implement SQL aggregates
    /// and `GROUP BY`.
    ///
    /// See [`Aggregate`] for more details
    Aggregate(Aggregate),
    /// Sorts its input according to a list of sort expressions. This
    /// is used to implement SQL `ORDER BY`
    Sort(Sort),
    /// Join two logical plans on one or more join columns.
    /// This is used to implement SQL `JOIN`
    Join(Join),
    /// Repartitions the input based on a partitioning scheme. This is
    /// used to add parallelism and is sometimes referred to as an
    /// "exchange" operator in other systems
    Repartition(Repartition),
    /// Union multiple inputs with the same schema into a single
    /// output stream. This is used to implement SQL `UNION [ALL]` and
    /// `INTERSECT [ALL]`.
    Union(Union),
    /// Produces rows from a [`TableSource`], used to implement SQL
    /// `FROM` tables or views.
    TableScan(TableScan),
    /// An empty relation with an empty schema that produces either
    /// 0 rows or 1 row (see `EmptyRelation::produce_one_row`). This is
    /// used to implement SQL `SELECT` that has no values in the `FROM`
    /// clause.
    EmptyRelation(EmptyRelation),
    /// Produces the output of running another query.  This is used to
    /// implement SQL subqueries
    Subquery(Subquery),
    /// Aliased relation provides, or changes, the name of a relation.
    SubqueryAlias(SubqueryAlias),
    /// Skip some number of rows, and then fetch some number of rows.
    Limit(Limit),
    /// A DataFusion [`Statement`] such as `SET VARIABLE` or `START TRANSACTION`
    Statement(Statement),
    /// Values expression. See
    /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
    /// documentation for more details. This is used to implement SQL such as
    /// `VALUES (1, 2), (3, 4)`
    Values(Values),
    /// Produces a relation with string representations of
    /// various parts of the plan. This is used to implement SQL `EXPLAIN`.
    Explain(Explain),
    /// Runs the input, and prints annotated physical plan as a string
    /// with execution metric. This is used to implement SQL
    /// `EXPLAIN ANALYZE`.
    Analyze(Analyze),
    /// Extension operator defined outside of DataFusion. This is used
    /// to extend DataFusion with custom relational operations.
    Extension(Extension),
    /// Remove duplicate rows from the input. This is used to
    /// implement SQL `SELECT DISTINCT ...`.
    Distinct(Distinct),
    /// Data Manipulation Language (DML): Insert / Update / Delete
    Dml(DmlStatement),
    /// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
    Ddl(DdlStatement),
    /// `COPY TO` for writing plan results to files
    Copy(CopyTo),
    /// Describe the schema of the table. This is used to implement the
    /// SQL `DESCRIBE` command from MySQL.
    DescribeTable(DescribeTable),
    /// Unnest a column that contains a nested list type such as an
    /// ARRAY. This is used to implement SQL `UNNEST`
    Unnest(Unnest),
    /// A variadic query (e.g. "Recursive CTEs")
    RecursiveQuery(RecursiveQuery),
}
291
292impl Default for LogicalPlan {
293    fn default() -> Self {
294        LogicalPlan::EmptyRelation(EmptyRelation {
295            produce_one_row: false,
296            schema: Arc::new(DFSchema::empty()),
297        })
298    }
299}
300
301impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
302    fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
303        &'a self,
304        mut f: F,
305    ) -> Result<TreeNodeRecursion> {
306        f(self)
307    }
308
309    fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
310        self,
311        mut f: F,
312    ) -> Result<Transformed<Self>> {
313        f(self)
314    }
315}
316
317impl LogicalPlan {
318    /// Get a reference to the logical plan's schema
319    pub fn schema(&self) -> &DFSchemaRef {
320        match self {
321            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
322            LogicalPlan::Values(Values { schema, .. }) => schema,
323            LogicalPlan::TableScan(TableScan {
324                projected_schema, ..
325            }) => projected_schema,
326            LogicalPlan::Projection(Projection { schema, .. }) => schema,
327            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
328            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
329            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
330            LogicalPlan::Window(Window { schema, .. }) => schema,
331            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
332            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
333            LogicalPlan::Join(Join { schema, .. }) => schema,
334            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
335            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
336            LogicalPlan::Statement(statement) => statement.schema(),
337            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
338            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
339            LogicalPlan::Explain(explain) => &explain.schema,
340            LogicalPlan::Analyze(analyze) => &analyze.schema,
341            LogicalPlan::Extension(extension) => extension.node.schema(),
342            LogicalPlan::Union(Union { schema, .. }) => schema,
343            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
344                output_schema
345            }
346            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
347            LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
348            LogicalPlan::Ddl(ddl) => ddl.schema(),
349            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
350            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
351                // we take the schema of the static term as the schema of the entire recursive query
352                static_term.schema()
353            }
354        }
355    }
356
357    /// Used for normalizing columns, as the fallback schemas to the main schema
358    /// of the plan.
359    pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
360        match self {
361            LogicalPlan::Window(_)
362            | LogicalPlan::Projection(_)
363            | LogicalPlan::Aggregate(_)
364            | LogicalPlan::Unnest(_)
365            | LogicalPlan::Join(_) => self
366                .inputs()
367                .iter()
368                .map(|input| input.schema().as_ref())
369                .collect(),
370            _ => vec![],
371        }
372    }
373
374    /// Returns the (fixed) output schema for explain plans
375    pub fn explain_schema() -> SchemaRef {
376        SchemaRef::new(Schema::new(vec![
377            Field::new("plan_type", DataType::Utf8, false),
378            Field::new("plan", DataType::Utf8, false),
379        ]))
380    }
381
382    /// Returns the (fixed) output schema for `DESCRIBE` plans
383    pub fn describe_schema() -> Schema {
384        Schema::new(vec![
385            Field::new("column_name", DataType::Utf8, false),
386            Field::new("data_type", DataType::Utf8, false),
387            Field::new("is_nullable", DataType::Utf8, false),
388        ])
389    }
390
391    /// Returns all expressions (non-recursively) evaluated by the current
392    /// logical plan node. This does not include expressions in any children.
393    ///
394    /// Note this method `clone`s all the expressions. When possible, the
395    /// [`tree_node`] API should be used instead of this API.
396    ///
397    /// The returned expressions do not necessarily represent or even
398    /// contributed to the output schema of this node. For example,
399    /// `LogicalPlan::Filter` returns the filter expression even though the
400    /// output of a Filter has the same columns as the input.
401    ///
402    /// The expressions do contain all the columns that are used by this plan,
403    /// so if there are columns not referenced by these expressions then
404    /// DataFusion's optimizer attempts to optimize them away.
405    ///
406    /// [`tree_node`]: crate::logical_plan::tree_node
407    pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
408        let mut exprs = vec![];
409        self.apply_expressions(|e| {
410            exprs.push(e.clone());
411            Ok(TreeNodeRecursion::Continue)
412        })
413        // closure always returns OK
414        .unwrap();
415        exprs
416    }
417
418    /// Returns all the out reference(correlated) expressions (recursively) in the current
419    /// logical plan nodes and all its descendant nodes.
420    pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
421        let mut exprs = vec![];
422        self.apply_expressions(|e| {
423            find_out_reference_exprs(e).into_iter().for_each(|e| {
424                if !exprs.contains(&e) {
425                    exprs.push(e)
426                }
427            });
428            Ok(TreeNodeRecursion::Continue)
429        })
430        // closure always returns OK
431        .unwrap();
432        self.inputs()
433            .into_iter()
434            .flat_map(|child| child.all_out_ref_exprs())
435            .for_each(|e| {
436                if !exprs.contains(&e) {
437                    exprs.push(e)
438                }
439            });
440        exprs
441    }
442
443    /// Returns all inputs / children of this `LogicalPlan` node.
444    ///
445    /// Note does not include inputs to inputs, or subqueries.
446    pub fn inputs(&self) -> Vec<&LogicalPlan> {
447        match self {
448            LogicalPlan::Projection(Projection { input, .. }) => vec![input],
449            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
450            LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
451            LogicalPlan::Window(Window { input, .. }) => vec![input],
452            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
453            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
454            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
455            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
456            LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
457            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
458            LogicalPlan::Extension(extension) => extension.node.inputs(),
459            LogicalPlan::Union(Union { inputs, .. }) => {
460                inputs.iter().map(|arc| arc.as_ref()).collect()
461            }
462            LogicalPlan::Distinct(
463                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
464            ) => vec![input],
465            LogicalPlan::Explain(explain) => vec![&explain.plan],
466            LogicalPlan::Analyze(analyze) => vec![&analyze.input],
467            LogicalPlan::Dml(write) => vec![&write.input],
468            LogicalPlan::Copy(copy) => vec![&copy.input],
469            LogicalPlan::Ddl(ddl) => ddl.inputs(),
470            LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
471            LogicalPlan::RecursiveQuery(RecursiveQuery {
472                static_term,
473                recursive_term,
474                ..
475            }) => vec![static_term, recursive_term],
476            LogicalPlan::Statement(stmt) => stmt.inputs(),
477            // plans without inputs
478            LogicalPlan::TableScan { .. }
479            | LogicalPlan::EmptyRelation { .. }
480            | LogicalPlan::Values { .. }
481            | LogicalPlan::DescribeTable(_) => vec![],
482        }
483    }
484
485    /// returns all `Using` join columns in a logical plan
486    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
487        let mut using_columns: Vec<HashSet<Column>> = vec![];
488
489        self.apply_with_subqueries(|plan| {
490            if let LogicalPlan::Join(Join {
491                join_constraint: JoinConstraint::Using,
492                on,
493                ..
494            }) = plan
495            {
496                // The join keys in using-join must be columns.
497                let columns =
498                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
499                        let Some(l) = l.get_as_join_column() else {
500                            return internal_err!(
501                                "Invalid join key. Expected column, found {l:?}"
502                            );
503                        };
504                        let Some(r) = r.get_as_join_column() else {
505                            return internal_err!(
506                                "Invalid join key. Expected column, found {r:?}"
507                            );
508                        };
509                        accumu.insert(l.to_owned());
510                        accumu.insert(r.to_owned());
511                        Result::<_, DataFusionError>::Ok(accumu)
512                    })?;
513                using_columns.push(columns);
514            }
515            Ok(TreeNodeRecursion::Continue)
516        })?;
517
518        Ok(using_columns)
519    }
520
521    /// returns the first output expression of this `LogicalPlan` node.
522    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
523        match self {
524            LogicalPlan::Projection(projection) => {
525                Ok(Some(projection.expr.as_slice()[0].clone()))
526            }
527            LogicalPlan::Aggregate(agg) => {
528                if agg.group_expr.is_empty() {
529                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
530                } else {
531                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
532                }
533            }
534            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
535                Ok(Some(select_expr[0].clone()))
536            }
537            LogicalPlan::Filter(Filter { input, .. })
538            | LogicalPlan::Distinct(Distinct::All(input))
539            | LogicalPlan::Sort(Sort { input, .. })
540            | LogicalPlan::Limit(Limit { input, .. })
541            | LogicalPlan::Repartition(Repartition { input, .. })
542            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
543            LogicalPlan::Join(Join {
544                left,
545                right,
546                join_type,
547                ..
548            }) => match join_type {
549                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
550                    if left.schema().fields().is_empty() {
551                        right.head_output_expr()
552                    } else {
553                        left.head_output_expr()
554                    }
555                }
556                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
557                    left.head_output_expr()
558                }
559                JoinType::RightSemi | JoinType::RightAnti => right.head_output_expr(),
560            },
561            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
562                static_term.head_output_expr()
563            }
564            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
565                union.schema.qualified_field(0),
566            )))),
567            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
568                table.projected_schema.qualified_field(0),
569            )))),
570            LogicalPlan::SubqueryAlias(subquery_alias) => {
571                let expr_opt = subquery_alias.input.head_output_expr()?;
572                expr_opt
573                    .map(|expr| {
574                        Ok(Expr::Column(create_col_from_scalar_expr(
575                            &expr,
576                            subquery_alias.alias.to_string(),
577                        )?))
578                    })
579                    .map_or(Ok(None), |v| v.map(Some))
580            }
581            LogicalPlan::Subquery(_) => Ok(None),
582            LogicalPlan::EmptyRelation(_)
583            | LogicalPlan::Statement(_)
584            | LogicalPlan::Values(_)
585            | LogicalPlan::Explain(_)
586            | LogicalPlan::Analyze(_)
587            | LogicalPlan::Extension(_)
588            | LogicalPlan::Dml(_)
589            | LogicalPlan::Copy(_)
590            | LogicalPlan::Ddl(_)
591            | LogicalPlan::DescribeTable(_)
592            | LogicalPlan::Unnest(_) => Ok(None),
593        }
594    }
595
596    /// Recomputes schema and type information for this LogicalPlan if needed.
597    ///
598    /// Some `LogicalPlan`s may need to recompute their schema if the number or
599    /// type of expressions have been changed (for example due to type
600    /// coercion). For example [`LogicalPlan::Projection`]s schema depends on
601    /// its expressions.
602    ///
603    /// Some `LogicalPlan`s schema is unaffected by any changes to their
604    /// expressions. For example [`LogicalPlan::Filter`] schema is always the
605    /// same as its input schema.
606    ///
607    /// This is useful after modifying a plans `Expr`s (or input plans) via
608    /// methods such as [Self::map_children] and [Self::map_expressions]. Unlike
609    /// [Self::with_new_exprs], this method does not require a new set of
610    /// expressions or inputs plans.
611    ///
612    /// # Return value
613    /// Returns an error if there is some issue recomputing the schema.
614    ///
615    /// # Notes
616    ///
617    /// * Does not recursively recompute schema for input (child) plans.
618    pub fn recompute_schema(self) -> Result<Self> {
619        match self {
620            // Since expr may be different than the previous expr, schema of the projection
621            // may change. We need to use try_new method instead of try_new_with_schema method.
622            LogicalPlan::Projection(Projection {
623                expr,
624                input,
625                schema: _,
626            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
627            LogicalPlan::Dml(_) => Ok(self),
628            LogicalPlan::Copy(_) => Ok(self),
629            LogicalPlan::Values(Values { schema, values }) => {
630                // todo it isn't clear why the schema is not recomputed here
631                Ok(LogicalPlan::Values(Values { schema, values }))
632            }
633            LogicalPlan::Filter(Filter {
634                predicate,
635                input,
636                having,
637            }) => Filter::try_new_internal(predicate, input, having)
638                .map(LogicalPlan::Filter),
639            LogicalPlan::Repartition(_) => Ok(self),
640            LogicalPlan::Window(Window {
641                input,
642                window_expr,
643                schema: _,
644            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
645            LogicalPlan::Aggregate(Aggregate {
646                input,
647                group_expr,
648                aggr_expr,
649                schema: _,
650            }) => Aggregate::try_new(input, group_expr, aggr_expr)
651                .map(LogicalPlan::Aggregate),
652            LogicalPlan::Sort(_) => Ok(self),
653            LogicalPlan::Join(Join {
654                left,
655                right,
656                filter,
657                join_type,
658                join_constraint,
659                on,
660                schema: _,
661                null_equals_null,
662            }) => {
663                let schema =
664                    build_join_schema(left.schema(), right.schema(), &join_type)?;
665
666                let new_on: Vec<_> = on
667                    .into_iter()
668                    .map(|equi_expr| {
669                        // SimplifyExpression rule may add alias to the equi_expr.
670                        (equi_expr.0.unalias(), equi_expr.1.unalias())
671                    })
672                    .collect();
673
674                Ok(LogicalPlan::Join(Join {
675                    left,
676                    right,
677                    join_type,
678                    join_constraint,
679                    on: new_on,
680                    filter,
681                    schema: DFSchemaRef::new(schema),
682                    null_equals_null,
683                }))
684            }
685            LogicalPlan::Subquery(_) => Ok(self),
686            LogicalPlan::SubqueryAlias(SubqueryAlias {
687                input,
688                alias,
689                schema: _,
690            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
691            LogicalPlan::Limit(_) => Ok(self),
692            LogicalPlan::Ddl(_) => Ok(self),
693            LogicalPlan::Extension(Extension { node }) => {
694                // todo make an API that does not require cloning
695                // This requires a copy of the extension nodes expressions and inputs
696                let expr = node.expressions();
697                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
698                Ok(LogicalPlan::Extension(Extension {
699                    node: node.with_exprs_and_inputs(expr, inputs)?,
700                }))
701            }
702            LogicalPlan::Union(Union { inputs, schema }) => {
703                let first_input_schema = inputs[0].schema();
704                if schema.fields().len() == first_input_schema.fields().len() {
705                    // If inputs are not pruned do not change schema
706                    Ok(LogicalPlan::Union(Union { inputs, schema }))
707                } else {
708                    // A note on `Union`s constructed via `try_new_by_name`:
709                    //
710                    // At this point, the schema for each input should have
711                    // the same width. Thus, we do not need to save whether a
712                    // `Union` was created `BY NAME`, and can safely rely on the
713                    // `try_new` initializer to derive the new schema based on
714                    // column positions.
715                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
716                }
717            }
718            LogicalPlan::Distinct(distinct) => {
719                let distinct = match distinct {
720                    Distinct::All(input) => Distinct::All(input),
721                    Distinct::On(DistinctOn {
722                        on_expr,
723                        select_expr,
724                        sort_expr,
725                        input,
726                        schema: _,
727                    }) => Distinct::On(DistinctOn::try_new(
728                        on_expr,
729                        select_expr,
730                        sort_expr,
731                        input,
732                    )?),
733                };
734                Ok(LogicalPlan::Distinct(distinct))
735            }
736            LogicalPlan::RecursiveQuery(_) => Ok(self),
737            LogicalPlan::Analyze(_) => Ok(self),
738            LogicalPlan::Explain(_) => Ok(self),
739            LogicalPlan::TableScan(_) => Ok(self),
740            LogicalPlan::EmptyRelation(_) => Ok(self),
741            LogicalPlan::Statement(_) => Ok(self),
742            LogicalPlan::DescribeTable(_) => Ok(self),
743            LogicalPlan::Unnest(Unnest {
744                input,
745                exec_columns,
746                options,
747                ..
748            }) => {
749                // Update schema with unnested column type.
750                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
751            }
752        }
753    }
754
755    /// Returns a new `LogicalPlan` based on `self` with inputs and
756    /// expressions replaced.
757    ///
758    /// Note this method creates an entirely new node, which requires a large
759    /// amount of clone'ing. When possible, the [`tree_node`] API should be used
760    /// instead of this API.
761    ///
762    /// The exprs correspond to the same order of expressions returned
763    /// by [`Self::expressions`]. This function is used by optimizers
764    /// to rewrite plans using the following pattern:
765    ///
766    /// [`tree_node`]: crate::logical_plan::tree_node
767    ///
768    /// ```text
769    /// let new_inputs = optimize_children(..., plan, props);
770    ///
771    /// // get the plans expressions to optimize
772    /// let exprs = plan.expressions();
773    ///
774    /// // potentially rewrite plan expressions
775    /// let rewritten_exprs = rewrite_exprs(exprs);
776    ///
777    /// // create new plan using rewritten_exprs in same position
778    /// let new_plan = plan.new_with_exprs(rewritten_exprs, new_inputs);
779    /// ```
780    pub fn with_new_exprs(
781        &self,
782        mut expr: Vec<Expr>,
783        inputs: Vec<LogicalPlan>,
784    ) -> Result<LogicalPlan> {
785        match self {
786            // Since expr may be different than the previous expr, schema of the projection
787            // may change. We need to use try_new method instead of try_new_with_schema method.
788            LogicalPlan::Projection(Projection { .. }) => {
789                let input = self.only_input(inputs)?;
790                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
791            }
792            LogicalPlan::Dml(DmlStatement {
793                table_name,
794                target,
795                op,
796                ..
797            }) => {
798                self.assert_no_expressions(expr)?;
799                let input = self.only_input(inputs)?;
800                Ok(LogicalPlan::Dml(DmlStatement::new(
801                    table_name.clone(),
802                    Arc::clone(target),
803                    op.clone(),
804                    Arc::new(input),
805                )))
806            }
807            LogicalPlan::Copy(CopyTo {
808                input: _,
809                output_url,
810                file_type,
811                options,
812                partition_by,
813            }) => {
814                self.assert_no_expressions(expr)?;
815                let input = self.only_input(inputs)?;
816                Ok(LogicalPlan::Copy(CopyTo {
817                    input: Arc::new(input),
818                    output_url: output_url.clone(),
819                    file_type: Arc::clone(file_type),
820                    options: options.clone(),
821                    partition_by: partition_by.clone(),
822                }))
823            }
824            LogicalPlan::Values(Values { schema, .. }) => {
825                self.assert_no_inputs(inputs)?;
826                Ok(LogicalPlan::Values(Values {
827                    schema: Arc::clone(schema),
828                    values: expr
829                        .chunks_exact(schema.fields().len())
830                        .map(|s| s.to_vec())
831                        .collect(),
832                }))
833            }
834            LogicalPlan::Filter { .. } => {
835                let predicate = self.only_expr(expr)?;
836                let input = self.only_input(inputs)?;
837
838                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
839            }
840            LogicalPlan::Repartition(Repartition {
841                partitioning_scheme,
842                ..
843            }) => match partitioning_scheme {
844                Partitioning::RoundRobinBatch(n) => {
845                    self.assert_no_expressions(expr)?;
846                    let input = self.only_input(inputs)?;
847                    Ok(LogicalPlan::Repartition(Repartition {
848                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
849                        input: Arc::new(input),
850                    }))
851                }
852                Partitioning::Hash(_, n) => {
853                    let input = self.only_input(inputs)?;
854                    Ok(LogicalPlan::Repartition(Repartition {
855                        partitioning_scheme: Partitioning::Hash(expr, *n),
856                        input: Arc::new(input),
857                    }))
858                }
859                Partitioning::DistributeBy(_) => {
860                    let input = self.only_input(inputs)?;
861                    Ok(LogicalPlan::Repartition(Repartition {
862                        partitioning_scheme: Partitioning::DistributeBy(expr),
863                        input: Arc::new(input),
864                    }))
865                }
866            },
867            LogicalPlan::Window(Window { window_expr, .. }) => {
868                assert_eq!(window_expr.len(), expr.len());
869                let input = self.only_input(inputs)?;
870                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
871            }
872            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
873                let input = self.only_input(inputs)?;
874                // group exprs are the first expressions
875                let agg_expr = expr.split_off(group_expr.len());
876
877                Aggregate::try_new(Arc::new(input), expr, agg_expr)
878                    .map(LogicalPlan::Aggregate)
879            }
880            LogicalPlan::Sort(Sort {
881                expr: sort_expr,
882                fetch,
883                ..
884            }) => {
885                let input = self.only_input(inputs)?;
886                Ok(LogicalPlan::Sort(Sort {
887                    expr: expr
888                        .into_iter()
889                        .zip(sort_expr.iter())
890                        .map(|(expr, sort)| sort.with_expr(expr))
891                        .collect(),
892                    input: Arc::new(input),
893                    fetch: *fetch,
894                }))
895            }
896            LogicalPlan::Join(Join {
897                join_type,
898                join_constraint,
899                on,
900                null_equals_null,
901                ..
902            }) => {
903                let (left, right) = self.only_two_inputs(inputs)?;
904                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;
905
906                let equi_expr_count = on.len();
907                assert!(expr.len() >= equi_expr_count);
908
909                // Assume that the last expr, if any,
910                // is the filter_expr (non equality predicate from ON clause)
911                let filter_expr = if expr.len() > equi_expr_count {
912                    expr.pop()
913                } else {
914                    None
915                };
916
917                // The first part of expr is equi-exprs,
918                // and the struct of each equi-expr is like `left-expr = right-expr`.
919                assert_eq!(expr.len(), equi_expr_count);
920                let new_on = expr.into_iter().map(|equi_expr| {
921                    // SimplifyExpression rule may add alias to the equi_expr.
922                    let unalias_expr = equi_expr.clone().unalias();
923                    if let Expr::BinaryExpr(BinaryExpr { left, op: Operator::Eq, right }) = unalias_expr {
924                        Ok((*left, *right))
925                    } else {
926                        internal_err!(
927                            "The front part expressions should be an binary equality expression, actual:{equi_expr}"
928                        )
929                    }
930                }).collect::<Result<Vec<(Expr, Expr)>>>()?;
931
932                Ok(LogicalPlan::Join(Join {
933                    left: Arc::new(left),
934                    right: Arc::new(right),
935                    join_type: *join_type,
936                    join_constraint: *join_constraint,
937                    on: new_on,
938                    filter: filter_expr,
939                    schema: DFSchemaRef::new(schema),
940                    null_equals_null: *null_equals_null,
941                }))
942            }
943            LogicalPlan::Subquery(Subquery {
944                outer_ref_columns, ..
945            }) => {
946                self.assert_no_expressions(expr)?;
947                let input = self.only_input(inputs)?;
948                let subquery = LogicalPlanBuilder::from(input).build()?;
949                Ok(LogicalPlan::Subquery(Subquery {
950                    subquery: Arc::new(subquery),
951                    outer_ref_columns: outer_ref_columns.clone(),
952                }))
953            }
954            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
955                self.assert_no_expressions(expr)?;
956                let input = self.only_input(inputs)?;
957                SubqueryAlias::try_new(Arc::new(input), alias.clone())
958                    .map(LogicalPlan::SubqueryAlias)
959            }
960            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
961                let old_expr_len = skip.iter().chain(fetch.iter()).count();
962                if old_expr_len != expr.len() {
963                    return internal_err!(
964                        "Invalid number of new Limit expressions: expected {}, got {}",
965                        old_expr_len,
966                        expr.len()
967                    );
968                }
969                // `LogicalPlan::expressions()` returns in [skip, fetch] order, so we can pop from the end.
970                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
971                let new_skip = skip.as_ref().and_then(|_| expr.pop());
972                let input = self.only_input(inputs)?;
973                Ok(LogicalPlan::Limit(Limit {
974                    skip: new_skip.map(Box::new),
975                    fetch: new_fetch.map(Box::new),
976                    input: Arc::new(input),
977                }))
978            }
979            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
980                name,
981                if_not_exists,
982                or_replace,
983                column_defaults,
984                temporary,
985                ..
986            })) => {
987                self.assert_no_expressions(expr)?;
988                let input = self.only_input(inputs)?;
989                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
990                    CreateMemoryTable {
991                        input: Arc::new(input),
992                        constraints: Constraints::empty(),
993                        name: name.clone(),
994                        if_not_exists: *if_not_exists,
995                        or_replace: *or_replace,
996                        column_defaults: column_defaults.clone(),
997                        temporary: *temporary,
998                    },
999                )))
1000            }
1001            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1002                name,
1003                or_replace,
1004                definition,
1005                temporary,
1006                ..
1007            })) => {
1008                self.assert_no_expressions(expr)?;
1009                let input = self.only_input(inputs)?;
1010                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1011                    input: Arc::new(input),
1012                    name: name.clone(),
1013                    or_replace: *or_replace,
1014                    temporary: *temporary,
1015                    definition: definition.clone(),
1016                })))
1017            }
1018            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
1019                node: e.node.with_exprs_and_inputs(expr, inputs)?,
1020            })),
1021            LogicalPlan::Union(Union { schema, .. }) => {
1022                self.assert_no_expressions(expr)?;
1023                let input_schema = inputs[0].schema();
1024                // If inputs are not pruned do not change schema.
1025                let schema = if schema.fields().len() == input_schema.fields().len() {
1026                    Arc::clone(schema)
1027                } else {
1028                    Arc::clone(input_schema)
1029                };
1030                Ok(LogicalPlan::Union(Union {
1031                    inputs: inputs.into_iter().map(Arc::new).collect(),
1032                    schema,
1033                }))
1034            }
1035            LogicalPlan::Distinct(distinct) => {
1036                let distinct = match distinct {
1037                    Distinct::All(_) => {
1038                        self.assert_no_expressions(expr)?;
1039                        let input = self.only_input(inputs)?;
1040                        Distinct::All(Arc::new(input))
1041                    }
1042                    Distinct::On(DistinctOn {
1043                        on_expr,
1044                        select_expr,
1045                        ..
1046                    }) => {
1047                        let input = self.only_input(inputs)?;
1048                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
1049                        let select_expr = expr.split_off(on_expr.len());
1050                        assert!(sort_expr.is_empty(), "with_new_exprs for Distinct does not support sort expressions");
1051                        Distinct::On(DistinctOn::try_new(
1052                            expr,
1053                            select_expr,
1054                            None, // no sort expressions accepted
1055                            Arc::new(input),
1056                        )?)
1057                    }
1058                };
1059                Ok(LogicalPlan::Distinct(distinct))
1060            }
1061            LogicalPlan::RecursiveQuery(RecursiveQuery {
1062                name, is_distinct, ..
1063            }) => {
1064                self.assert_no_expressions(expr)?;
1065                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
1066                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
1067                    name: name.clone(),
1068                    static_term: Arc::new(static_term),
1069                    recursive_term: Arc::new(recursive_term),
1070                    is_distinct: *is_distinct,
1071                }))
1072            }
1073            LogicalPlan::Analyze(a) => {
1074                self.assert_no_expressions(expr)?;
1075                let input = self.only_input(inputs)?;
1076                Ok(LogicalPlan::Analyze(Analyze {
1077                    verbose: a.verbose,
1078                    schema: Arc::clone(&a.schema),
1079                    input: Arc::new(input),
1080                }))
1081            }
1082            LogicalPlan::Explain(e) => {
1083                self.assert_no_expressions(expr)?;
1084                let input = self.only_input(inputs)?;
1085                Ok(LogicalPlan::Explain(Explain {
1086                    verbose: e.verbose,
1087                    plan: Arc::new(input),
1088                    stringified_plans: e.stringified_plans.clone(),
1089                    schema: Arc::clone(&e.schema),
1090                    logical_optimization_succeeded: e.logical_optimization_succeeded,
1091                }))
1092            }
1093            LogicalPlan::Statement(Statement::Prepare(Prepare {
1094                name,
1095                data_types,
1096                ..
1097            })) => {
1098                self.assert_no_expressions(expr)?;
1099                let input = self.only_input(inputs)?;
1100                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
1101                    name: name.clone(),
1102                    data_types: data_types.clone(),
1103                    input: Arc::new(input),
1104                })))
1105            }
1106            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
1107                self.assert_no_inputs(inputs)?;
1108                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
1109                    name: name.clone(),
1110                    parameters: expr,
1111                })))
1112            }
1113            LogicalPlan::TableScan(ts) => {
1114                self.assert_no_inputs(inputs)?;
1115                Ok(LogicalPlan::TableScan(TableScan {
1116                    filters: expr,
1117                    ..ts.clone()
1118                }))
1119            }
1120            LogicalPlan::EmptyRelation(_)
1121            | LogicalPlan::Ddl(_)
1122            | LogicalPlan::Statement(_)
1123            | LogicalPlan::DescribeTable(_) => {
1124                // All of these plan types have no inputs / exprs so should not be called
1125                self.assert_no_expressions(expr)?;
1126                self.assert_no_inputs(inputs)?;
1127                Ok(self.clone())
1128            }
1129            LogicalPlan::Unnest(Unnest {
1130                exec_columns: columns,
1131                options,
1132                ..
1133            }) => {
1134                self.assert_no_expressions(expr)?;
1135                let input = self.only_input(inputs)?;
1136                // Update schema with unnested column type.
1137                let new_plan =
1138                    unnest_with_options(input, columns.clone(), options.clone())?;
1139                Ok(new_plan)
1140            }
1141        }
1142    }
1143
1144    /// checks that the plan conforms to the listed invariant level, returning an Error if not
1145    pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
1146        match check {
1147            InvariantLevel::Always => assert_always_invariants_at_current_node(self),
1148            InvariantLevel::Executable => assert_executable_invariants(self),
1149        }
1150    }
1151
1152    /// Helper for [Self::with_new_exprs] to use when no expressions are expected.
1153    #[inline]
1154    #[allow(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again
1155    fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
1156        if !expr.is_empty() {
1157            return internal_err!("{self:?} should have no exprs, got {:?}", expr);
1158        }
1159        Ok(())
1160    }
1161
1162    /// Helper for [Self::with_new_exprs] to use when no inputs are expected.
1163    #[inline]
1164    #[allow(clippy::needless_pass_by_value)] // inputs is moved intentionally to ensure it's not used again
1165    fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
1166        if !inputs.is_empty() {
1167            return internal_err!("{self:?} should have no inputs, got: {:?}", inputs);
1168        }
1169        Ok(())
1170    }
1171
1172    /// Helper for [Self::with_new_exprs] to use when exactly one expression is expected.
1173    #[inline]
1174    fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1175        if expr.len() != 1 {
1176            return internal_err!(
1177                "{self:?} should have exactly one expr, got {:?}",
1178                expr
1179            );
1180        }
1181        Ok(expr.remove(0))
1182    }
1183
1184    /// Helper for [Self::with_new_exprs] to use when exactly one input is expected.
1185    #[inline]
1186    fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1187        if inputs.len() != 1 {
1188            return internal_err!(
1189                "{self:?} should have exactly one input, got {:?}",
1190                inputs
1191            );
1192        }
1193        Ok(inputs.remove(0))
1194    }
1195
1196    /// Helper for [Self::with_new_exprs] to use when exactly two inputs are expected.
1197    #[inline]
1198    fn only_two_inputs(
1199        &self,
1200        mut inputs: Vec<LogicalPlan>,
1201    ) -> Result<(LogicalPlan, LogicalPlan)> {
1202        if inputs.len() != 2 {
1203            return internal_err!(
1204                "{self:?} should have exactly two inputs, got {:?}",
1205                inputs
1206            );
1207        }
1208        let right = inputs.remove(1);
1209        let left = inputs.remove(0);
1210        Ok((left, right))
1211    }
1212
1213    /// Replaces placeholder param values (like `$1`, `$2`) in [`LogicalPlan`]
1214    /// with the specified `param_values`.
1215    ///
1216    /// [`Prepare`] statements are converted to
1217    /// their inner logical plan for execution.
1218    ///
1219    /// # Example
1220    /// ```
1221    /// # use arrow::datatypes::{Field, Schema, DataType};
1222    /// use datafusion_common::ScalarValue;
1223    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan, placeholder};
1224    /// # let schema = Schema::new(vec![
1225    /// #     Field::new("id", DataType::Int32, false),
1226    /// # ]);
1227    /// // Build SELECT * FROM t1 WHERE id = $1
1228    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1229    ///     .filter(col("id").eq(placeholder("$1"))).unwrap()
1230    ///     .build().unwrap();
1231    ///
1232    /// assert_eq!(
1233    ///   "Filter: t1.id = $1\
1234    ///   \n  TableScan: t1",
1235    ///   plan.display_indent().to_string()
1236    /// );
1237    ///
1238    /// // Fill in the parameter $1 with a literal 3
1239    /// let plan = plan.with_param_values(vec![
1240    ///   ScalarValue::from(3i32) // value at index 0 --> $1
1241    /// ]).unwrap();
1242    ///
1243    /// assert_eq!(
1244    ///    "Filter: t1.id = Int32(3)\
1245    ///    \n  TableScan: t1",
1246    ///    plan.display_indent().to_string()
1247    ///  );
1248    ///
1249    /// // Note you can also used named parameters
1250    /// // Build SELECT * FROM t1 WHERE id = $my_param
1251    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1252    ///     .filter(col("id").eq(placeholder("$my_param"))).unwrap()
1253    ///     .build().unwrap()
1254    ///     // Fill in the parameter $my_param with a literal 3
1255    ///     .with_param_values(vec![
1256    ///       ("my_param", ScalarValue::from(3i32)),
1257    ///     ]).unwrap();
1258    ///
1259    /// assert_eq!(
1260    ///    "Filter: t1.id = Int32(3)\
1261    ///    \n  TableScan: t1",
1262    ///    plan.display_indent().to_string()
1263    ///  );
1264    ///
1265    /// ```
1266    pub fn with_param_values(
1267        self,
1268        param_values: impl Into<ParamValues>,
1269    ) -> Result<LogicalPlan> {
1270        let param_values = param_values.into();
1271        let plan_with_values = self.replace_params_with_values(&param_values)?;
1272
1273        // unwrap Prepare
1274        Ok(
1275            if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1276                plan_with_values
1277            {
1278                param_values.verify(&prepare_lp.data_types)?;
1279                // try and take ownership of the input if is not shared, clone otherwise
1280                Arc::unwrap_or_clone(prepare_lp.input)
1281            } else {
1282                plan_with_values
1283            },
1284        )
1285    }
1286
1287    /// Returns the maximum number of rows that this plan can output, if known.
1288    ///
1289    /// If `None`, the plan can return any number of rows.
1290    /// If `Some(n)` then the plan can return at most `n` rows but may return fewer.
1291    pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1292        match self {
1293            LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1294            LogicalPlan::Filter(filter) => {
1295                if filter.is_scalar() {
1296                    Some(1)
1297                } else {
1298                    filter.input.max_rows()
1299                }
1300            }
1301            LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1302            LogicalPlan::Aggregate(Aggregate {
1303                input, group_expr, ..
1304            }) => {
1305                // Empty group_expr will return Some(1)
1306                if group_expr
1307                    .iter()
1308                    .all(|expr| matches!(expr, Expr::Literal(_)))
1309                {
1310                    Some(1)
1311                } else {
1312                    input.max_rows()
1313                }
1314            }
1315            LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1316                match (fetch, input.max_rows()) {
1317                    (Some(fetch_limit), Some(input_max)) => {
1318                        Some(input_max.min(*fetch_limit))
1319                    }
1320                    (Some(fetch_limit), None) => Some(*fetch_limit),
1321                    (None, Some(input_max)) => Some(input_max),
1322                    (None, None) => None,
1323                }
1324            }
1325            LogicalPlan::Join(Join {
1326                left,
1327                right,
1328                join_type,
1329                ..
1330            }) => match join_type {
1331                JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1332                JoinType::Left | JoinType::Right | JoinType::Full => {
1333                    match (left.max_rows()?, right.max_rows()?, join_type) {
1334                        (0, 0, _) => Some(0),
1335                        (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1336                        (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1337                        (left_max, right_max, _) => Some(left_max * right_max),
1338                    }
1339                }
1340                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1341                    left.max_rows()
1342                }
1343                JoinType::RightSemi | JoinType::RightAnti => right.max_rows(),
1344            },
1345            LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1346            LogicalPlan::Union(Union { inputs, .. }) => {
1347                inputs.iter().try_fold(0usize, |mut acc, plan| {
1348                    acc += plan.max_rows()?;
1349                    Some(acc)
1350                })
1351            }
1352            LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1353            LogicalPlan::EmptyRelation(_) => Some(0),
1354            LogicalPlan::RecursiveQuery(_) => None,
1355            LogicalPlan::Subquery(_) => None,
1356            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1357            LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1358                Ok(FetchType::Literal(s)) => s,
1359                _ => None,
1360            },
1361            LogicalPlan::Distinct(
1362                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1363            ) => input.max_rows(),
1364            LogicalPlan::Values(v) => Some(v.values.len()),
1365            LogicalPlan::Unnest(_) => None,
1366            LogicalPlan::Ddl(_)
1367            | LogicalPlan::Explain(_)
1368            | LogicalPlan::Analyze(_)
1369            | LogicalPlan::Dml(_)
1370            | LogicalPlan::Copy(_)
1371            | LogicalPlan::DescribeTable(_)
1372            | LogicalPlan::Statement(_)
1373            | LogicalPlan::Extension(_) => None,
1374        }
1375    }
1376
1377    /// If this node's expressions contains any references to an outer subquery
1378    pub fn contains_outer_reference(&self) -> bool {
1379        let mut contains = false;
1380        self.apply_expressions(|expr| {
1381            Ok(if expr.contains_outer() {
1382                contains = true;
1383                TreeNodeRecursion::Stop
1384            } else {
1385                TreeNodeRecursion::Continue
1386            })
1387        })
1388        .unwrap();
1389        contains
1390    }
1391
1392    /// Get the output expressions and their corresponding columns.
1393    ///
1394    /// The parent node may reference the output columns of the plan by expressions, such as
1395    /// projection over aggregate or window functions. This method helps to convert the
1396    /// referenced expressions into columns.
1397    ///
1398    /// See also: [`crate::utils::columnize_expr`]
1399    pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
1400        match self {
1401            LogicalPlan::Aggregate(aggregate) => Ok(aggregate
1402                .output_expressions()?
1403                .into_iter()
1404                .zip(self.schema().columns())
1405                .collect()),
1406            LogicalPlan::Window(Window {
1407                window_expr,
1408                input,
1409                schema,
1410            }) => {
1411                // The input could be another Window, so the result should also include the input's. For Example:
1412                // `EXPLAIN SELECT RANK() OVER (PARTITION BY a ORDER BY b), SUM(b) OVER (PARTITION BY a) FROM t`
1413                // Its plan is:
1414                // Projection: RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(t.b) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
1415                //   WindowAggr: windowExpr=[[SUM(CAST(t.b AS Int64)) PARTITION BY [t.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
1416                //     WindowAggr: windowExpr=[[RANK() PARTITION BY [t.a] ORDER BY [t.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]/
1417                //       TableScan: t projection=[a, b]
1418                let mut output_exprs = input.columnized_output_exprs()?;
1419                let input_len = input.schema().fields().len();
1420                output_exprs.extend(
1421                    window_expr
1422                        .iter()
1423                        .zip(schema.columns().into_iter().skip(input_len)),
1424                );
1425                Ok(output_exprs)
1426            }
1427            _ => Ok(vec![]),
1428        }
1429    }
1430}
1431
1432impl LogicalPlan {
1433    /// Return a `LogicalPlan` with all placeholders (e.g $1 $2,
1434    /// ...) replaced with corresponding values provided in
1435    /// `params_values`
1436    ///
1437    /// See [`Self::with_param_values`] for examples and usage with an owned
1438    /// `ParamValues`
1439    pub fn replace_params_with_values(
1440        self,
1441        param_values: &ParamValues,
1442    ) -> Result<LogicalPlan> {
1443        self.transform_up_with_subqueries(|plan| {
1444            let schema = Arc::clone(plan.schema());
1445            let name_preserver = NamePreserver::new(&plan);
1446            plan.map_expressions(|e| {
1447                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
1448                if !has_placeholder {
1449                    // Performance optimization:
1450                    // avoid NamePreserver copy and second pass over expression
1451                    // if no placeholders.
1452                    Ok(Transformed::no(e))
1453                } else {
1454                    let original_name = name_preserver.save(&e);
1455                    let transformed_expr = e.transform_up(|e| {
1456                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
1457                            let value = param_values.get_placeholders_with_values(&id)?;
1458                            Ok(Transformed::yes(Expr::Literal(value)))
1459                        } else {
1460                            Ok(Transformed::no(e))
1461                        }
1462                    })?;
1463                    // Preserve name to avoid breaking column references to this expression
1464                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
1465                }
1466            })
1467        })
1468        .map(|res| res.data)
1469    }
1470
1471    /// Walk the logical plan, find any `Placeholder` tokens, and return a set of their names.
1472    pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1473        let mut param_names = HashSet::new();
1474        self.apply_with_subqueries(|plan| {
1475            plan.apply_expressions(|expr| {
1476                expr.apply(|expr| {
1477                    if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1478                        param_names.insert(id.clone());
1479                    }
1480                    Ok(TreeNodeRecursion::Continue)
1481                })
1482            })
1483        })
1484        .map(|_| param_names)
1485    }
1486
1487    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
1488    pub fn get_parameter_types(
1489        &self,
1490    ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1491        let mut param_types: HashMap<String, Option<DataType>> = HashMap::new();
1492
1493        self.apply_with_subqueries(|plan| {
1494            plan.apply_expressions(|expr| {
1495                expr.apply(|expr| {
1496                    if let Expr::Placeholder(Placeholder { id, data_type }) = expr {
1497                        let prev = param_types.get(id);
1498                        match (prev, data_type) {
1499                            (Some(Some(prev)), Some(dt)) => {
1500                                if prev != dt {
1501                                    plan_err!("Conflicting types for {id}")?;
1502                                }
1503                            }
1504                            (_, Some(dt)) => {
1505                                param_types.insert(id.clone(), Some(dt.clone()));
1506                            }
1507                            _ => {
1508                                param_types.insert(id.clone(), None);
1509                            }
1510                        }
1511                    }
1512                    Ok(TreeNodeRecursion::Continue)
1513                })
1514            })
1515        })
1516        .map(|_| param_types)
1517    }
1518
1519    // ------------
1520    // Various implementations for printing out LogicalPlans
1521    // ------------
1522
1523    /// Return a `format`able structure that produces a single line
1524    /// per node.
1525    ///
1526    /// # Example
1527    ///
1528    /// ```text
1529    /// Projection: employee.id
1530    ///    Filter: employee.state Eq Utf8(\"CO\")\
1531    ///       CsvScan: employee projection=Some([0, 3])
1532    /// ```
1533    ///
1534    /// ```
1535    /// use arrow::datatypes::{Field, Schema, DataType};
1536    /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
1537    /// let schema = Schema::new(vec![
1538    ///     Field::new("id", DataType::Int32, false),
1539    /// ]);
1540    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1541    ///     .filter(col("id").eq(lit(5))).unwrap()
1542    ///     .build().unwrap();
1543    ///
1544    /// // Format using display_indent
1545    /// let display_string = format!("{}", plan.display_indent());
1546    ///
1547    /// assert_eq!("Filter: t1.id = Int32(5)\n  TableScan: t1",
1548    ///             display_string);
1549    /// ```
1550    pub fn display_indent(&self) -> impl Display + '_ {
1551        // Boilerplate structure to wrap LogicalPlan with something
1552        // that that can be formatted
1553        struct Wrapper<'a>(&'a LogicalPlan);
1554        impl Display for Wrapper<'_> {
1555            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1556                let with_schema = false;
1557                let mut visitor = IndentVisitor::new(f, with_schema);
1558                match self.0.visit_with_subqueries(&mut visitor) {
1559                    Ok(_) => Ok(()),
1560                    Err(_) => Err(fmt::Error),
1561                }
1562            }
1563        }
1564        Wrapper(self)
1565    }
1566
1567    /// Return a `format`able structure that produces a single line
1568    /// per node that includes the output schema. For example:
1569    ///
1570    /// ```text
1571    /// Projection: employee.id [id:Int32]\
1572    ///    Filter: employee.state = Utf8(\"CO\") [id:Int32, state:Utf8]\
1573    ///      TableScan: employee projection=[0, 3] [id:Int32, state:Utf8]";
1574    /// ```
1575    ///
1576    /// ```
1577    /// use arrow::datatypes::{Field, Schema, DataType};
1578    /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
1579    /// let schema = Schema::new(vec![
1580    ///     Field::new("id", DataType::Int32, false),
1581    /// ]);
1582    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1583    ///     .filter(col("id").eq(lit(5))).unwrap()
1584    ///     .build().unwrap();
1585    ///
1586    /// // Format using display_indent_schema
1587    /// let display_string = format!("{}", plan.display_indent_schema());
1588    ///
1589    /// assert_eq!("Filter: t1.id = Int32(5) [id:Int32]\
1590    ///             \n  TableScan: t1 [id:Int32]",
1591    ///             display_string);
1592    /// ```
1593    pub fn display_indent_schema(&self) -> impl Display + '_ {
1594        // Boilerplate structure to wrap LogicalPlan with something
1595        // that that can be formatted
1596        struct Wrapper<'a>(&'a LogicalPlan);
1597        impl Display for Wrapper<'_> {
1598            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1599                let with_schema = true;
1600                let mut visitor = IndentVisitor::new(f, with_schema);
1601                match self.0.visit_with_subqueries(&mut visitor) {
1602                    Ok(_) => Ok(()),
1603                    Err(_) => Err(fmt::Error),
1604                }
1605            }
1606        }
1607        Wrapper(self)
1608    }
1609
1610    /// Return a displayable structure that produces plan in postgresql JSON format.
1611    ///
1612    /// Users can use this format to visualize the plan in existing plan visualization tools, for example [dalibo](https://explain.dalibo.com/)
1613    pub fn display_pg_json(&self) -> impl Display + '_ {
1614        // Boilerplate structure to wrap LogicalPlan with something
1615        // that that can be formatted
1616        struct Wrapper<'a>(&'a LogicalPlan);
1617        impl Display for Wrapper<'_> {
1618            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1619                let mut visitor = PgJsonVisitor::new(f);
1620                visitor.with_schema(true);
1621                match self.0.visit_with_subqueries(&mut visitor) {
1622                    Ok(_) => Ok(()),
1623                    Err(_) => Err(fmt::Error),
1624                }
1625            }
1626        }
1627        Wrapper(self)
1628    }
1629
1630    /// Return a `format`able structure that produces lines meant for
1631    /// graphical display using the `DOT` language. This format can be
1632    /// visualized using software from
1633    /// [`graphviz`](https://graphviz.org/)
1634    ///
1635    /// This currently produces two graphs -- one with the basic
1636    /// structure, and one with additional details such as schema.
1637    ///
1638    /// ```
1639    /// use arrow::datatypes::{Field, Schema, DataType};
1640    /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
1641    /// let schema = Schema::new(vec![
1642    ///     Field::new("id", DataType::Int32, false),
1643    /// ]);
1644    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1645    ///     .filter(col("id").eq(lit(5))).unwrap()
1646    ///     .build().unwrap();
1647    ///
1648    /// // Format using display_graphviz
1649    /// let graphviz_string = format!("{}", plan.display_graphviz());
1650    /// ```
1651    ///
1652    /// If graphviz string is saved to a file such as `/tmp/example.dot`, the following
1653    /// commands can be used to render it as a pdf:
1654    ///
1655    /// ```bash
1656    ///   dot -Tpdf < /tmp/example.dot  > /tmp/example.pdf
1657    /// ```
1658    ///
1659    pub fn display_graphviz(&self) -> impl Display + '_ {
1660        // Boilerplate structure to wrap LogicalPlan with something
1661        // that that can be formatted
1662        struct Wrapper<'a>(&'a LogicalPlan);
1663        impl Display for Wrapper<'_> {
1664            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1665                let mut visitor = GraphvizVisitor::new(f);
1666
1667                visitor.start_graph()?;
1668
1669                visitor.pre_visit_plan("LogicalPlan")?;
1670                self.0
1671                    .visit_with_subqueries(&mut visitor)
1672                    .map_err(|_| fmt::Error)?;
1673                visitor.post_visit_plan()?;
1674
1675                visitor.set_with_schema(true);
1676                visitor.pre_visit_plan("Detailed LogicalPlan")?;
1677                self.0
1678                    .visit_with_subqueries(&mut visitor)
1679                    .map_err(|_| fmt::Error)?;
1680                visitor.post_visit_plan()?;
1681
1682                visitor.end_graph()?;
1683                Ok(())
1684            }
1685        }
1686        Wrapper(self)
1687    }
1688
1689    /// Return a `format`able structure with the a human readable
1690    /// description of this LogicalPlan node per node, not including
1691    /// children. For example:
1692    ///
1693    /// ```text
1694    /// Projection: id
1695    /// ```
1696    /// ```
1697    /// use arrow::datatypes::{Field, Schema, DataType};
1698    /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
1699    /// let schema = Schema::new(vec![
1700    ///     Field::new("id", DataType::Int32, false),
1701    /// ]);
1702    /// let plan = table_scan(Some("t1"), &schema, None).unwrap()
1703    ///     .build().unwrap();
1704    ///
1705    /// // Format using display
1706    /// let display_string = format!("{}", plan.display());
1707    ///
1708    /// assert_eq!("TableScan: t1", display_string);
1709    /// ```
1710    pub fn display(&self) -> impl Display + '_ {
1711        // Boilerplate structure to wrap LogicalPlan with something
1712        // that that can be formatted
1713        struct Wrapper<'a>(&'a LogicalPlan);
1714        impl Display for Wrapper<'_> {
1715            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1716                match self.0 {
1717                    LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
1718                    LogicalPlan::RecursiveQuery(RecursiveQuery {
1719                        is_distinct, ..
1720                    }) => {
1721                        write!(f, "RecursiveQuery: is_distinct={}", is_distinct)
1722                    }
1723                    LogicalPlan::Values(Values { ref values, .. }) => {
1724                        let str_values: Vec<_> = values
1725                            .iter()
1726                            // limit to only 5 values to avoid horrible display
1727                            .take(5)
1728                            .map(|row| {
1729                                let item = row
1730                                    .iter()
1731                                    .map(|expr| expr.to_string())
1732                                    .collect::<Vec<_>>()
1733                                    .join(", ");
1734                                format!("({item})")
1735                            })
1736                            .collect();
1737
1738                        let eclipse = if values.len() > 5 { "..." } else { "" };
1739                        write!(f, "Values: {}{}", str_values.join(", "), eclipse)
1740                    }
1741
1742                    LogicalPlan::TableScan(TableScan {
1743                        ref source,
1744                        ref table_name,
1745                        ref projection,
1746                        ref filters,
1747                        ref fetch,
1748                        ..
1749                    }) => {
1750                        let projected_fields = match projection {
1751                            Some(indices) => {
1752                                let schema = source.schema();
1753                                let names: Vec<&str> = indices
1754                                    .iter()
1755                                    .map(|i| schema.field(*i).name().as_str())
1756                                    .collect();
1757                                format!(" projection=[{}]", names.join(", "))
1758                            }
1759                            _ => "".to_string(),
1760                        };
1761
1762                        write!(f, "TableScan: {table_name}{projected_fields}")?;
1763
1764                        if !filters.is_empty() {
1765                            let mut full_filter = vec![];
1766                            let mut partial_filter = vec![];
1767                            let mut unsupported_filters = vec![];
1768                            let filters: Vec<&Expr> = filters.iter().collect();
1769
1770                            if let Ok(results) =
1771                                source.supports_filters_pushdown(&filters)
1772                            {
1773                                filters.iter().zip(results.iter()).for_each(
1774                                    |(x, res)| match res {
1775                                        TableProviderFilterPushDown::Exact => {
1776                                            full_filter.push(x)
1777                                        }
1778                                        TableProviderFilterPushDown::Inexact => {
1779                                            partial_filter.push(x)
1780                                        }
1781                                        TableProviderFilterPushDown::Unsupported => {
1782                                            unsupported_filters.push(x)
1783                                        }
1784                                    },
1785                                );
1786                            }
1787
1788                            if !full_filter.is_empty() {
1789                                write!(
1790                                    f,
1791                                    ", full_filters=[{}]",
1792                                    expr_vec_fmt!(full_filter)
1793                                )?;
1794                            };
1795                            if !partial_filter.is_empty() {
1796                                write!(
1797                                    f,
1798                                    ", partial_filters=[{}]",
1799                                    expr_vec_fmt!(partial_filter)
1800                                )?;
1801                            }
1802                            if !unsupported_filters.is_empty() {
1803                                write!(
1804                                    f,
1805                                    ", unsupported_filters=[{}]",
1806                                    expr_vec_fmt!(unsupported_filters)
1807                                )?;
1808                            }
1809                        }
1810
1811                        if let Some(n) = fetch {
1812                            write!(f, ", fetch={n}")?;
1813                        }
1814
1815                        Ok(())
1816                    }
1817                    LogicalPlan::Projection(Projection { ref expr, .. }) => {
1818                        write!(f, "Projection: ")?;
1819                        for (i, expr_item) in expr.iter().enumerate() {
1820                            if i > 0 {
1821                                write!(f, ", ")?;
1822                            }
1823                            write!(f, "{expr_item}")?;
1824                        }
1825                        Ok(())
1826                    }
1827                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
1828                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
1829                    }
1830                    LogicalPlan::Copy(CopyTo {
1831                        input: _,
1832                        output_url,
1833                        file_type,
1834                        options,
1835                        ..
1836                    }) => {
1837                        let op_str = options
1838                            .iter()
1839                            .map(|(k, v)| format!("{k} {v}"))
1840                            .collect::<Vec<String>>()
1841                            .join(", ");
1842
1843                        write!(f, "CopyTo: format={} output_url={output_url} options: ({op_str})", file_type.get_ext())
1844                    }
1845                    LogicalPlan::Ddl(ddl) => {
1846                        write!(f, "{}", ddl.display())
1847                    }
1848                    LogicalPlan::Filter(Filter {
1849                        predicate: ref expr,
1850                        ..
1851                    }) => write!(f, "Filter: {expr}"),
1852                    LogicalPlan::Window(Window {
1853                        ref window_expr, ..
1854                    }) => {
1855                        write!(
1856                            f,
1857                            "WindowAggr: windowExpr=[[{}]]",
1858                            expr_vec_fmt!(window_expr)
1859                        )
1860                    }
1861                    LogicalPlan::Aggregate(Aggregate {
1862                        ref group_expr,
1863                        ref aggr_expr,
1864                        ..
1865                    }) => write!(
1866                        f,
1867                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
1868                        expr_vec_fmt!(group_expr),
1869                        expr_vec_fmt!(aggr_expr)
1870                    ),
1871                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
1872                        write!(f, "Sort: ")?;
1873                        for (i, expr_item) in expr.iter().enumerate() {
1874                            if i > 0 {
1875                                write!(f, ", ")?;
1876                            }
1877                            write!(f, "{expr_item}")?;
1878                        }
1879                        if let Some(a) = fetch {
1880                            write!(f, ", fetch={a}")?;
1881                        }
1882
1883                        Ok(())
1884                    }
1885                    LogicalPlan::Join(Join {
1886                        on: ref keys,
1887                        filter,
1888                        join_constraint,
1889                        join_type,
1890                        ..
1891                    }) => {
1892                        let join_expr: Vec<String> =
1893                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
1894                        let filter_expr = filter
1895                            .as_ref()
1896                            .map(|expr| format!(" Filter: {expr}"))
1897                            .unwrap_or_else(|| "".to_string());
1898                        let join_type = if filter.is_none() && keys.is_empty() && matches!(join_type, JoinType::Inner) {
1899                            "Cross".to_string()
1900                        } else {
1901                            join_type.to_string()
1902                        };
1903                        match join_constraint {
1904                            JoinConstraint::On => {
1905                                write!(
1906                                    f,
1907                                    "{} Join: {}{}",
1908                                    join_type,
1909                                    join_expr.join(", "),
1910                                    filter_expr
1911                                )
1912                            }
1913                            JoinConstraint::Using => {
1914                                write!(
1915                                    f,
1916                                    "{} Join: Using {}{}",
1917                                    join_type,
1918                                    join_expr.join(", "),
1919                                    filter_expr,
1920                                )
1921                            }
1922                        }
1923                    }
1924                    LogicalPlan::Repartition(Repartition {
1925                        partitioning_scheme,
1926                        ..
1927                    }) => match partitioning_scheme {
1928                        Partitioning::RoundRobinBatch(n) => {
1929                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
1930                        }
1931                        Partitioning::Hash(expr, n) => {
1932                            let hash_expr: Vec<String> =
1933                                expr.iter().map(|e| format!("{e}")).collect();
1934                            write!(
1935                                f,
1936                                "Repartition: Hash({}) partition_count={}",
1937                                hash_expr.join(", "),
1938                                n
1939                            )
1940                        }
1941                        Partitioning::DistributeBy(expr) => {
1942                            let dist_by_expr: Vec<String> =
1943                                expr.iter().map(|e| format!("{e}")).collect();
1944                            write!(
1945                                f,
1946                                "Repartition: DistributeBy({})",
1947                                dist_by_expr.join(", "),
1948                            )
1949                        }
1950                    },
1951                    LogicalPlan::Limit(limit) => {
1952                        // Attempt to display `skip` and `fetch` as literals if possible, otherwise as expressions.
1953                        let skip_str = match limit.get_skip_type() {
1954                            Ok(SkipType::Literal(n)) => n.to_string(),
1955                            _ => limit.skip.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string()),
1956                        };
1957                        let fetch_str = match limit.get_fetch_type() {
1958                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
1959                            Ok(FetchType::Literal(None)) => "None".to_string(),
1960                            _ => limit.fetch.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string())
1961                        };
1962                        write!(
1963                            f,
1964                            "Limit: skip={}, fetch={}", skip_str,fetch_str,
1965                        )
1966                    }
1967                    LogicalPlan::Subquery(Subquery { .. }) => {
1968                        write!(f, "Subquery:")
1969                    }
1970                    LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
1971                        write!(f, "SubqueryAlias: {alias}")
1972                    }
1973                    LogicalPlan::Statement(statement) => {
1974                        write!(f, "{}", statement.display())
1975                    }
1976                    LogicalPlan::Distinct(distinct) => match distinct {
1977                        Distinct::All(_) => write!(f, "Distinct:"),
1978                        Distinct::On(DistinctOn {
1979                            on_expr,
1980                            select_expr,
1981                            sort_expr,
1982                            ..
1983                        }) => write!(
1984                            f,
1985                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
1986                            expr_vec_fmt!(on_expr),
1987                            expr_vec_fmt!(select_expr),
1988                            if let Some(sort_expr) = sort_expr { expr_vec_fmt!(sort_expr) } else { "".to_string() },
1989                        ),
1990                    },
1991                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
1992                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
1993                    LogicalPlan::Union(_) => write!(f, "Union"),
1994                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
1995                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
1996                        write!(f, "DescribeTable")
1997                    }
1998                    LogicalPlan::Unnest(Unnest {
1999                        input: plan,
2000                        list_type_columns: list_col_indices,
2001                        struct_type_columns: struct_col_indices, .. }) => {
2002                        let input_columns = plan.schema().columns();
2003                        let list_type_columns = list_col_indices
2004                            .iter()
2005                            .map(|(i,unnest_info)|
2006                                format!("{}|depth={}", &input_columns[*i].to_string(),
2007                                unnest_info.depth))
2008                            .collect::<Vec<String>>();
2009                        let struct_type_columns = struct_col_indices
2010                            .iter()
2011                            .map(|i| &input_columns[*i])
2012                            .collect::<Vec<&Column>>();
2013                        // get items from input_columns indexed by list_col_indices
2014                        write!(f, "Unnest: lists[{}] structs[{}]",
2015                        expr_vec_fmt!(list_type_columns),
2016                        expr_vec_fmt!(struct_type_columns))
2017                    }
2018                }
2019            }
2020        }
2021        Wrapper(self)
2022    }
2023}
2024
2025impl Display for LogicalPlan {
2026    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2027        self.display_indent().fmt(f)
2028    }
2029}
2030
2031impl ToStringifiedPlan for LogicalPlan {
2032    fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2033        StringifiedPlan::new(plan_type, self.display_indent().to_string())
2034    }
2035}
2036
2037/// Produces no rows: An empty relation with an empty schema
2038#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2039pub struct EmptyRelation {
2040    /// Whether to produce a placeholder row
2041    pub produce_one_row: bool,
2042    /// The schema description of the output
2043    pub schema: DFSchemaRef,
2044}
2045
2046// Manual implementation needed because of `schema` field. Comparison excludes this field.
2047impl PartialOrd for EmptyRelation {
2048    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2049        self.produce_one_row.partial_cmp(&other.produce_one_row)
2050    }
2051}
2052
/// A variadic query operation, Recursive CTE.
///
/// # Recursive Query Evaluation
///
/// From the [Postgres Docs]:
///
/// 1. Evaluate the non-recursive term. For `UNION` (but not `UNION ALL`),
///    discard duplicate rows. Include all remaining rows in the result of the
///    recursive query, and also place them in a temporary working table.
///
/// 2. So long as the working table is not empty, repeat these steps:
///
///    * Evaluate the recursive term, substituting the current contents of the
///      working table for the recursive self-reference. For `UNION` (but not
///      `UNION ALL`), discard duplicate rows and rows that duplicate any
///      previous result row. Include all remaining rows in the result of the
///      recursive query, and also place them in a temporary intermediate
///      table.
///
///    * Replace the contents of the working table with the contents of the
///      intermediate table, then empty the intermediate table.
///
/// [Postgres Docs]: https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-RECURSIVE
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// Name of the query
    pub name: String,
    /// The static (non-recursive) term: the initial contents of the working
    /// table
    pub static_term: Arc<LogicalPlan>,
    /// The recursive term (evaluated on the contents of the working table until
    /// it returns an empty set)
    pub recursive_term: Arc<LogicalPlan>,
    /// Should the output of the recursive term be deduplicated (`UNION`) or
    /// not (`UNION ALL`).
    pub is_distinct: bool,
}
2088
2089/// Values expression. See
2090/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
2091/// documentation for more details.
2092#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2093pub struct Values {
2094    /// The table schema
2095    pub schema: DFSchemaRef,
2096    /// Values
2097    pub values: Vec<Vec<Expr>>,
2098}
2099
2100// Manual implementation needed because of `schema` field. Comparison excludes this field.
2101impl PartialOrd for Values {
2102    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2103        self.values.partial_cmp(&other.values)
2104    }
2105}
2106
/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
///
/// Equality and hashing are derived and cover every field; the hand-written
/// `PartialOrd` implementation ignores `schema`.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct Projection {
    /// The list of expressions
    pub expr: Vec<Expr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2120
2121// Manual implementation needed because of `schema` field. Comparison excludes this field.
2122impl PartialOrd for Projection {
2123    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2124        match self.expr.partial_cmp(&other.expr) {
2125            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2126            cmp => cmp,
2127        }
2128    }
2129}
2130
2131impl Projection {
2132    /// Create a new Projection
2133    pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2134        let projection_schema = projection_schema(&input, &expr)?;
2135        Self::try_new_with_schema(expr, input, projection_schema)
2136    }
2137
2138    /// Create a new Projection using the specified output schema
2139    pub fn try_new_with_schema(
2140        expr: Vec<Expr>,
2141        input: Arc<LogicalPlan>,
2142        schema: DFSchemaRef,
2143    ) -> Result<Self> {
2144        #[expect(deprecated)]
2145        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
2146            && expr.len() != schema.fields().len()
2147        {
2148            return plan_err!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len());
2149        }
2150        Ok(Self {
2151            expr,
2152            input,
2153            schema,
2154        })
2155    }
2156
2157    /// Create a new Projection using the specified output schema
2158    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
2159        let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
2160        Self {
2161            expr,
2162            input,
2163            schema,
2164        }
2165    }
2166}
2167
2168/// Computes the schema of the result produced by applying a projection to the input logical plan.
2169///
2170/// # Arguments
2171///
2172/// * `input`: A reference to the input `LogicalPlan` for which the projection schema
2173///   will be computed.
2174/// * `exprs`: A slice of `Expr` expressions representing the projection operation to apply.
2175///
2176/// # Returns
2177///
2178/// A `Result` containing an `Arc<DFSchema>` representing the schema of the result
2179/// produced by the projection operation. If the schema computation is successful,
2180/// the `Result` will contain the schema; otherwise, it will contain an error.
2181pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2182    let metadata = input.schema().metadata().clone();
2183
2184    let schema =
2185        DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2186            .with_functional_dependencies(calc_func_dependencies_for_project(
2187                exprs, input,
2188            )?)?;
2189
2190    Ok(Arc::new(schema))
2191}
2192
/// Aliased subquery
///
/// Equality and hashing are derived and cover every field; the hand-written
/// `PartialOrd` implementation ignores `schema`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct SubqueryAlias {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The alias for the input relation
    pub alias: TableReference,
    /// The schema with qualified field names
    pub schema: DFSchemaRef,
}
2205
2206impl SubqueryAlias {
2207    pub fn try_new(
2208        plan: Arc<LogicalPlan>,
2209        alias: impl Into<TableReference>,
2210    ) -> Result<Self> {
2211        let alias = alias.into();
2212        let fields = change_redundant_column(plan.schema().fields());
2213        let meta_data = plan.schema().as_ref().metadata().clone();
2214        let schema: Schema =
2215            DFSchema::from_unqualified_fields(fields.into(), meta_data)?.into();
2216        // Since schema is the same, other than qualifier, we can use existing
2217        // functional dependencies:
2218        let func_dependencies = plan.schema().functional_dependencies().clone();
2219        let schema = DFSchemaRef::new(
2220            DFSchema::try_from_qualified_schema(alias.clone(), &schema)?
2221                .with_functional_dependencies(func_dependencies)?,
2222        );
2223        Ok(SubqueryAlias {
2224            input: plan,
2225            alias,
2226            schema,
2227        })
2228    }
2229}
2230
2231// Manual implementation needed because of `schema` field. Comparison excludes this field.
2232impl PartialOrd for SubqueryAlias {
2233    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2234        match self.input.partial_cmp(&other.input) {
2235            Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2236            cmp => cmp,
2237        }
2238    }
2239}
2240
/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
/// expression).
///
/// Semantically, `<predicate>` is evaluated for each row of the input;
/// If the value of `<predicate>` is true, the input row is passed to
/// the output. If the value of `<predicate>` is false, the row is
/// discarded.
///
/// A `Filter` should not be constructed directly; use `Filter::try_new()`
/// (or `Filter::try_new_with_having()`) instead. The fields are `pub` only
/// to support pattern matching.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
    /// The predicate expression, which must have Boolean type.
    pub predicate: Expr,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The flag to indicate if the filter is a having clause
    pub having: bool,
}
2262
2263impl Filter {
2264    /// Create a new filter operator.
2265    ///
2266    /// Notes: as Aliases have no effect on the output of a filter operator,
2267    /// they are removed from the predicate expression.
2268    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2269        Self::try_new_internal(predicate, input, false)
2270    }
2271
2272    /// Create a new filter operator for a having clause.
2273    /// This is similar to a filter, but its having flag is set to true.
2274    pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2275        Self::try_new_internal(predicate, input, true)
2276    }
2277
2278    fn is_allowed_filter_type(data_type: &DataType) -> bool {
2279        match data_type {
2280            // Interpret NULL as a missing boolean value.
2281            DataType::Boolean | DataType::Null => true,
2282            DataType::Dictionary(_, value_type) => {
2283                Filter::is_allowed_filter_type(value_type.as_ref())
2284            }
2285            _ => false,
2286        }
2287    }
2288
2289    fn try_new_internal(
2290        predicate: Expr,
2291        input: Arc<LogicalPlan>,
2292        having: bool,
2293    ) -> Result<Self> {
2294        // Filter predicates must return a boolean value so we try and validate that here.
2295        // Note that it is not always possible to resolve the predicate expression during plan
2296        // construction (such as with correlated subqueries) so we make a best effort here and
2297        // ignore errors resolving the expression against the schema.
2298        if let Ok(predicate_type) = predicate.get_type(input.schema()) {
2299            if !Filter::is_allowed_filter_type(&predicate_type) {
2300                return plan_err!(
2301                    "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
2302                );
2303            }
2304        }
2305
2306        Ok(Self {
2307            predicate: predicate.unalias_nested().data,
2308            input,
2309            having,
2310        })
2311    }
2312
2313    /// Is this filter guaranteed to return 0 or 1 row in a given instantiation?
2314    ///
2315    /// This function will return `true` if its predicate contains a conjunction of
2316    /// `col(a) = <expr>`, where its schema has a unique filter that is covered
2317    /// by this conjunction.
2318    ///
2319    /// For example, for the table:
2320    /// ```sql
2321    /// CREATE TABLE t (a INTEGER PRIMARY KEY, b INTEGER);
2322    /// ```
2323    /// `Filter(a = 2).is_scalar() == true`
2324    /// , whereas
2325    /// `Filter(b = 2).is_scalar() == false`
2326    /// and
2327    /// `Filter(a = 2 OR b = 2).is_scalar() == false`
2328    fn is_scalar(&self) -> bool {
2329        let schema = self.input.schema();
2330
2331        let functional_dependencies = self.input.schema().functional_dependencies();
2332        let unique_keys = functional_dependencies.iter().filter(|dep| {
2333            let nullable = dep.nullable
2334                && dep
2335                    .source_indices
2336                    .iter()
2337                    .any(|&source| schema.field(source).is_nullable());
2338            !nullable
2339                && dep.mode == Dependency::Single
2340                && dep.target_indices.len() == schema.fields().len()
2341        });
2342
2343        let exprs = split_conjunction(&self.predicate);
2344        let eq_pred_cols: HashSet<_> = exprs
2345            .iter()
2346            .filter_map(|expr| {
2347                let Expr::BinaryExpr(BinaryExpr {
2348                    left,
2349                    op: Operator::Eq,
2350                    right,
2351                }) = expr
2352                else {
2353                    return None;
2354                };
2355                // This is a no-op filter expression
2356                if left == right {
2357                    return None;
2358                }
2359
2360                match (left.as_ref(), right.as_ref()) {
2361                    (Expr::Column(_), Expr::Column(_)) => None,
2362                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
2363                        Some(schema.index_of_column(c).unwrap())
2364                    }
2365                    _ => None,
2366                }
2367            })
2368            .collect();
2369
2370        // If we have a functional dependence that is a subset of our predicate,
2371        // this filter is scalar
2372        for key in unique_keys {
2373            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
2374                return true;
2375            }
2376        }
2377        false
2378    }
2379}
2380
/// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
///
/// # Output Schema
///
/// The output schema is the input schema followed by the window function
/// expressions, in order.
///
/// For example, given the input schema `"A", "B", "C"` and the window function
/// `SUM(A) OVER (PARTITION BY B+1 ORDER BY C)`, the output schema will be `"A",
/// "B", "C", "SUM(A) OVER ..."` where `"SUM(A) OVER ..."` is the name of the
/// output column.
///
/// Note that the `PARTITION BY` expression "B+1" is not produced in the output
/// schema.
///
/// Equality and hashing are derived and cover every field; the hand-written
/// `PartialOrd` implementation ignores `schema`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The window function expressions
    pub window_expr: Vec<Expr>,
    /// The schema description of the window output
    pub schema: DFSchemaRef,
}
2404
2405impl Window {
2406    /// Create a new window operator.
2407    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2408        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
2409            .schema()
2410            .iter()
2411            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
2412            .collect();
2413        let input_len = fields.len();
2414        let mut window_fields = fields;
2415        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
2416        window_fields.extend_from_slice(expr_fields.as_slice());
2417        let metadata = input.schema().metadata().clone();
2418
2419        // Update functional dependencies for window:
2420        let mut window_func_dependencies =
2421            input.schema().functional_dependencies().clone();
2422        window_func_dependencies.extend_target_indices(window_fields.len());
2423
2424        // Since we know that ROW_NUMBER outputs will be unique (i.e. it consists
2425        // of consecutive numbers per partition), we can represent this fact with
2426        // functional dependencies.
2427        let mut new_dependencies = window_expr
2428            .iter()
2429            .enumerate()
2430            .filter_map(|(idx, expr)| {
2431                if let Expr::WindowFunction(WindowFunction {
2432                    fun: WindowFunctionDefinition::WindowUDF(udwf),
2433                    params: WindowFunctionParams { partition_by, .. },
2434                }) = expr
2435                {
2436                    // When there is no PARTITION BY, row number will be unique
2437                    // across the entire table.
2438                    if udwf.name() == "row_number" && partition_by.is_empty() {
2439                        return Some(idx + input_len);
2440                    }
2441                }
2442                None
2443            })
2444            .map(|idx| {
2445                FunctionalDependence::new(vec![idx], vec![], false)
2446                    .with_mode(Dependency::Single)
2447            })
2448            .collect::<Vec<_>>();
2449
2450        if !new_dependencies.is_empty() {
2451            for dependence in new_dependencies.iter_mut() {
2452                dependence.target_indices = (0..window_fields.len()).collect();
2453            }
2454            // Add the dependency introduced because of ROW_NUMBER window function to the functional dependency
2455            let new_deps = FunctionalDependencies::new(new_dependencies);
2456            window_func_dependencies.extend(new_deps);
2457        }
2458
2459        Self::try_new_with_schema(
2460            window_expr,
2461            input,
2462            Arc::new(
2463                DFSchema::new_with_metadata(window_fields, metadata)?
2464                    .with_functional_dependencies(window_func_dependencies)?,
2465            ),
2466        )
2467    }
2468
2469    pub fn try_new_with_schema(
2470        window_expr: Vec<Expr>,
2471        input: Arc<LogicalPlan>,
2472        schema: DFSchemaRef,
2473    ) -> Result<Self> {
2474        if window_expr.len() != schema.fields().len() - input.schema().fields().len() {
2475            return plan_err!(
2476                "Window has mismatch between number of expressions ({}) and number of fields in schema ({})",
2477                window_expr.len(),
2478                schema.fields().len() - input.schema().fields().len()
2479            );
2480        }
2481
2482        Ok(Window {
2483            input,
2484            window_expr,
2485            schema,
2486        })
2487    }
2488}
2489
2490// Manual implementation needed because of `schema` field. Comparison excludes this field.
2491impl PartialOrd for Window {
2492    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2493        match self.input.partial_cmp(&other.input) {
2494            Some(Ordering::Equal) => self.window_expr.partial_cmp(&other.window_expr),
2495            cmp => cmp,
2496        }
2497    }
2498}
2499
/// Produces rows from a table provider by reference or from the context
///
/// `Debug`, `PartialEq`, `PartialOrd` and `Hash` are implemented by hand
/// because `source` is a trait object; none of them inspects `source`.
#[derive(Clone)]
pub struct TableScan {
    /// The name of the table
    pub table_name: TableReference,
    /// The source of the table
    pub source: Arc<dyn TableSource>,
    /// Optional column indices to use as a projection
    pub projection: Option<Vec<usize>>,
    /// The schema description of the output
    pub projected_schema: DFSchemaRef,
    /// Optional expressions to be used as filters by the table provider
    pub filters: Vec<Expr>,
    /// Optional number of rows to read
    pub fetch: Option<usize>,
}
2516
2517impl Debug for TableScan {
2518    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2519        f.debug_struct("TableScan")
2520            .field("table_name", &self.table_name)
2521            .field("source", &"...")
2522            .field("projection", &self.projection)
2523            .field("projected_schema", &self.projected_schema)
2524            .field("filters", &self.filters)
2525            .field("fetch", &self.fetch)
2526            .finish_non_exhaustive()
2527    }
2528}
2529
2530impl PartialEq for TableScan {
2531    fn eq(&self, other: &Self) -> bool {
2532        self.table_name == other.table_name
2533            && self.projection == other.projection
2534            && self.projected_schema == other.projected_schema
2535            && self.filters == other.filters
2536            && self.fetch == other.fetch
2537    }
2538}
2539
2540impl Eq for TableScan {}
2541
2542// Manual implementation needed because of `source` and `projected_schema` fields.
2543// Comparison excludes these field.
2544impl PartialOrd for TableScan {
2545    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2546        #[derive(PartialEq, PartialOrd)]
2547        struct ComparableTableScan<'a> {
2548            /// The name of the table
2549            pub table_name: &'a TableReference,
2550            /// Optional column indices to use as a projection
2551            pub projection: &'a Option<Vec<usize>>,
2552            /// Optional expressions to be used as filters by the table provider
2553            pub filters: &'a Vec<Expr>,
2554            /// Optional number of rows to read
2555            pub fetch: &'a Option<usize>,
2556        }
2557        let comparable_self = ComparableTableScan {
2558            table_name: &self.table_name,
2559            projection: &self.projection,
2560            filters: &self.filters,
2561            fetch: &self.fetch,
2562        };
2563        let comparable_other = ComparableTableScan {
2564            table_name: &other.table_name,
2565            projection: &other.projection,
2566            filters: &other.filters,
2567            fetch: &other.fetch,
2568        };
2569        comparable_self.partial_cmp(&comparable_other)
2570    }
2571}
2572
2573impl Hash for TableScan {
2574    fn hash<H: Hasher>(&self, state: &mut H) {
2575        self.table_name.hash(state);
2576        self.projection.hash(state);
2577        self.projected_schema.hash(state);
2578        self.filters.hash(state);
2579        self.fetch.hash(state);
2580    }
2581}
2582
2583impl TableScan {
2584    /// Initialize TableScan with appropriate schema from the given
2585    /// arguments.
2586    pub fn try_new(
2587        table_name: impl Into<TableReference>,
2588        table_source: Arc<dyn TableSource>,
2589        projection: Option<Vec<usize>>,
2590        filters: Vec<Expr>,
2591        fetch: Option<usize>,
2592    ) -> Result<Self> {
2593        let table_name = table_name.into();
2594
2595        if table_name.table().is_empty() {
2596            return plan_err!("table_name cannot be empty");
2597        }
2598        let schema = table_source.schema();
2599        let func_dependencies = FunctionalDependencies::new_from_constraints(
2600            table_source.constraints(),
2601            schema.fields.len(),
2602        );
2603        let projected_schema = projection
2604            .as_ref()
2605            .map(|p| {
2606                let projected_func_dependencies =
2607                    func_dependencies.project_functional_dependencies(p, p.len());
2608
2609                let df_schema = DFSchema::new_with_metadata(
2610                    p.iter()
2611                        .map(|i| {
2612                            (Some(table_name.clone()), Arc::new(schema.field(*i).clone()))
2613                        })
2614                        .collect(),
2615                    schema.metadata.clone(),
2616                )?;
2617                df_schema.with_functional_dependencies(projected_func_dependencies)
2618            })
2619            .unwrap_or_else(|| {
2620                let df_schema =
2621                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
2622                df_schema.with_functional_dependencies(func_dependencies)
2623            })?;
2624        let projected_schema = Arc::new(projected_schema);
2625
2626        Ok(Self {
2627            table_name,
2628            source: table_source,
2629            projection,
2630            projected_schema,
2631            filters,
2632            fetch,
2633        })
2634    }
2635}
2636
/// Repartition the plan based on a partitioning scheme.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The partitioning scheme
    pub partitioning_scheme: Partitioning,
}
2645
/// Union multiple inputs
///
/// Equality and hashing are derived and cover both fields; the hand-written
/// `PartialOrd` implementation compares `inputs` only.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// Inputs to merge
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// Union schema. Should be the same for all inputs.
    pub schema: DFSchemaRef,
}
2654
2655impl Union {
2656    /// Constructs new Union instance deriving schema from inputs.
2657    fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2658        let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
2659        Ok(Union { inputs, schema })
2660    }
2661
2662    /// Constructs new Union instance deriving schema from inputs.
2663    /// Inputs do not have to have matching types and produced schema will
2664    /// take type from the first input.
2665    // TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all.
2666    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2667        let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
2668        Ok(Union { inputs, schema })
2669    }
2670
2671    /// Constructs a new Union instance that combines rows from different tables by name,
2672    /// instead of by position. This means that the specified inputs need not have schemas
2673    /// that are all the same width.
2674    pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2675        let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
2676        let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;
2677
2678        Ok(Union { inputs, schema })
2679    }
2680
2681    /// When constructing a `UNION BY NAME`, we may need to wrap inputs
2682    /// in an additional `Projection` to account for absence of columns
2683    /// in input schemas.
2684    fn rewrite_inputs_from_schema(
2685        schema: &DFSchema,
2686        inputs: Vec<Arc<LogicalPlan>>,
2687    ) -> Result<Vec<Arc<LogicalPlan>>> {
2688        let schema_width = schema.iter().count();
2689        let mut wrapped_inputs = Vec::with_capacity(inputs.len());
2690        for input in inputs {
2691            // If the input plan's schema contains the same number of fields
2692            // as the derived schema, then it does not to be wrapped in an
2693            // additional `Projection`.
2694            if input.schema().iter().count() == schema_width {
2695                wrapped_inputs.push(input);
2696                continue;
2697            }
2698
2699            // Any columns that exist within the derived schema but do not exist
2700            // within an input's schema should be replaced with `NULL` aliased
2701            // to the appropriate column in the derived schema.
2702            let mut expr = Vec::with_capacity(schema_width);
2703            for column in schema.columns() {
2704                if input
2705                    .schema()
2706                    .has_column_with_unqualified_name(column.name())
2707                {
2708                    expr.push(Expr::Column(column));
2709                } else {
2710                    expr.push(Expr::Literal(ScalarValue::Null).alias(column.name()));
2711                }
2712            }
2713            wrapped_inputs.push(Arc::new(LogicalPlan::Projection(Projection::try_new(
2714                expr, input,
2715            )?)));
2716        }
2717
2718        Ok(wrapped_inputs)
2719    }
2720
2721    /// Constructs new Union instance deriving schema from inputs.
2722    ///
2723    /// If `loose_types` is true, inputs do not need to have matching types and
2724    /// the produced schema will use the type from the first input.
2725    /// TODO (<https://github.com/apache/datafusion/issues/14380>): This is not necessarily reasonable behavior.
2726    ///
2727    /// If `by_name` is `true`, input schemas need not be the same width. That is,
2728    /// the constructed schema follows `UNION BY NAME` semantics.
2729    fn derive_schema_from_inputs(
2730        inputs: &[Arc<LogicalPlan>],
2731        loose_types: bool,
2732        by_name: bool,
2733    ) -> Result<DFSchemaRef> {
2734        if inputs.len() < 2 {
2735            return plan_err!("UNION requires at least two inputs");
2736        }
2737
2738        if by_name {
2739            Self::derive_schema_from_inputs_by_name(inputs, loose_types)
2740        } else {
2741            Self::derive_schema_from_inputs_by_position(inputs, loose_types)
2742        }
2743    }
2744
2745    fn derive_schema_from_inputs_by_name(
2746        inputs: &[Arc<LogicalPlan>],
2747        loose_types: bool,
2748    ) -> Result<DFSchemaRef> {
2749        type FieldData<'a> = (&'a DataType, bool, Vec<&'a HashMap<String, String>>);
2750        // Prefer `BTreeMap` as it produces items in order by key when iterated over
2751        let mut cols: BTreeMap<&str, FieldData> = BTreeMap::new();
2752        for input in inputs.iter() {
2753            for field in input.schema().fields() {
2754                match cols.entry(field.name()) {
2755                    std::collections::btree_map::Entry::Occupied(mut occupied) => {
2756                        let (data_type, is_nullable, metadata) = occupied.get_mut();
2757                        if !loose_types && *data_type != field.data_type() {
2758                            return plan_err!(
2759                                "Found different types for field {}",
2760                                field.name()
2761                            );
2762                        }
2763
2764                        metadata.push(field.metadata());
2765                        // If the field is nullable in any one of the inputs,
2766                        // then the field in the final schema is also nullable.
2767                        *is_nullable |= field.is_nullable();
2768                    }
2769                    std::collections::btree_map::Entry::Vacant(vacant) => {
2770                        vacant.insert((
2771                            field.data_type(),
2772                            field.is_nullable(),
2773                            vec![field.metadata()],
2774                        ));
2775                    }
2776                }
2777            }
2778        }
2779
2780        let union_fields = cols
2781            .into_iter()
2782            .map(|(name, (data_type, is_nullable, unmerged_metadata))| {
2783                let mut field = Field::new(name, data_type.clone(), is_nullable);
2784                field.set_metadata(intersect_maps(unmerged_metadata));
2785
2786                (None, Arc::new(field))
2787            })
2788            .collect::<Vec<(Option<TableReference>, _)>>();
2789
2790        let union_schema_metadata =
2791            intersect_maps(inputs.iter().map(|input| input.schema().metadata()));
2792
2793        // Functional Dependencies are not preserved after UNION operation
2794        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2795        let schema = Arc::new(schema);
2796
2797        Ok(schema)
2798    }
2799
2800    fn derive_schema_from_inputs_by_position(
2801        inputs: &[Arc<LogicalPlan>],
2802        loose_types: bool,
2803    ) -> Result<DFSchemaRef> {
2804        let first_schema = inputs[0].schema();
2805        let fields_count = first_schema.fields().len();
2806        for input in inputs.iter().skip(1) {
2807            if fields_count != input.schema().fields().len() {
2808                return plan_err!(
2809                    "UNION queries have different number of columns: \
2810                    left has {} columns whereas right has {} columns",
2811                    fields_count,
2812                    input.schema().fields().len()
2813                );
2814            }
2815        }
2816
2817        let union_fields = (0..fields_count)
2818            .map(|i| {
2819                let fields = inputs
2820                    .iter()
2821                    .map(|input| input.schema().field(i))
2822                    .collect::<Vec<_>>();
2823                let first_field = fields[0];
2824                let name = first_field.name();
2825                let data_type = if loose_types {
2826                    // TODO apply type coercion here, or document why it's better to defer
2827                    // temporarily use the data type from the left input and later rely on the analyzer to
2828                    // coerce the two schemas into a common one.
2829                    first_field.data_type()
2830                } else {
2831                    fields.iter().skip(1).try_fold(
2832                        first_field.data_type(),
2833                        |acc, field| {
2834                            if acc != field.data_type() {
2835                                return plan_err!(
2836                                    "UNION field {i} have different type in inputs: \
2837                                    left has {} whereas right has {}",
2838                                    first_field.data_type(),
2839                                    field.data_type()
2840                                );
2841                            }
2842                            Ok(acc)
2843                        },
2844                    )?
2845                };
2846                let nullable = fields.iter().any(|field| field.is_nullable());
2847                let mut field = Field::new(name, data_type.clone(), nullable);
2848                let field_metadata =
2849                    intersect_maps(fields.iter().map(|field| field.metadata()));
2850                field.set_metadata(field_metadata);
2851                // TODO reusing table reference from the first schema is probably wrong
2852                let table_reference = first_schema.qualified_field(i).0.cloned();
2853                Ok((table_reference, Arc::new(field)))
2854            })
2855            .collect::<Result<_>>()?;
2856        let union_schema_metadata =
2857            intersect_maps(inputs.iter().map(|input| input.schema().metadata()));
2858
2859        // Functional Dependencies are not preserved after UNION operation
2860        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2861        let schema = Arc::new(schema);
2862
2863        Ok(schema)
2864    }
2865}
2866
/// Returns the key/value pairs that are present, with identical values, in
/// every map yielded by `inputs`. An empty `inputs` yields an empty map.
fn intersect_maps<'a>(
    inputs: impl IntoIterator<Item = &'a HashMap<String, String>>,
) -> HashMap<String, String> {
    let mut remaining = inputs.into_iter();
    // Start from a copy of the first map and narrow it with each later map.
    let seed: HashMap<String, String> = match remaining.next() {
        Some(first) => first.clone(),
        None => HashMap::new(),
    };
    remaining.fold(seed, |mut common, other| {
        // The extra dereference below (`&*value`) is a workaround for https://github.com/rkyv/rkyv/issues/434.
        // When this crate is used in a workspace that enables the `rkyv-64` feature in the `chrono` crate,
        // this triggers a Rust compilation error:
        // error[E0277]: can't compare `Option<&std::string::String>` with `Option<&mut std::string::String>`.
        common.retain(|key, value| other.get(key) == Some(&*value));
        common
    })
}
2881
2882// Manual implementation needed because of `schema` field. Comparison excludes this field.
2883impl PartialOrd for Union {
2884    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2885        self.inputs.partial_cmp(&other.inputs)
2886    }
2887}
2888
/// Describes the schema of a table.
///
/// # Example output:
///
/// ```sql
/// > describe traces;
/// +--------------------+-----------------------------+-------------+
/// | column_name        | data_type                   | is_nullable |
/// +--------------------+-----------------------------+-------------+
/// | attributes         | Utf8                        | YES         |
/// | duration_nano      | Int64                       | YES         |
/// | end_time_unix_nano | Int64                       | YES         |
/// | service.name       | Dictionary(Int32, Utf8)     | YES         |
/// | span.kind          | Utf8                        | YES         |
/// | span.name          | Utf8                        | YES         |
/// | span_id            | Dictionary(Int32, Utf8)     | YES         |
/// | time               | Timestamp(Nanosecond, None) | NO          |
/// | trace_id           | Dictionary(Int32, Utf8)     | YES         |
/// | otel.status_code   | Utf8                        | YES         |
/// | parent_span_id     | Utf8                        | YES         |
/// +--------------------+-----------------------------+-------------+
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// Schema of the table being described
    pub schema: Arc<Schema>,
    /// Schema of the `DESCRIBE` output relation
    pub output_schema: DFSchemaRef,
}
2918
// Manual implementation of `PartialOrd` that always returns `None`, since `DescribeTable`
// holds only schemas and those have no meaningful ordering. Providing the impl allows
// `LogicalPlan` to derive `PartialOrd`.
impl PartialOrd for DescribeTable {
    /// Always `None`: two `DescribeTable` values are never ordered relative to each other.
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        // There is no relevant comparison for schemas
        None
    }
}
2927
/// Produces a relation with string representations of
/// various parts of the plan
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Should extra (detailed, intermediate plans) be included?
    pub verbose: bool,
    /// The logical plan that is being EXPLAIN'd
    pub plan: Arc<LogicalPlan>,
    /// String renderings of the plan at the various stages it has gone through
    pub stringified_plans: Vec<StringifiedPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
    /// Used by the physical planner to check whether it should proceed with planning
    pub logical_optimization_succeeded: bool,
}
2943
2944// Manual implementation needed because of `schema` field. Comparison excludes this field.
2945impl PartialOrd for Explain {
2946    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2947        #[derive(PartialEq, PartialOrd)]
2948        struct ComparableExplain<'a> {
2949            /// Should extra (detailed, intermediate plans) be included?
2950            pub verbose: &'a bool,
2951            /// The logical plan that is being EXPLAIN'd
2952            pub plan: &'a Arc<LogicalPlan>,
2953            /// Represent the various stages plans have gone through
2954            pub stringified_plans: &'a Vec<StringifiedPlan>,
2955            /// Used by physical planner to check if should proceed with planning
2956            pub logical_optimization_succeeded: &'a bool,
2957        }
2958        let comparable_self = ComparableExplain {
2959            verbose: &self.verbose,
2960            plan: &self.plan,
2961            stringified_plans: &self.stringified_plans,
2962            logical_optimization_succeeded: &self.logical_optimization_succeeded,
2963        };
2964        let comparable_other = ComparableExplain {
2965            verbose: &other.verbose,
2966            plan: &other.plan,
2967            stringified_plans: &other.stringified_plans,
2968            logical_optimization_succeeded: &other.logical_optimization_succeeded,
2969        };
2970        comparable_self.partial_cmp(&comparable_other)
2971    }
2972}
2973
/// Runs the actual plan, and then prints the physical plan
/// with execution metrics.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Should extra detail be included?
    pub verbose: bool,
    /// The logical plan that is being EXPLAIN ANALYZE'd
    pub input: Arc<LogicalPlan>,
    /// The output schema of the explain (2 columns of text)
    pub schema: DFSchemaRef,
}
2985
2986// Manual implementation needed because of `schema` field. Comparison excludes this field.
2987impl PartialOrd for Analyze {
2988    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2989        match self.verbose.partial_cmp(&other.verbose) {
2990            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2991            cmp => cmp,
2992        }
2993    }
2994}
2995
/// Extension operator defined outside of DataFusion.
///
/// Wraps a user-provided [`UserDefinedLogicalNode`] so that it can take part in a plan.
// TODO(clippy): This clippy `allow` should be removed if
// the manual `PartialEq` is removed in favor of a derive.
// (see `PartialEq` the impl for details.)
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The runtime extension operator
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3006
// `PartialEq` cannot be derived for types containing `Arc<dyn Trait>`.
// This manual implementation should be removed if
// https://github.com/rust-lang/rust/issues/39128 is fixed.
impl PartialEq for Extension {
    /// Two extensions are equal when their wrapped nodes compare equal.
    fn eq(&self, other: &Self) -> bool {
        // Keep the explicit method call rather than `==`; see the issue linked above.
        self.node.eq(&other.node)
    }
}
3015
// Written by hand alongside the manual `PartialEq` above; ordering is delegated to the
// wrapped node.
impl PartialOrd for Extension {
    /// Compares the wrapped nodes using their own `partial_cmp`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.node.partial_cmp(&other.node)
    }
}
3021
/// Produces the first `n` tuples from its input and discards the rest.
///
/// `skip` and `fetch` are stored as expressions; use [`Limit::get_skip_type`] and
/// [`Limit::get_fetch_type`] to obtain literal values when they are available.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Number of rows to skip before fetch,
    /// `None` means no rows are skipped
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows to fetch,
    /// None means fetching all rows
    pub fetch: Option<Box<Expr>>,
    /// The logical plan
    pub input: Arc<LogicalPlan>,
}
3033
/// Different types of skip expression in Limit plan.
///
/// Returned by [`Limit::get_skip_type`].
pub enum SkipType {
    /// The skip expression is a literal value.
    /// A missing or `NULL` skip expression is reported as `Literal(0)`.
    Literal(usize),
    /// The skip expression is not an `Int64` literal.
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3041
/// Different types of fetch expression in Limit plan.
///
/// Returned by [`Limit::get_fetch_type`].
pub enum FetchType {
    /// The fetch expression is a literal value.
    /// `Literal(None)` means the fetch expression is not provided
    /// (or is a `NULL` `Int64` literal).
    Literal(Option<usize>),
    /// The fetch expression is not an `Int64` literal.
    /// Currently only supports expressions that can be folded into constants.
    UnsupportedExpr,
}
3050
3051impl Limit {
3052    /// Get the skip type from the limit plan.
3053    pub fn get_skip_type(&self) -> Result<SkipType> {
3054        match self.skip.as_deref() {
3055            Some(expr) => match *expr {
3056                Expr::Literal(ScalarValue::Int64(s)) => {
3057                    // `skip = NULL` is equivalent to `skip = 0`
3058                    let s = s.unwrap_or(0);
3059                    if s >= 0 {
3060                        Ok(SkipType::Literal(s as usize))
3061                    } else {
3062                        plan_err!("OFFSET must be >=0, '{}' was provided", s)
3063                    }
3064                }
3065                _ => Ok(SkipType::UnsupportedExpr),
3066            },
3067            // `skip = None` is equivalent to `skip = 0`
3068            None => Ok(SkipType::Literal(0)),
3069        }
3070    }
3071
3072    /// Get the fetch type from the limit plan.
3073    pub fn get_fetch_type(&self) -> Result<FetchType> {
3074        match self.fetch.as_deref() {
3075            Some(expr) => match *expr {
3076                Expr::Literal(ScalarValue::Int64(Some(s))) => {
3077                    if s >= 0 {
3078                        Ok(FetchType::Literal(Some(s as usize)))
3079                    } else {
3080                        plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3081                    }
3082                }
3083                Expr::Literal(ScalarValue::Int64(None)) => Ok(FetchType::Literal(None)),
3084                _ => Ok(FetchType::UnsupportedExpr),
3085            },
3086            None => Ok(FetchType::Literal(None)),
3087        }
3088    }
3089}
3090
/// Removes duplicate rows from the input
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain `DISTINCT` referencing all selection expressions
    All(Arc<LogicalPlan>),
    /// The `Postgres` addition (`DISTINCT ON`), allowing separate control over DISTINCT'd
    /// and selected columns
    On(DistinctOn),
}
3099
3100impl Distinct {
3101    /// return a reference to the nodes input
3102    pub fn input(&self) -> &Arc<LogicalPlan> {
3103        match self {
3104            Distinct::All(input) => input,
3105            Distinct::On(DistinctOn { input, .. }) => input,
3106        }
3107    }
3108}
3109
/// Removes duplicate rows from the input, keyed on the `DISTINCT ON` expressions.
///
/// Prefer [`DistinctOn::try_new`] to construct this node: it normalizes the expressions,
/// validates the sort expressions and builds the output schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// The `DISTINCT ON` clause expression list
    pub on_expr: Vec<Expr>,
    /// The selected projection expression list
    pub select_expr: Vec<Expr>,
    /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
    /// present. Note that those matching expressions actually wrap the `ON` expressions with
    /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The logical plan that is being DISTINCT'd
    pub input: Arc<LogicalPlan>,
    /// The schema description of the DISTINCT ON output
    pub schema: DFSchemaRef,
}
3126
3127impl DistinctOn {
3128    /// Create a new `DistinctOn` struct.
3129    pub fn try_new(
3130        on_expr: Vec<Expr>,
3131        select_expr: Vec<Expr>,
3132        sort_expr: Option<Vec<SortExpr>>,
3133        input: Arc<LogicalPlan>,
3134    ) -> Result<Self> {
3135        if on_expr.is_empty() {
3136            return plan_err!("No `ON` expressions provided");
3137        }
3138
3139        let on_expr = normalize_cols(on_expr, input.as_ref())?;
3140        let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3141            .into_iter()
3142            .collect();
3143
3144        let dfschema = DFSchema::new_with_metadata(
3145            qualified_fields,
3146            input.schema().metadata().clone(),
3147        )?;
3148
3149        let mut distinct_on = DistinctOn {
3150            on_expr,
3151            select_expr,
3152            sort_expr: None,
3153            input,
3154            schema: Arc::new(dfschema),
3155        };
3156
3157        if let Some(sort_expr) = sort_expr {
3158            distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3159        }
3160
3161        Ok(distinct_on)
3162    }
3163
3164    /// Try to update `self` with a new sort expressions.
3165    ///
3166    /// Validates that the sort expressions are a super-set of the `ON` expressions.
3167    pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3168        let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3169
3170        // Check that the left-most sort expressions are the same as the `ON` expressions.
3171        let mut matched = true;
3172        for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3173            if on != &sort.expr {
3174                matched = false;
3175                break;
3176            }
3177        }
3178
3179        if self.on_expr.len() > sort_expr.len() || !matched {
3180            return plan_err!(
3181                "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3182            );
3183        }
3184
3185        self.sort_expr = Some(sort_expr);
3186        Ok(self)
3187    }
3188}
3189
3190// Manual implementation needed because of `schema` field. Comparison excludes this field.
3191impl PartialOrd for DistinctOn {
3192    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3193        #[derive(PartialEq, PartialOrd)]
3194        struct ComparableDistinctOn<'a> {
3195            /// The `DISTINCT ON` clause expression list
3196            pub on_expr: &'a Vec<Expr>,
3197            /// The selected projection expression list
3198            pub select_expr: &'a Vec<Expr>,
3199            /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
3200            /// present. Note that those matching expressions actually wrap the `ON` expressions with
3201            /// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
3202            pub sort_expr: &'a Option<Vec<SortExpr>>,
3203            /// The logical plan that is being DISTINCT'd
3204            pub input: &'a Arc<LogicalPlan>,
3205        }
3206        let comparable_self = ComparableDistinctOn {
3207            on_expr: &self.on_expr,
3208            select_expr: &self.select_expr,
3209            sort_expr: &self.sort_expr,
3210            input: &self.input,
3211        };
3212        let comparable_other = ComparableDistinctOn {
3213            on_expr: &other.on_expr,
3214            select_expr: &other.select_expr,
3215            sort_expr: &other.sort_expr,
3216            input: &other.input,
3217        };
3218        comparable_self.partial_cmp(&comparable_other)
3219    }
3220}
3221
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
///
/// # Output Schema
///
/// The output schema is the group expressions followed by the aggregate
/// expressions in order.
///
/// For example, given the input schema `"A", "B", "C"` and the aggregate
/// `SUM(A) GROUP BY C+B`, the output schema will be `"C+B", "SUM(A)"` where
/// "C+B" and "SUM(A)" are the names of the output columns. Note that "C+B" is a
/// single new column.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct Aggregate {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions
    pub aggr_expr: Vec<Expr>,
    /// The schema description of the aggregate output
    pub schema: DFSchemaRef,
}
3247
3248impl Aggregate {
3249    /// Create a new aggregate operator.
3250    pub fn try_new(
3251        input: Arc<LogicalPlan>,
3252        group_expr: Vec<Expr>,
3253        aggr_expr: Vec<Expr>,
3254    ) -> Result<Self> {
3255        let group_expr = enumerate_grouping_sets(group_expr)?;
3256
3257        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
3258
3259        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;
3260
3261        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;
3262
3263        // Even columns that cannot be null will become nullable when used in a grouping set.
3264        if is_grouping_set {
3265            qualified_fields = qualified_fields
3266                .into_iter()
3267                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
3268                .collect::<Vec<_>>();
3269            qualified_fields.push((
3270                None,
3271                Field::new(
3272                    Self::INTERNAL_GROUPING_ID,
3273                    Self::grouping_id_type(qualified_fields.len()),
3274                    false,
3275                )
3276                .into(),
3277            ));
3278        }
3279
3280        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);
3281
3282        let schema = DFSchema::new_with_metadata(
3283            qualified_fields,
3284            input.schema().metadata().clone(),
3285        )?;
3286
3287        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
3288    }
3289
3290    /// Create a new aggregate operator using the provided schema to avoid the overhead of
3291    /// building the schema again when the schema is already known.
3292    ///
3293    /// This method should only be called when you are absolutely sure that the schema being
3294    /// provided is correct for the aggregate. If in doubt, call [try_new](Self::try_new) instead.
3295    pub fn try_new_with_schema(
3296        input: Arc<LogicalPlan>,
3297        group_expr: Vec<Expr>,
3298        aggr_expr: Vec<Expr>,
3299        schema: DFSchemaRef,
3300    ) -> Result<Self> {
3301        if group_expr.is_empty() && aggr_expr.is_empty() {
3302            return plan_err!(
3303                "Aggregate requires at least one grouping or aggregate expression"
3304            );
3305        }
3306        let group_expr_count = grouping_set_expr_count(&group_expr)?;
3307        if schema.fields().len() != group_expr_count + aggr_expr.len() {
3308            return plan_err!(
3309                "Aggregate schema has wrong number of fields. Expected {} got {}",
3310                group_expr_count + aggr_expr.len(),
3311                schema.fields().len()
3312            );
3313        }
3314
3315        let aggregate_func_dependencies =
3316            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
3317        let new_schema = schema.as_ref().clone();
3318        let schema = Arc::new(
3319            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
3320        );
3321        Ok(Self {
3322            input,
3323            group_expr,
3324            aggr_expr,
3325            schema,
3326        })
3327    }
3328
3329    fn is_grouping_set(&self) -> bool {
3330        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
3331    }
3332
3333    /// Get the output expressions.
3334    fn output_expressions(&self) -> Result<Vec<&Expr>> {
3335        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
3336            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
3337        });
3338        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
3339        if self.is_grouping_set() {
3340            exprs.push(&INTERNAL_ID_EXPR);
3341        }
3342        exprs.extend(self.aggr_expr.iter());
3343        debug_assert!(exprs.len() == self.schema.fields().len());
3344        Ok(exprs)
3345    }
3346
3347    /// Get the length of the group by expression in the output schema
3348    /// This is not simply group by expression length. Expression may be
3349    /// GroupingSet, etc. In these case we need to get inner expression lengths.
3350    pub fn group_expr_len(&self) -> Result<usize> {
3351        grouping_set_expr_count(&self.group_expr)
3352    }
3353
3354    /// Returns the data type of the grouping id.
3355    /// The grouping ID value is a bitmask where each set bit
3356    /// indicates that the corresponding grouping expression is
3357    /// null
3358    pub fn grouping_id_type(group_exprs: usize) -> DataType {
3359        if group_exprs <= 8 {
3360            DataType::UInt8
3361        } else if group_exprs <= 16 {
3362            DataType::UInt16
3363        } else if group_exprs <= 32 {
3364            DataType::UInt32
3365        } else {
3366            DataType::UInt64
3367        }
3368    }
3369
3370    /// Internal column used when the aggregation is a grouping set.
3371    ///
3372    /// This column contains a bitmask where each bit represents a grouping
3373    /// expression. The least significant bit corresponds to the rightmost
3374    /// grouping expression. A bit value of 0 indicates that the corresponding
3375    /// column is included in the grouping set, while a value of 1 means it is excluded.
3376    ///
3377    /// For example, for the grouping expressions CUBE(a, b), the grouping ID
3378    /// column will have the following values:
3379    ///     0b00: Both `a` and `b` are included
3380    ///     0b01: `b` is excluded
3381    ///     0b10: `a` is excluded
3382    ///     0b11: Both `a` and `b` are excluded
3383    ///
3384    /// This internal column is necessary because excluded columns are replaced
3385    /// with `NULL` values. To handle these cases correctly, we must distinguish
3386    /// between an actual `NULL` value in a column and a column being excluded from the set.
3387    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
3388}
3389
3390// Manual implementation needed because of `schema` field. Comparison excludes this field.
3391impl PartialOrd for Aggregate {
3392    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3393        match self.input.partial_cmp(&other.input) {
3394            Some(Ordering::Equal) => {
3395                match self.group_expr.partial_cmp(&other.group_expr) {
3396                    Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3397                    cmp => cmp,
3398                }
3399            }
3400            cmp => cmp,
3401        }
3402    }
3403}
3404
3405/// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
3406fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3407    group_expr
3408        .iter()
3409        .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3410}
3411
3412/// Calculates functional dependencies for aggregate expressions.
3413fn calc_func_dependencies_for_aggregate(
3414    // Expressions in the GROUP BY clause:
3415    group_expr: &[Expr],
3416    // Input plan of the aggregate:
3417    input: &LogicalPlan,
3418    // Aggregate schema
3419    aggr_schema: &DFSchema,
3420) -> Result<FunctionalDependencies> {
3421    // We can do a case analysis on how to propagate functional dependencies based on
3422    // whether the GROUP BY in question contains a grouping set expression:
3423    // - If so, the functional dependencies will be empty because we cannot guarantee
3424    //   that GROUP BY expression results will be unique.
3425    // - Otherwise, it may be possible to propagate functional dependencies.
3426    if !contains_grouping_set(group_expr) {
3427        let group_by_expr_names = group_expr
3428            .iter()
3429            .map(|item| item.schema_name().to_string())
3430            .collect::<IndexSet<_>>()
3431            .into_iter()
3432            .collect::<Vec<_>>();
3433        let aggregate_func_dependencies = aggregate_functional_dependencies(
3434            input.schema(),
3435            &group_by_expr_names,
3436            aggr_schema,
3437        );
3438        Ok(aggregate_func_dependencies)
3439    } else {
3440        Ok(FunctionalDependencies::empty())
3441    }
3442}
3443
3444/// This function projects functional dependencies of the `input` plan according
3445/// to projection expressions `exprs`.
3446fn calc_func_dependencies_for_project(
3447    exprs: &[Expr],
3448    input: &LogicalPlan,
3449) -> Result<FunctionalDependencies> {
3450    let input_fields = input.schema().field_names();
3451    // Calculate expression indices (if present) in the input schema.
3452    let proj_indices = exprs
3453        .iter()
3454        .map(|expr| match expr {
3455            #[expect(deprecated)]
3456            Expr::Wildcard { qualifier, options } => {
3457                let wildcard_fields = exprlist_to_fields(
3458                    vec![&Expr::Wildcard {
3459                        qualifier: qualifier.clone(),
3460                        options: options.clone(),
3461                    }],
3462                    input,
3463                )?;
3464                Ok::<_, DataFusionError>(
3465                    wildcard_fields
3466                        .into_iter()
3467                        .filter_map(|(qualifier, f)| {
3468                            let flat_name = qualifier
3469                                .map(|t| format!("{}.{}", t, f.name()))
3470                                .unwrap_or_else(|| f.name().clone());
3471                            input_fields.iter().position(|item| *item == flat_name)
3472                        })
3473                        .collect::<Vec<_>>(),
3474                )
3475            }
3476            Expr::Alias(alias) => {
3477                let name = format!("{}", alias.expr);
3478                Ok(input_fields
3479                    .iter()
3480                    .position(|item| *item == name)
3481                    .map(|i| vec![i])
3482                    .unwrap_or(vec![]))
3483            }
3484            _ => {
3485                let name = format!("{}", expr);
3486                Ok(input_fields
3487                    .iter()
3488                    .position(|item| *item == name)
3489                    .map(|i| vec![i])
3490                    .unwrap_or(vec![]))
3491            }
3492        })
3493        .collect::<Result<Vec<_>>>()?
3494        .into_iter()
3495        .flatten()
3496        .collect::<Vec<_>>();
3497
3498    let len = exprlist_len(exprs, input.schema(), Some(find_base_plan(input).schema()))?;
3499    Ok(input
3500        .schema()
3501        .functional_dependencies()
3502        .project_functional_dependencies(&proj_indices, len))
3503}
3504
/// Sorts its input according to a list of sort expressions.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// The sort expressions
    pub expr: Vec<SortExpr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Optional fetch limit
    // NOTE(review): presumably the maximum number of sorted rows to produce — confirm.
    pub fetch: Option<usize>,
}
3515
/// Join two logical plans on one or more join columns
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left input
    pub left: Arc<LogicalPlan>,
    /// Right input
    pub right: Arc<LogicalPlan>,
    /// Equijoin clause expressed as pairs of (left, right) join expressions
    pub on: Vec<(Expr, Expr)>,
    /// Filters applied during join (non-equi conditions)
    pub filter: Option<Expr>,
    /// Join type
    pub join_type: JoinType,
    /// Join constraint
    pub join_constraint: JoinConstraint,
    /// The output schema, containing fields from the left and right inputs
    pub schema: DFSchemaRef,
    /// If `null_equals_null` is true, `null == null`; otherwise `null != null`
    pub null_equals_null: bool,
}
3536
3537impl Join {
3538    /// Create Join with input which wrapped with projection, this method is used to help create physical join.
3539    pub fn try_new_with_project_input(
3540        original: &LogicalPlan,
3541        left: Arc<LogicalPlan>,
3542        right: Arc<LogicalPlan>,
3543        column_on: (Vec<Column>, Vec<Column>),
3544    ) -> Result<Self> {
3545        let original_join = match original {
3546            LogicalPlan::Join(join) => join,
3547            _ => return plan_err!("Could not create join with project input"),
3548        };
3549
3550        let on: Vec<(Expr, Expr)> = column_on
3551            .0
3552            .into_iter()
3553            .zip(column_on.1)
3554            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3555            .collect();
3556        let join_schema =
3557            build_join_schema(left.schema(), right.schema(), &original_join.join_type)?;
3558
3559        Ok(Join {
3560            left,
3561            right,
3562            on,
3563            filter: original_join.filter.clone(),
3564            join_type: original_join.join_type,
3565            join_constraint: original_join.join_constraint,
3566            schema: Arc::new(join_schema),
3567            null_equals_null: original_join.null_equals_null,
3568        })
3569    }
3570}
3571
3572// Manual implementation needed because of `schema` field. Comparison excludes this field.
3573impl PartialOrd for Join {
3574    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3575        #[derive(PartialEq, PartialOrd)]
3576        struct ComparableJoin<'a> {
3577            /// Left input
3578            pub left: &'a Arc<LogicalPlan>,
3579            /// Right input
3580            pub right: &'a Arc<LogicalPlan>,
3581            /// Equijoin clause expressed as pairs of (left, right) join expressions
3582            pub on: &'a Vec<(Expr, Expr)>,
3583            /// Filters applied during join (non-equi conditions)
3584            pub filter: &'a Option<Expr>,
3585            /// Join type
3586            pub join_type: &'a JoinType,
3587            /// Join constraint
3588            pub join_constraint: &'a JoinConstraint,
3589            /// If null_equals_null is true, null == null else null != null
3590            pub null_equals_null: &'a bool,
3591        }
3592        let comparable_self = ComparableJoin {
3593            left: &self.left,
3594            right: &self.right,
3595            on: &self.on,
3596            filter: &self.filter,
3597            join_type: &self.join_type,
3598            join_constraint: &self.join_constraint,
3599            null_equals_null: &self.null_equals_null,
3600        };
3601        let comparable_other = ComparableJoin {
3602            left: &other.left,
3603            right: &other.right,
3604            on: &other.on,
3605            filter: &other.filter,
3606            join_type: &other.join_type,
3607            join_constraint: &other.join_constraint,
3608            null_equals_null: &other.null_equals_null,
3609        };
3610        comparable_self.partial_cmp(&comparable_other)
3611    }
3612}
3613
/// A subquery: a nested logical plan together with the outer-query references it uses.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The subquery
    pub subquery: Arc<LogicalPlan>,
    /// The outer references used in the subquery
    pub outer_ref_columns: Vec<Expr>,
}
3622
impl Normalizeable for Subquery {
    /// Always `false`: subqueries report that they cannot be normalized.
    fn can_normalize(&self) -> bool {
        false
    }
}
3628
3629impl NormalizeEq for Subquery {
3630    fn normalize_eq(&self, other: &Self) -> bool {
3631        // TODO: may be implement NormalizeEq for LogicalPlan?
3632        *self.subquery == *other.subquery
3633            && self.outer_ref_columns.len() == other.outer_ref_columns.len()
3634            && self
3635                .outer_ref_columns
3636                .iter()
3637                .zip(other.outer_ref_columns.iter())
3638                .all(|(a, b)| a.normalize_eq(b))
3639    }
3640}
3641
3642impl Subquery {
3643    pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
3644        match plan {
3645            Expr::ScalarSubquery(it) => Ok(it),
3646            Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
3647            _ => plan_err!("Could not coerce into ScalarSubquery!"),
3648        }
3649    }
3650
3651    pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
3652        Subquery {
3653            subquery: plan,
3654            outer_ref_columns: self.outer_ref_columns.clone(),
3655        }
3656    }
3657}
3658
3659impl Debug for Subquery {
3660    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3661        write!(f, "<subquery>")
3662    }
3663}
3664
/// Logical partitioning schemes supported by [`LogicalPlan::Repartition`]
///
/// See [`Partitioning`] for more details on partitioning
///
/// [`Partitioning`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html#
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Allocate batches using a round-robin algorithm and the specified number of partitions
    RoundRobinBatch(usize),
    /// Allocate rows based on a hash of one or more expressions and the specified number
    /// of partitions.
    Hash(Vec<Expr>, usize),
    /// The DISTRIBUTE BY clause is used to repartition the data based on the input expressions
    DistributeBy(Vec<Expr>),
}
3680
/// Represent the unnesting operation on a list column, such as the recursion depth and
/// the output column name after unnesting
///
/// Example: given `ColumnUnnestList { output_column: "output_name", depth: 2 }`
///
/// ```text
///   input             output_name
///  ┌─────────┐      ┌─────────┐
///  │{{1,2}}  │      │ 1       │
///  ├─────────┼─────►├─────────┤
///  │{{3}}    │      │ 2       │
///  ├─────────┤      ├─────────┤
///  │{{4},{5}}│      │ 3       │
///  └─────────┘      ├─────────┤
///                   │ 4       │
///                   ├─────────┤
///                   │ 5       │
///                   └─────────┘
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// The output column after unnesting
    pub output_column: Column,
    /// The recursion depth of the unnesting (`2` in the example above)
    pub depth: usize,
}
3705
3706impl Display for ColumnUnnestList {
3707    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3708        write!(f, "{}|depth={}", self.output_column, self.depth)
3709    }
3710}
3711
/// Unnest a column that contains a nested list type. See
/// [`UnnestOptions`] for more details.
///
/// Note that `PartialOrd` is implemented by hand for this type and leaves the
/// `schema` field out of the comparison.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// Columns to run unnest on, can be a list of (List/Struct) columns
    pub exec_columns: Vec<Column>,
    /// Indices (in the input schema) of the list-typed columns to unnest,
    /// each paired with the [`ColumnUnnestList`] describing that unnesting
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// Indices (in the input schema) of the struct-typed columns to unnest
    pub struct_type_columns: Vec<usize>,
    /// Having items aligned with the output columns
    /// representing which column in the input schema each output column depends on
    pub dependency_indices: Vec<usize>,
    /// The output schema, containing the unnested field column.
    pub schema: DFSchemaRef,
    /// Options
    pub options: UnnestOptions,
}
3734
3735// Manual implementation needed because of `schema` field. Comparison excludes this field.
3736impl PartialOrd for Unnest {
3737    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3738        #[derive(PartialEq, PartialOrd)]
3739        struct ComparableUnnest<'a> {
3740            /// The incoming logical plan
3741            pub input: &'a Arc<LogicalPlan>,
3742            /// Columns to run unnest on, can be a list of (List/Struct) columns
3743            pub exec_columns: &'a Vec<Column>,
3744            /// refer to the indices(in the input schema) of columns
3745            /// that have type list to run unnest on
3746            pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
3747            /// refer to the indices (in the input schema) of columns
3748            /// that have type struct to run unnest on
3749            pub struct_type_columns: &'a Vec<usize>,
3750            /// Having items aligned with the output columns
3751            /// representing which column in the input schema each output column depends on
3752            pub dependency_indices: &'a Vec<usize>,
3753            /// Options
3754            pub options: &'a UnnestOptions,
3755        }
3756        let comparable_self = ComparableUnnest {
3757            input: &self.input,
3758            exec_columns: &self.exec_columns,
3759            list_type_columns: &self.list_type_columns,
3760            struct_type_columns: &self.struct_type_columns,
3761            dependency_indices: &self.dependency_indices,
3762            options: &self.options,
3763        };
3764        let comparable_other = ComparableUnnest {
3765            input: &other.input,
3766            exec_columns: &other.exec_columns,
3767            list_type_columns: &other.list_type_columns,
3768            struct_type_columns: &other.struct_type_columns,
3769            dependency_indices: &other.dependency_indices,
3770            options: &other.options,
3771        };
3772        comparable_self.partial_cmp(&comparable_other)
3773    }
3774}
3775
3776#[cfg(test)]
3777mod tests {
3778
3779    use super::*;
3780    use crate::builder::LogicalTableSource;
3781    use crate::logical_plan::table_scan;
3782    use crate::{
3783        col, exists, in_subquery, lit, placeholder, scalar_subquery, GroupingSet,
3784    };
3785
3786    use datafusion_common::tree_node::{
3787        TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
3788    };
3789    use datafusion_common::{not_impl_err, Constraint, ScalarValue};
3790
3791    use crate::test::function_stub::count;
3792
3793    fn employee_schema() -> Schema {
3794        Schema::new(vec![
3795            Field::new("id", DataType::Int32, false),
3796            Field::new("first_name", DataType::Utf8, false),
3797            Field::new("last_name", DataType::Utf8, false),
3798            Field::new("state", DataType::Utf8, false),
3799            Field::new("salary", DataType::Int32, false),
3800        ])
3801    }
3802
3803    fn display_plan() -> Result<LogicalPlan> {
3804        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
3805            .build()?;
3806
3807        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
3808            .filter(in_subquery(col("state"), Arc::new(plan1)))?
3809            .project(vec![col("id")])?
3810            .build()
3811    }
3812
3813    #[test]
3814    fn test_display_indent() -> Result<()> {
3815        let plan = display_plan()?;
3816
3817        let expected = "Projection: employee_csv.id\
3818        \n  Filter: employee_csv.state IN (<subquery>)\
3819        \n    Subquery:\
3820        \n      TableScan: employee_csv projection=[state]\
3821        \n    TableScan: employee_csv projection=[id, state]";
3822
3823        assert_eq!(expected, format!("{}", plan.display_indent()));
3824        Ok(())
3825    }
3826
3827    #[test]
3828    fn test_display_indent_schema() -> Result<()> {
3829        let plan = display_plan()?;
3830
3831        let expected = "Projection: employee_csv.id [id:Int32]\
3832        \n  Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]\
3833        \n    Subquery: [state:Utf8]\
3834        \n      TableScan: employee_csv projection=[state] [state:Utf8]\
3835        \n    TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]";
3836
3837        assert_eq!(expected, format!("{}", plan.display_indent_schema()));
3838        Ok(())
3839    }
3840
3841    #[test]
3842    fn test_display_subquery_alias() -> Result<()> {
3843        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
3844            .build()?;
3845        let plan1 = Arc::new(plan1);
3846
3847        let plan =
3848            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
3849                .project(vec![col("id"), exists(plan1).alias("exists")])?
3850                .build();
3851
3852        let expected = "Projection: employee_csv.id, EXISTS (<subquery>) AS exists\
3853        \n  Subquery:\
3854        \n    TableScan: employee_csv projection=[state]\
3855        \n  TableScan: employee_csv projection=[id, state]";
3856
3857        assert_eq!(expected, format!("{}", plan?.display_indent()));
3858        Ok(())
3859    }
3860
3861    #[test]
3862    fn test_display_graphviz() -> Result<()> {
3863        let plan = display_plan()?;
3864
3865        let expected_graphviz = r#"
3866// Begin DataFusion GraphViz Plan,
3867// display it online here: https://dreampuf.github.io/GraphvizOnline
3868
3869digraph {
3870  subgraph cluster_1
3871  {
3872    graph[label="LogicalPlan"]
3873    2[shape=box label="Projection: employee_csv.id"]
3874    3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
3875    2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
3876    4[shape=box label="Subquery:"]
3877    3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
3878    5[shape=box label="TableScan: employee_csv projection=[state]"]
3879    4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
3880    6[shape=box label="TableScan: employee_csv projection=[id, state]"]
3881    3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
3882  }
3883  subgraph cluster_7
3884  {
3885    graph[label="Detailed LogicalPlan"]
3886    8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
3887    9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
3888    8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
3889    10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
3890    9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
3891    11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
3892    10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
3893    12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
3894    9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
3895  }
3896}
3897// End DataFusion GraphViz Plan
3898"#;
3899
3900        // just test for a few key lines in the output rather than the
3901        // whole thing to make test maintenance easier.
3902        let graphviz = format!("{}", plan.display_graphviz());
3903
3904        assert_eq!(expected_graphviz, graphviz);
3905        Ok(())
3906    }
3907
3908    #[test]
3909    fn test_display_pg_json() -> Result<()> {
3910        let plan = display_plan()?;
3911
3912        let expected_pg_json = r#"[
3913  {
3914    "Plan": {
3915      "Expressions": [
3916        "employee_csv.id"
3917      ],
3918      "Node Type": "Projection",
3919      "Output": [
3920        "id"
3921      ],
3922      "Plans": [
3923        {
3924          "Condition": "employee_csv.state IN (<subquery>)",
3925          "Node Type": "Filter",
3926          "Output": [
3927            "id",
3928            "state"
3929          ],
3930          "Plans": [
3931            {
3932              "Node Type": "Subquery",
3933              "Output": [
3934                "state"
3935              ],
3936              "Plans": [
3937                {
3938                  "Node Type": "TableScan",
3939                  "Output": [
3940                    "state"
3941                  ],
3942                  "Plans": [],
3943                  "Relation Name": "employee_csv"
3944                }
3945              ]
3946            },
3947            {
3948              "Node Type": "TableScan",
3949              "Output": [
3950                "id",
3951                "state"
3952              ],
3953              "Plans": [],
3954              "Relation Name": "employee_csv"
3955            }
3956          ]
3957        }
3958      ]
3959    }
3960  }
3961]"#;
3962
3963        let pg_json = format!("{}", plan.display_pg_json());
3964
3965        assert_eq!(expected_pg_json, pg_json);
3966        Ok(())
3967    }
3968
3969    /// Tests for the Visitor trait and walking logical plan nodes
3970    #[derive(Debug, Default)]
3971    struct OkVisitor {
3972        strings: Vec<String>,
3973    }
3974
3975    impl<'n> TreeNodeVisitor<'n> for OkVisitor {
3976        type Node = LogicalPlan;
3977
3978        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
3979            let s = match plan {
3980                LogicalPlan::Projection { .. } => "pre_visit Projection",
3981                LogicalPlan::Filter { .. } => "pre_visit Filter",
3982                LogicalPlan::TableScan { .. } => "pre_visit TableScan",
3983                _ => {
3984                    return not_impl_err!("unknown plan type");
3985                }
3986            };
3987
3988            self.strings.push(s.into());
3989            Ok(TreeNodeRecursion::Continue)
3990        }
3991
3992        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
3993            let s = match plan {
3994                LogicalPlan::Projection { .. } => "post_visit Projection",
3995                LogicalPlan::Filter { .. } => "post_visit Filter",
3996                LogicalPlan::TableScan { .. } => "post_visit TableScan",
3997                _ => {
3998                    return not_impl_err!("unknown plan type");
3999                }
4000            };
4001
4002            self.strings.push(s.into());
4003            Ok(TreeNodeRecursion::Continue)
4004        }
4005    }
4006
4007    #[test]
4008    fn visit_order() {
4009        let mut visitor = OkVisitor::default();
4010        let plan = test_plan();
4011        let res = plan.visit_with_subqueries(&mut visitor);
4012        assert!(res.is_ok());
4013
4014        assert_eq!(
4015            visitor.strings,
4016            vec![
4017                "pre_visit Projection",
4018                "pre_visit Filter",
4019                "pre_visit TableScan",
4020                "post_visit TableScan",
4021                "post_visit Filter",
4022                "post_visit Projection",
4023            ]
4024        );
4025    }
4026
4027    #[derive(Debug, Default)]
4028    /// Counter than counts to zero and returns true when it gets there
4029    struct OptionalCounter {
4030        val: Option<usize>,
4031    }
4032
4033    impl OptionalCounter {
4034        fn new(val: usize) -> Self {
4035            Self { val: Some(val) }
4036        }
4037        // Decrements the counter by 1, if any, returning true if it hits zero
4038        fn dec(&mut self) -> bool {
4039            if Some(0) == self.val {
4040                true
4041            } else {
4042                self.val = self.val.take().map(|i| i - 1);
4043                false
4044            }
4045        }
4046    }
4047
4048    #[derive(Debug, Default)]
4049    /// Visitor that returns false after some number of visits
4050    struct StoppingVisitor {
4051        inner: OkVisitor,
4052        /// When Some(0) returns false from pre_visit
4053        return_false_from_pre_in: OptionalCounter,
4054        /// When Some(0) returns false from post_visit
4055        return_false_from_post_in: OptionalCounter,
4056    }
4057
4058    impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4059        type Node = LogicalPlan;
4060
4061        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4062            if self.return_false_from_pre_in.dec() {
4063                return Ok(TreeNodeRecursion::Stop);
4064            }
4065            self.inner.f_down(plan)?;
4066
4067            Ok(TreeNodeRecursion::Continue)
4068        }
4069
4070        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4071            if self.return_false_from_post_in.dec() {
4072                return Ok(TreeNodeRecursion::Stop);
4073            }
4074
4075            self.inner.f_up(plan)
4076        }
4077    }
4078
4079    /// test early stopping in pre-visit
4080    #[test]
4081    fn early_stopping_pre_visit() {
4082        let mut visitor = StoppingVisitor {
4083            return_false_from_pre_in: OptionalCounter::new(2),
4084            ..Default::default()
4085        };
4086        let plan = test_plan();
4087        let res = plan.visit_with_subqueries(&mut visitor);
4088        assert!(res.is_ok());
4089
4090        assert_eq!(
4091            visitor.inner.strings,
4092            vec!["pre_visit Projection", "pre_visit Filter"]
4093        );
4094    }
4095
4096    #[test]
4097    fn early_stopping_post_visit() {
4098        let mut visitor = StoppingVisitor {
4099            return_false_from_post_in: OptionalCounter::new(1),
4100            ..Default::default()
4101        };
4102        let plan = test_plan();
4103        let res = plan.visit_with_subqueries(&mut visitor);
4104        assert!(res.is_ok());
4105
4106        assert_eq!(
4107            visitor.inner.strings,
4108            vec![
4109                "pre_visit Projection",
4110                "pre_visit Filter",
4111                "pre_visit TableScan",
4112                "post_visit TableScan",
4113            ]
4114        );
4115    }
4116
4117    #[derive(Debug, Default)]
4118    /// Visitor that returns an error after some number of visits
4119    struct ErrorVisitor {
4120        inner: OkVisitor,
4121        /// When Some(0) returns false from pre_visit
4122        return_error_from_pre_in: OptionalCounter,
4123        /// When Some(0) returns false from post_visit
4124        return_error_from_post_in: OptionalCounter,
4125    }
4126
4127    impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4128        type Node = LogicalPlan;
4129
4130        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4131            if self.return_error_from_pre_in.dec() {
4132                return not_impl_err!("Error in pre_visit");
4133            }
4134
4135            self.inner.f_down(plan)
4136        }
4137
4138        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4139            if self.return_error_from_post_in.dec() {
4140                return not_impl_err!("Error in post_visit");
4141            }
4142
4143            self.inner.f_up(plan)
4144        }
4145    }
4146
4147    #[test]
4148    fn error_pre_visit() {
4149        let mut visitor = ErrorVisitor {
4150            return_error_from_pre_in: OptionalCounter::new(2),
4151            ..Default::default()
4152        };
4153        let plan = test_plan();
4154        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4155        assert_eq!(
4156            "This feature is not implemented: Error in pre_visit",
4157            res.strip_backtrace()
4158        );
4159        assert_eq!(
4160            visitor.inner.strings,
4161            vec!["pre_visit Projection", "pre_visit Filter"]
4162        );
4163    }
4164
4165    #[test]
4166    fn error_post_visit() {
4167        let mut visitor = ErrorVisitor {
4168            return_error_from_post_in: OptionalCounter::new(1),
4169            ..Default::default()
4170        };
4171        let plan = test_plan();
4172        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4173        assert_eq!(
4174            "This feature is not implemented: Error in post_visit",
4175            res.strip_backtrace()
4176        );
4177        assert_eq!(
4178            visitor.inner.strings,
4179            vec![
4180                "pre_visit Projection",
4181                "pre_visit Filter",
4182                "pre_visit TableScan",
4183                "post_visit TableScan",
4184            ]
4185        );
4186    }
4187
4188    #[test]
4189    fn projection_expr_schema_mismatch() -> Result<()> {
4190        let empty_schema = Arc::new(DFSchema::empty());
4191        let p = Projection::try_new_with_schema(
4192            vec![col("a")],
4193            Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4194                produce_one_row: false,
4195                schema: Arc::clone(&empty_schema),
4196            })),
4197            empty_schema,
4198        );
4199        assert_eq!(p.err().unwrap().strip_backtrace(), "Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
4200        Ok(())
4201    }
4202
4203    fn test_plan() -> LogicalPlan {
4204        let schema = Schema::new(vec![
4205            Field::new("id", DataType::Int32, false),
4206            Field::new("state", DataType::Utf8, false),
4207        ]);
4208
4209        table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4210            .unwrap()
4211            .filter(col("state").eq(lit("CO")))
4212            .unwrap()
4213            .project(vec![col("id")])
4214            .unwrap()
4215            .build()
4216            .unwrap()
4217    }
4218
4219    #[test]
4220    fn test_replace_invalid_placeholder() {
4221        // test empty placeholder
4222        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4223
4224        let plan = table_scan(TableReference::none(), &schema, None)
4225            .unwrap()
4226            .filter(col("id").eq(placeholder("")))
4227            .unwrap()
4228            .build()
4229            .unwrap();
4230
4231        let param_values = vec![ScalarValue::Int32(Some(42))];
4232        plan.replace_params_with_values(&param_values.clone().into())
4233            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4234
4235        // test $0 placeholder
4236        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4237
4238        let plan = table_scan(TableReference::none(), &schema, None)
4239            .unwrap()
4240            .filter(col("id").eq(placeholder("$0")))
4241            .unwrap()
4242            .build()
4243            .unwrap();
4244
4245        plan.replace_params_with_values(&param_values.clone().into())
4246            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4247
4248        // test $00 placeholder
4249        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4250
4251        let plan = table_scan(TableReference::none(), &schema, None)
4252            .unwrap()
4253            .filter(col("id").eq(placeholder("$00")))
4254            .unwrap()
4255            .build()
4256            .unwrap();
4257
4258        plan.replace_params_with_values(&param_values.into())
4259            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4260    }
4261
4262    #[test]
4263    fn test_nullable_schema_after_grouping_set() {
4264        let schema = Schema::new(vec![
4265            Field::new("foo", DataType::Int32, false),
4266            Field::new("bar", DataType::Int32, false),
4267        ]);
4268
4269        let plan = table_scan(TableReference::none(), &schema, None)
4270            .unwrap()
4271            .aggregate(
4272                vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
4273                    vec![col("foo")],
4274                    vec![col("bar")],
4275                ]))],
4276                vec![count(lit(true))],
4277            )
4278            .unwrap()
4279            .build()
4280            .unwrap();
4281
4282        let output_schema = plan.schema();
4283
4284        assert!(output_schema
4285            .field_with_name(None, "foo")
4286            .unwrap()
4287            .is_nullable(),);
4288        assert!(output_schema
4289            .field_with_name(None, "bar")
4290            .unwrap()
4291            .is_nullable());
4292    }
4293
4294    #[test]
4295    fn test_filter_is_scalar() {
4296        // test empty placeholder
4297        let schema =
4298            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
4299
4300        let source = Arc::new(LogicalTableSource::new(schema));
4301        let schema = Arc::new(
4302            DFSchema::try_from_qualified_schema(
4303                TableReference::bare("tab"),
4304                &source.schema(),
4305            )
4306            .unwrap(),
4307        );
4308        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
4309            table_name: TableReference::bare("tab"),
4310            source: Arc::clone(&source) as Arc<dyn TableSource>,
4311            projection: None,
4312            projected_schema: Arc::clone(&schema),
4313            filters: vec![],
4314            fetch: None,
4315        }));
4316        let col = schema.field_names()[0].clone();
4317
4318        let filter = Filter::try_new(
4319            Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)))),
4320            scan,
4321        )
4322        .unwrap();
4323        assert!(!filter.is_scalar());
4324        let unique_schema = Arc::new(
4325            schema
4326                .as_ref()
4327                .clone()
4328                .with_functional_dependencies(
4329                    FunctionalDependencies::new_from_constraints(
4330                        Some(&Constraints::new_unverified(vec![Constraint::Unique(
4331                            vec![0],
4332                        )])),
4333                        1,
4334                    ),
4335                )
4336                .unwrap(),
4337        );
4338        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
4339            table_name: TableReference::bare("tab"),
4340            source,
4341            projection: None,
4342            projected_schema: Arc::clone(&unique_schema),
4343            filters: vec![],
4344            fetch: None,
4345        }));
4346        let col = schema.field_names()[0].clone();
4347
4348        let filter =
4349            Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
4350        assert!(filter.is_scalar());
4351    }
4352
4353    #[test]
4354    fn test_transform_explain() {
4355        let schema = Schema::new(vec![
4356            Field::new("foo", DataType::Int32, false),
4357            Field::new("bar", DataType::Int32, false),
4358        ]);
4359
4360        let plan = table_scan(TableReference::none(), &schema, None)
4361            .unwrap()
4362            .explain(false, false)
4363            .unwrap()
4364            .build()
4365            .unwrap();
4366
4367        let external_filter = col("foo").eq(lit(true));
4368
4369        // after transformation, because plan is not the same anymore,
4370        // the parent plan is built again with call to LogicalPlan::with_new_inputs -> with_new_exprs
4371        let plan = plan
4372            .transform(|plan| match plan {
4373                LogicalPlan::TableScan(table) => {
4374                    let filter = Filter::try_new(
4375                        external_filter.clone(),
4376                        Arc::new(LogicalPlan::TableScan(table)),
4377                    )
4378                    .unwrap();
4379                    Ok(Transformed::yes(LogicalPlan::Filter(filter)))
4380                }
4381                x => Ok(Transformed::no(x)),
4382            })
4383            .data()
4384            .unwrap();
4385
4386        let expected = "Explain\
4387                        \n  Filter: foo = Boolean(true)\
4388                        \n    TableScan: ?table?";
4389        let actual = format!("{}", plan.display_indent());
4390        assert_eq!(expected.to_string(), actual)
4391    }
4392
4393    #[test]
4394    fn test_plan_partial_ord() {
4395        let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
4396            produce_one_row: false,
4397            schema: Arc::new(DFSchema::empty()),
4398        });
4399
4400        let describe_table = LogicalPlan::DescribeTable(DescribeTable {
4401            schema: Arc::new(Schema::new(vec![Field::new(
4402                "foo",
4403                DataType::Int32,
4404                false,
4405            )])),
4406            output_schema: DFSchemaRef::new(DFSchema::empty()),
4407        });
4408
4409        let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
4410            schema: Arc::new(Schema::new(vec![Field::new(
4411                "foo",
4412                DataType::Int32,
4413                false,
4414            )])),
4415            output_schema: DFSchemaRef::new(DFSchema::empty()),
4416        });
4417
4418        assert_eq!(
4419            empty_relation.partial_cmp(&describe_table),
4420            Some(Ordering::Less)
4421        );
4422        assert_eq!(
4423            describe_table.partial_cmp(&empty_relation),
4424            Some(Ordering::Greater)
4425        );
4426        assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
4427    }
4428
4429    #[test]
4430    fn test_limit_with_new_children() {
4431        let input = Arc::new(LogicalPlan::Values(Values {
4432            schema: Arc::new(DFSchema::empty()),
4433            values: vec![vec![]],
4434        }));
4435        let cases = [
4436            LogicalPlan::Limit(Limit {
4437                skip: None,
4438                fetch: None,
4439                input: Arc::clone(&input),
4440            }),
4441            LogicalPlan::Limit(Limit {
4442                skip: None,
4443                fetch: Some(Box::new(Expr::Literal(
4444                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4445                ))),
4446                input: Arc::clone(&input),
4447            }),
4448            LogicalPlan::Limit(Limit {
4449                skip: Some(Box::new(Expr::Literal(
4450                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4451                ))),
4452                fetch: None,
4453                input: Arc::clone(&input),
4454            }),
4455            LogicalPlan::Limit(Limit {
4456                skip: Some(Box::new(Expr::Literal(
4457                    ScalarValue::new_one(&DataType::UInt32).unwrap(),
4458                ))),
4459                fetch: Some(Box::new(Expr::Literal(
4460                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4461                ))),
4462                input,
4463            }),
4464        ];
4465
4466        for limit in cases {
4467            let new_limit = limit
4468                .with_new_exprs(
4469                    limit.expressions(),
4470                    limit.inputs().into_iter().cloned().collect(),
4471                )
4472                .unwrap();
4473            assert_eq!(limit, new_limit);
4474        }
4475    }
4476
4477    #[test]
4478    fn test_with_subqueries_jump() {
4479        // The test plan contains a `Project` node above a `Filter` node, and the
4480        // `Project` node contains a subquery plan with a `Filter` root node, so returning
4481        // `TreeNodeRecursion::Jump` on `Project` should cause not visiting any of the
4482        // `Filter`s.
4483        let subquery_schema =
4484            Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);
4485
4486        let subquery_plan =
4487            table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
4488                .unwrap()
4489                .filter(col("sub_id").eq(lit(0)))
4490                .unwrap()
4491                .build()
4492                .unwrap();
4493
4494        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4495
4496        let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
4497            .unwrap()
4498            .filter(col("id").eq(lit(0)))
4499            .unwrap()
4500            .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
4501            .unwrap()
4502            .build()
4503            .unwrap();
4504
4505        let mut filter_found = false;
4506        plan.apply_with_subqueries(|plan| {
4507            match plan {
4508                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
4509                LogicalPlan::Filter(..) => filter_found = true,
4510                _ => {}
4511            }
4512            Ok(TreeNodeRecursion::Continue)
4513        })
4514        .unwrap();
4515        assert!(!filter_found);
4516
4517        struct ProjectJumpVisitor {
4518            filter_found: bool,
4519        }
4520
4521        impl ProjectJumpVisitor {
4522            fn new() -> Self {
4523                Self {
4524                    filter_found: false,
4525                }
4526            }
4527        }
4528
4529        impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
4530            type Node = LogicalPlan;
4531
4532            fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
4533                match node {
4534                    LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
4535                    LogicalPlan::Filter(..) => self.filter_found = true,
4536                    _ => {}
4537                }
4538                Ok(TreeNodeRecursion::Continue)
4539            }
4540        }
4541
4542        let mut visitor = ProjectJumpVisitor::new();
4543        plan.visit_with_subqueries(&mut visitor).unwrap();
4544        assert!(!visitor.filter_found);
4545
4546        let mut filter_found = false;
4547        plan.clone()
4548            .transform_down_with_subqueries(|plan| {
4549                match plan {
4550                    LogicalPlan::Projection(..) => {
4551                        return Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump))
4552                    }
4553                    LogicalPlan::Filter(..) => filter_found = true,
4554                    _ => {}
4555                }
4556                Ok(Transformed::no(plan))
4557            })
4558            .unwrap();
4559        assert!(!filter_found);
4560
4561        let mut filter_found = false;
4562        plan.clone()
4563            .transform_down_up_with_subqueries(
4564                |plan| {
4565                    match plan {
4566                        LogicalPlan::Projection(..) => {
4567                            return Ok(Transformed::new(
4568                                plan,
4569                                false,
4570                                TreeNodeRecursion::Jump,
4571                            ))
4572                        }
4573                        LogicalPlan::Filter(..) => filter_found = true,
4574                        _ => {}
4575                    }
4576                    Ok(Transformed::no(plan))
4577                },
4578                |plan| Ok(Transformed::no(plan)),
4579            )
4580            .unwrap();
4581        assert!(!filter_found);
4582
4583        struct ProjectJumpRewriter {
4584            filter_found: bool,
4585        }
4586
4587        impl ProjectJumpRewriter {
4588            fn new() -> Self {
4589                Self {
4590                    filter_found: false,
4591                }
4592            }
4593        }
4594
4595        impl TreeNodeRewriter for ProjectJumpRewriter {
4596            type Node = LogicalPlan;
4597
4598            fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
4599                match node {
4600                    LogicalPlan::Projection(..) => {
4601                        return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump))
4602                    }
4603                    LogicalPlan::Filter(..) => self.filter_found = true,
4604                    _ => {}
4605                }
4606                Ok(Transformed::no(node))
4607            }
4608        }
4609
4610        let mut rewriter = ProjectJumpRewriter::new();
4611        plan.rewrite_with_subqueries(&mut rewriter).unwrap();
4612        assert!(!rewriter.filter_found);
4613    }
4614
4615    #[test]
4616    fn test_with_unresolved_placeholders() {
4617        let field_name = "id";
4618        let placeholder_value = "$1";
4619        let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
4620
4621        let plan = table_scan(TableReference::none(), &schema, None)
4622            .unwrap()
4623            .filter(col(field_name).eq(placeholder(placeholder_value)))
4624            .unwrap()
4625            .build()
4626            .unwrap();
4627
4628        // Check that the placeholder parameters have not received a DataType.
4629        let params = plan.get_parameter_types().unwrap();
4630        assert_eq!(params.len(), 1);
4631
4632        let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
4633        assert_eq!(parameter_type, None);
4634    }
4635}