datafusion_expr/logical_plan/
builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides a builder for creating LogicalPlans
19
20use std::any::Any;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23use std::iter::once;
24use std::sync::Arc;
25
26use crate::dml::CopyTo;
27use crate::expr::{Alias, Sort as SortExpr};
28use crate::expr_rewriter::{
29    coerce_plan_expr_for_schema, normalize_col,
30    normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
31    rewrite_sort_cols_by_aggs,
32};
33use crate::logical_plan::{
34    Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
35    JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
36    Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
37    Window,
38};
39use crate::utils::{
40    can_hash, columnize_expr, compare_sort_expr, expr_to_columns,
41    find_valid_equijoin_key_pair, group_window_expr_by_sort_keys,
42};
43use crate::{
44    and, binary_expr, lit, DmlStatement, Expr, ExprSchemable, Operator, RecursiveQuery,
45    Statement, TableProviderFilterPushDown, TableSource, WriteOp,
46};
47
48use super::dml::InsertOp;
49use super::plan::ColumnUnnestList;
50use arrow::compute::can_cast_types;
51use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
52use datafusion_common::display::ToStringifiedPlan;
53use datafusion_common::file_options::file_type::FileType;
54use datafusion_common::{
55    exec_err, get_target_functional_dependencies, internal_err, not_impl_err,
56    plan_datafusion_err, plan_err, Column, Constraints, DFSchema, DFSchemaRef,
57    DataFusionError, Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
58};
59use datafusion_expr_common::type_coercion::binary::type_union_resolution;
60
61use indexmap::IndexSet;
62
63/// Default table name for unnamed table
64pub const UNNAMED_TABLE: &str = "?table?";
65
66/// Options for [`LogicalPlanBuilder`]
67#[derive(Default, Debug, Clone)]
68pub struct LogicalPlanBuilderOptions {
69    /// Flag indicating whether the plan builder should add
70    /// functionally dependent expressions as additional aggregation groupings.
71    add_implicit_group_by_exprs: bool,
72}
73
74impl LogicalPlanBuilderOptions {
75    pub fn new() -> Self {
76        Default::default()
77    }
78
79    /// Should the builder add functionally dependent expressions as additional aggregation groupings.
80    pub fn with_add_implicit_group_by_exprs(mut self, add: bool) -> Self {
81        self.add_implicit_group_by_exprs = add;
82        self
83    }
84}
85
86/// Builder for logical plans
87///
88/// # Example building a simple plan
89/// ```
90/// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
91/// # use datafusion_common::Result;
92/// # use arrow::datatypes::{Schema, DataType, Field};
93/// #
94/// # fn main() -> Result<()> {
95/// #
96/// # fn employee_schema() -> Schema {
97/// #    Schema::new(vec![
98/// #           Field::new("id", DataType::Int32, false),
99/// #           Field::new("first_name", DataType::Utf8, false),
100/// #           Field::new("last_name", DataType::Utf8, false),
101/// #           Field::new("state", DataType::Utf8, false),
102/// #           Field::new("salary", DataType::Int32, false),
103/// #       ])
104/// #   }
105/// #
106/// // Create a plan similar to
107/// // SELECT last_name
108/// // FROM employees
109/// // WHERE salary < 1000
110/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
111///  // Keep only rows where salary < 1000
112///  .filter(col("salary").lt(lit(1000)))?
113///  // only show "last_name" in the final results
114///  .project(vec![col("last_name")])?
115///  .build()?;
116///
117/// // Convert from plan back to builder
118/// let builder = LogicalPlanBuilder::from(plan);
119///
120/// # Ok(())
121/// # }
122/// ```
123#[derive(Debug, Clone)]
124pub struct LogicalPlanBuilder {
125    plan: Arc<LogicalPlan>,
126    options: LogicalPlanBuilderOptions,
127}
128
129impl LogicalPlanBuilder {
130    /// Create a builder from an existing plan
131    pub fn new(plan: LogicalPlan) -> Self {
132        Self {
133            plan: Arc::new(plan),
134            options: LogicalPlanBuilderOptions::default(),
135        }
136    }
137
138    /// Create a builder from an existing plan
139    pub fn new_from_arc(plan: Arc<LogicalPlan>) -> Self {
140        Self {
141            plan,
142            options: LogicalPlanBuilderOptions::default(),
143        }
144    }
145
146    pub fn with_options(mut self, options: LogicalPlanBuilderOptions) -> Self {
147        self.options = options;
148        self
149    }
150
151    /// Return the output schema of the plan build so far
152    pub fn schema(&self) -> &DFSchemaRef {
153        self.plan.schema()
154    }
155
156    /// Return the LogicalPlan of the plan build so far
157    pub fn plan(&self) -> &LogicalPlan {
158        &self.plan
159    }
160
161    /// Create an empty relation.
162    ///
163    /// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
164    pub fn empty(produce_one_row: bool) -> Self {
165        Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
166            produce_one_row,
167            schema: DFSchemaRef::new(DFSchema::empty()),
168        }))
169    }
170
171    /// Convert a regular plan into a recursive query.
172    /// `is_distinct` indicates whether the recursive term should be de-duplicated (`UNION`) after each iteration or not (`UNION ALL`).
173    pub fn to_recursive_query(
174        self,
175        name: String,
176        recursive_term: LogicalPlan,
177        is_distinct: bool,
178    ) -> Result<Self> {
179        // TODO: we need to do a bunch of validation here. Maybe more.
180        if is_distinct {
181            return not_impl_err!(
182                "Recursive queries with a distinct 'UNION' (in which the previous iteration's results will be de-duplicated) is not supported"
183            );
184        }
185        // Ensure that the static term and the recursive term have the same number of fields
186        let static_fields_len = self.plan.schema().fields().len();
187        let recursive_fields_len = recursive_term.schema().fields().len();
188        if static_fields_len != recursive_fields_len {
189            return plan_err!(
190                "Non-recursive term and recursive term must have the same number of columns ({} != {})",
191                static_fields_len, recursive_fields_len
192            );
193        }
194        // Ensure that the recursive term has the same field types as the static term
195        let coerced_recursive_term =
196            coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
197        Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
198            name,
199            static_term: self.plan,
200            recursive_term: Arc::new(coerced_recursive_term),
201            is_distinct,
202        })))
203    }
204
205    /// Create a values list based relation, and the schema is inferred from data, consuming
206    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
207    /// documentation for more details.
208    ///
209    /// so it's usually better to override the default names with a table alias list.
210    ///
211    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
212    pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
213        if values.is_empty() {
214            return plan_err!("Values list cannot be empty");
215        }
216        let n_cols = values[0].len();
217        if n_cols == 0 {
218            return plan_err!("Values list cannot be zero length");
219        }
220        for (i, row) in values.iter().enumerate() {
221            if row.len() != n_cols {
222                return plan_err!(
223                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
224                    row.len(),
225                    i,
226                    n_cols
227                );
228            }
229        }
230
231        // Infer from data itself
232        Self::infer_data(values)
233    }
234
235    /// Create a values list based relation, and the schema is inferred from data itself or table schema if provided, consuming
236    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
237    /// documentation for more details.
238    ///
239    /// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table.
240    /// The column names are not specified by the SQL standard and different database systems do it differently,
241    /// so it's usually better to override the default names with a table alias list.
242    ///
243    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
244    pub fn values_with_schema(
245        values: Vec<Vec<Expr>>,
246        schema: &DFSchemaRef,
247    ) -> Result<Self> {
248        if values.is_empty() {
249            return plan_err!("Values list cannot be empty");
250        }
251        let n_cols = schema.fields().len();
252        if n_cols == 0 {
253            return plan_err!("Values list cannot be zero length");
254        }
255        for (i, row) in values.iter().enumerate() {
256            if row.len() != n_cols {
257                return plan_err!(
258                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
259                    row.len(),
260                    i,
261                    n_cols
262                );
263            }
264        }
265
266        // Check the type of value against the schema
267        Self::infer_values_from_schema(values, schema)
268    }
269
270    fn infer_values_from_schema(
271        values: Vec<Vec<Expr>>,
272        schema: &DFSchema,
273    ) -> Result<Self> {
274        let n_cols = values[0].len();
275        let mut fields = ValuesFields::new();
276        for j in 0..n_cols {
277            let field_type = schema.field(j).data_type();
278            let field_nullable = schema.field(j).is_nullable();
279            for row in values.iter() {
280                let value = &row[j];
281                let data_type = value.get_type(schema)?;
282
283                if !data_type.equals_datatype(field_type) {
284                    if can_cast_types(&data_type, field_type) {
285                    } else {
286                        return exec_err!(
287                            "type mismatch and can't cast to got {} and {}",
288                            data_type,
289                            field_type
290                        );
291                    }
292                }
293            }
294            fields.push(field_type.to_owned(), field_nullable);
295        }
296
297        Self::infer_inner(values, fields, schema)
298    }
299
300    fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
301        let n_cols = values[0].len();
302        let schema = DFSchema::empty();
303        let mut fields = ValuesFields::new();
304
305        for j in 0..n_cols {
306            let mut common_type: Option<DataType> = None;
307            for (i, row) in values.iter().enumerate() {
308                let value = &row[j];
309                let data_type = value.get_type(&schema)?;
310                if data_type == DataType::Null {
311                    continue;
312                }
313
314                if let Some(prev_type) = common_type {
315                    // get common type of each column values.
316                    let data_types = vec![prev_type.clone(), data_type.clone()];
317                    let Some(new_type) = type_union_resolution(&data_types) else {
318                        return plan_err!("Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}");
319                    };
320                    common_type = Some(new_type);
321                } else {
322                    common_type = Some(data_type);
323                }
324            }
325            // assuming common_type was not set, and no error, therefore the type should be NULL
326            // since the code loop skips NULL
327            fields.push(common_type.unwrap_or(DataType::Null), true);
328        }
329
330        Self::infer_inner(values, fields, &schema)
331    }
332
333    fn infer_inner(
334        mut values: Vec<Vec<Expr>>,
335        fields: ValuesFields,
336        schema: &DFSchema,
337    ) -> Result<Self> {
338        let fields = fields.into_fields();
339        // wrap cast if data type is not same as common type.
340        for row in &mut values {
341            for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
342                if let Expr::Literal(ScalarValue::Null) = row[j] {
343                    row[j] = Expr::Literal(ScalarValue::try_from(field_type)?);
344                } else {
345                    row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
346                }
347            }
348        }
349
350        let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
351        let schema = DFSchemaRef::new(dfschema);
352
353        Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
354    }
355
356    /// Convert a table provider into a builder with a TableScan
357    ///
358    /// Note that if you pass a string as `table_name`, it is treated
359    /// as a SQL identifier, as described on [`TableReference`] and
360    /// thus is normalized
361    ///
362    /// # Example:
363    /// ```
364    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder,
365    /// #  logical_plan::builder::LogicalTableSource, logical_plan::table_scan
366    /// # };
367    /// # use std::sync::Arc;
368    /// # use arrow::datatypes::{Schema, DataType, Field};
369    /// # use datafusion_common::TableReference;
370    /// #
371    /// # let employee_schema = Arc::new(Schema::new(vec![
372    /// #           Field::new("id", DataType::Int32, false),
373    /// # ])) as _;
374    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
375    /// // Scan table_source with the name "mytable" (after normalization)
376    /// # let table = table_source.clone();
377    /// let scan = LogicalPlanBuilder::scan("MyTable", table, None);
378    ///
379    /// // Scan table_source with the name "MyTable" by enclosing in quotes
380    /// # let table = table_source.clone();
381    /// let scan = LogicalPlanBuilder::scan(r#""MyTable""#, table, None);
382    ///
383    /// // Scan table_source with the name "MyTable" by forming the table reference
384    /// # let table = table_source.clone();
385    /// let table_reference = TableReference::bare("MyTable");
386    /// let scan = LogicalPlanBuilder::scan(table_reference, table, None);
387    /// ```
388    pub fn scan(
389        table_name: impl Into<TableReference>,
390        table_source: Arc<dyn TableSource>,
391        projection: Option<Vec<usize>>,
392    ) -> Result<Self> {
393        Self::scan_with_filters(table_name, table_source, projection, vec![])
394    }
395
396    /// Create a [CopyTo] for copying the contents of this builder to the specified file(s)
397    pub fn copy_to(
398        input: LogicalPlan,
399        output_url: String,
400        file_type: Arc<dyn FileType>,
401        options: HashMap<String, String>,
402        partition_by: Vec<String>,
403    ) -> Result<Self> {
404        Ok(Self::new(LogicalPlan::Copy(CopyTo {
405            input: Arc::new(input),
406            output_url,
407            partition_by,
408            file_type,
409            options,
410        })))
411    }
412
413    /// Create a [`DmlStatement`] for inserting the contents of this builder into the named table.
414    ///
415    /// Note,  use a [`DefaultTableSource`] to insert into a [`TableProvider`]
416    ///
417    /// [`DefaultTableSource`]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/struct.DefaultTableSource.html
418    /// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
419    ///
420    /// # Example:
421    /// ```
422    /// # use datafusion_expr::{lit, LogicalPlanBuilder,
423    /// #  logical_plan::builder::LogicalTableSource,
424    /// # };
425    /// # use std::sync::Arc;
426    /// # use arrow::datatypes::{Schema, DataType, Field};
427    /// # use datafusion_expr::dml::InsertOp;
428    /// #
429    /// # fn test() -> datafusion_common::Result<()> {
430    /// # let employee_schema = Arc::new(Schema::new(vec![
431    /// #     Field::new("id", DataType::Int32, false),
432    /// # ])) as _;
433    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
434    /// // VALUES (1), (2)
435    /// let input = LogicalPlanBuilder::values(vec![vec![lit(1)], vec![lit(2)]])?
436    ///   .build()?;
437    /// // INSERT INTO MyTable VALUES (1), (2)
438    /// let insert_plan = LogicalPlanBuilder::insert_into(
439    ///   input,
440    ///   "MyTable",
441    ///   table_source,
442    ///   InsertOp::Append,
443    /// )?;
444    /// # Ok(())
445    /// # }
446    /// ```
447    pub fn insert_into(
448        input: LogicalPlan,
449        table_name: impl Into<TableReference>,
450        target: Arc<dyn TableSource>,
451        insert_op: InsertOp,
452    ) -> Result<Self> {
453        Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
454            table_name.into(),
455            target,
456            WriteOp::Insert(insert_op),
457            Arc::new(input),
458        ))))
459    }
460
461    /// Convert a table provider into a builder with a TableScan
462    pub fn scan_with_filters(
463        table_name: impl Into<TableReference>,
464        table_source: Arc<dyn TableSource>,
465        projection: Option<Vec<usize>>,
466        filters: Vec<Expr>,
467    ) -> Result<Self> {
468        TableScan::try_new(table_name, table_source, projection, filters, None)
469            .map(LogicalPlan::TableScan)
470            .map(Self::new)
471    }
472
473    /// Convert a table provider into a builder with a TableScan with filter and fetch
474    pub fn scan_with_filters_fetch(
475        table_name: impl Into<TableReference>,
476        table_source: Arc<dyn TableSource>,
477        projection: Option<Vec<usize>>,
478        filters: Vec<Expr>,
479        fetch: Option<usize>,
480    ) -> Result<Self> {
481        TableScan::try_new(table_name, table_source, projection, filters, fetch)
482            .map(LogicalPlan::TableScan)
483            .map(Self::new)
484    }
485
486    /// Wrap a plan in a window
487    pub fn window_plan(
488        input: LogicalPlan,
489        window_exprs: Vec<Expr>,
490    ) -> Result<LogicalPlan> {
491        let mut plan = input;
492        let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
493        // To align with the behavior of PostgreSQL, we want the sort_keys sorted as same rule as PostgreSQL that first
494        // we compare the sort key themselves and if one window's sort keys are a prefix of another
495        // put the window with more sort keys first. so more deeply sorted plans gets nested further down as children.
496        // The sort_by() implementation here is a stable sort.
497        // Note that by this rule if there's an empty over, it'll be at the top level
498        groups.sort_by(|(key_a, _), (key_b, _)| {
499            for ((first, _), (second, _)) in key_a.iter().zip(key_b.iter()) {
500                let key_ordering = compare_sort_expr(first, second, plan.schema());
501                match key_ordering {
502                    Ordering::Less => {
503                        return Ordering::Less;
504                    }
505                    Ordering::Greater => {
506                        return Ordering::Greater;
507                    }
508                    Ordering::Equal => {}
509                }
510            }
511            key_b.len().cmp(&key_a.len())
512        });
513        for (_, exprs) in groups {
514            let window_exprs = exprs.into_iter().collect::<Vec<_>>();
515            // Partition and sorting is done at physical level, see the EnforceDistribution
516            // and EnforceSorting rules.
517            plan = LogicalPlanBuilder::from(plan)
518                .window(window_exprs)?
519                .build()?;
520        }
521        Ok(plan)
522    }
523    /// Apply a projection without alias.
524    pub fn project(
525        self,
526        expr: impl IntoIterator<Item = impl Into<Expr>>,
527    ) -> Result<Self> {
528        project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
529    }
530
531    /// Apply a projection without alias with optional validation
532    /// (true to validate, false to not validate)
533    pub fn project_with_validation(
534        self,
535        expr: Vec<(impl Into<Expr>, bool)>,
536    ) -> Result<Self> {
537        project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
538    }
539
540    /// Select the given column indices
541    pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
542        let exprs: Vec<_> = indices
543            .into_iter()
544            .map(|x| Expr::Column(Column::from(self.plan.schema().qualified_field(x))))
545            .collect();
546        self.project(exprs)
547    }
548
549    /// Apply a filter
550    pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
551        let expr = normalize_col(expr.into(), &self.plan)?;
552        Filter::try_new(expr, self.plan)
553            .map(LogicalPlan::Filter)
554            .map(Self::new)
555    }
556
557    /// Apply a filter which is used for a having clause
558    pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
559        let expr = normalize_col(expr.into(), &self.plan)?;
560        Filter::try_new_with_having(expr, self.plan)
561            .map(LogicalPlan::Filter)
562            .map(Self::from)
563    }
564
565    /// Make a builder for a prepare logical plan from the builder's plan
566    pub fn prepare(self, name: String, data_types: Vec<DataType>) -> Result<Self> {
567        Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
568            Prepare {
569                name,
570                data_types,
571                input: self.plan,
572            },
573        ))))
574    }
575
576    /// Limit the number of rows returned
577    ///
578    /// `skip` - Number of rows to skip before fetch any row.
579    ///
580    /// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
581    ///          if specified.
582    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
583        let skip_expr = if skip == 0 {
584            None
585        } else {
586            Some(lit(skip as i64))
587        };
588        let fetch_expr = fetch.map(|f| lit(f as i64));
589        self.limit_by_expr(skip_expr, fetch_expr)
590    }
591
592    /// Limit the number of rows returned
593    ///
594    /// Similar to `limit` but uses expressions for `skip` and `fetch`
595    pub fn limit_by_expr(self, skip: Option<Expr>, fetch: Option<Expr>) -> Result<Self> {
596        Ok(Self::new(LogicalPlan::Limit(Limit {
597            skip: skip.map(Box::new),
598            fetch: fetch.map(Box::new),
599            input: self.plan,
600        })))
601    }
602
603    /// Apply an alias
604    pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
605        subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
606    }
607
608    /// Add missing sort columns to all downstream projection
609    ///
610    /// Thus, if you have a LogicalPlan that selects A and B and have
611    /// not requested a sort by C, this code will add C recursively to
612    /// all input projections.
613    ///
614    /// Adding a new column is not correct if there is a `Distinct`
615    /// node, which produces only distinct values of its
616    /// inputs. Adding a new column to its input will result in
617    /// potentially different results than with the original column.
618    ///
619    /// For example, if the input is like:
620    ///
621    /// Distinct(A, B)
622    ///
623    /// If the input looks like
624    ///
625    /// a | b | c
626    /// --+---+---
627    /// 1 | 2 | 3
628    /// 1 | 2 | 4
629    ///
630    /// Distinct (A, B) --> (1,2)
631    ///
632    /// But Distinct (A, B, C) --> (1, 2, 3), (1, 2, 4)
633    ///  (which will appear as a (1, 2), (1, 2) if a and b are projected
634    ///
635    /// See <https://github.com/apache/datafusion/issues/5065> for more details
636    fn add_missing_columns(
637        curr_plan: LogicalPlan,
638        missing_cols: &IndexSet<Column>,
639        is_distinct: bool,
640    ) -> Result<LogicalPlan> {
641        match curr_plan {
642            LogicalPlan::Projection(Projection {
643                input,
644                mut expr,
645                schema: _,
646            }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
647                let mut missing_exprs = missing_cols
648                    .iter()
649                    .map(|c| normalize_col(Expr::Column(c.clone()), &input))
650                    .collect::<Result<Vec<_>>>()?;
651
652                // Do not let duplicate columns to be added, some of the
653                // missing_cols may be already present but without the new
654                // projected alias.
655                missing_exprs.retain(|e| !expr.contains(e));
656                if is_distinct {
657                    Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
658                }
659                expr.extend(missing_exprs);
660                project(Arc::unwrap_or_clone(input), expr)
661            }
662            _ => {
663                let is_distinct =
664                    is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
665                let new_inputs = curr_plan
666                    .inputs()
667                    .into_iter()
668                    .map(|input_plan| {
669                        Self::add_missing_columns(
670                            (*input_plan).clone(),
671                            missing_cols,
672                            is_distinct,
673                        )
674                    })
675                    .collect::<Result<Vec<_>>>()?;
676                curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
677            }
678        }
679    }
680
681    fn ambiguous_distinct_check(
682        missing_exprs: &[Expr],
683        missing_cols: &IndexSet<Column>,
684        projection_exprs: &[Expr],
685    ) -> Result<()> {
686        if missing_exprs.is_empty() {
687            return Ok(());
688        }
689
690        // if the missing columns are all only aliases for things in
691        // the existing select list, it is ok
692        //
693        // This handles the special case for
694        // SELECT col as <alias> ORDER BY <alias>
695        //
696        // As described in https://github.com/apache/datafusion/issues/5293
697        let all_aliases = missing_exprs.iter().all(|e| {
698            projection_exprs.iter().any(|proj_expr| {
699                if let Expr::Alias(Alias { expr, .. }) = proj_expr {
700                    e == expr.as_ref()
701                } else {
702                    false
703                }
704            })
705        });
706        if all_aliases {
707            return Ok(());
708        }
709
710        let missing_col_names = missing_cols
711            .iter()
712            .map(|col| col.flat_name())
713            .collect::<String>();
714
715        plan_err!("For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list")
716    }
717
718    /// Apply a sort by provided expressions with default direction
719    pub fn sort_by(
720        self,
721        expr: impl IntoIterator<Item = impl Into<Expr>> + Clone,
722    ) -> Result<Self> {
723        self.sort(
724            expr.into_iter()
725                .map(|e| e.into().sort(true, false))
726                .collect::<Vec<SortExpr>>(),
727        )
728    }
729
730    pub fn sort(
731        self,
732        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
733    ) -> Result<Self> {
734        self.sort_with_limit(sorts, None)
735    }
736
737    /// Apply a sort
738    pub fn sort_with_limit(
739        self,
740        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
741        fetch: Option<usize>,
742    ) -> Result<Self> {
743        let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;
744
745        let schema = self.plan.schema();
746
747        // Collect sort columns that are missing in the input plan's schema
748        let mut missing_cols: IndexSet<Column> = IndexSet::new();
749        sorts.iter().try_for_each::<_, Result<()>>(|sort| {
750            let columns = sort.expr.column_refs();
751
752            missing_cols.extend(
753                columns
754                    .into_iter()
755                    .filter(|c| !schema.has_column(c))
756                    .cloned(),
757            );
758
759            Ok(())
760        })?;
761
762        if missing_cols.is_empty() {
763            return Ok(Self::new(LogicalPlan::Sort(Sort {
764                expr: normalize_sorts(sorts, &self.plan)?,
765                input: self.plan,
766                fetch,
767            })));
768        }
769
770        // remove pushed down sort columns
771        let new_expr = schema.columns().into_iter().map(Expr::Column).collect();
772
773        let is_distinct = false;
774        let plan = Self::add_missing_columns(
775            Arc::unwrap_or_clone(self.plan),
776            &missing_cols,
777            is_distinct,
778        )?;
779        let sort_plan = LogicalPlan::Sort(Sort {
780            expr: normalize_sorts(sorts, &plan)?,
781            input: Arc::new(plan),
782            fetch,
783        });
784
785        Projection::try_new(new_expr, Arc::new(sort_plan))
786            .map(LogicalPlan::Projection)
787            .map(Self::new)
788    }
789
790    /// Apply a union, preserving duplicate rows
791    pub fn union(self, plan: LogicalPlan) -> Result<Self> {
792        union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
793    }
794
795    /// Apply a union by name, preserving duplicate rows
796    pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
797        union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
798    }
799
800    /// Apply a union by name, removing duplicate rows
801    pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
802        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
803        let right_plan: LogicalPlan = plan;
804
805        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
806            union_by_name(left_plan, right_plan)?,
807        )))))
808    }
809
810    /// Apply a union, removing duplicate rows
811    pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
812        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
813        let right_plan: LogicalPlan = plan;
814
815        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
816            union(left_plan, right_plan)?,
817        )))))
818    }
819
820    /// Apply deduplication: Only distinct (different) values are returned)
821    pub fn distinct(self) -> Result<Self> {
822        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
823    }
824
825    /// Project first values of the specified expression list according to the provided
826    /// sorting expressions grouped by the `DISTINCT ON` clause expressions.
827    pub fn distinct_on(
828        self,
829        on_expr: Vec<Expr>,
830        select_expr: Vec<Expr>,
831        sort_expr: Option<Vec<SortExpr>>,
832    ) -> Result<Self> {
833        Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
834            DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
835        ))))
836    }
837
838    /// Apply a join to `right` using explicitly specified columns and an
839    /// optional filter expression.
840    ///
841    /// See [`join_on`](Self::join_on) for a more concise way to specify the
842    /// join condition. Since DataFusion will automatically identify and
843    /// optimize equality predicates there is no performance difference between
844    /// this function and `join_on`
845    ///
846    /// `left_cols` and `right_cols` are used to form "equijoin" predicates (see
847    /// example below), which are then combined with the optional `filter`
848    /// expression.
849    ///
850    /// Note that in case of outer join, the `filter` is applied to only matched rows.
851    pub fn join(
852        self,
853        right: LogicalPlan,
854        join_type: JoinType,
855        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
856        filter: Option<Expr>,
857    ) -> Result<Self> {
858        self.join_detailed(right, join_type, join_keys, filter, false)
859    }
860
861    /// Apply a join using the specified expressions.
862    ///
863    /// Note that DataFusion automatically optimizes joins, including
864    /// identifying and optimizing equality predicates.
865    ///
866    /// # Example
867    ///
868    /// ```
869    /// # use datafusion_expr::{Expr, col, LogicalPlanBuilder,
870    /// #  logical_plan::builder::LogicalTableSource, logical_plan::JoinType,};
871    /// # use std::sync::Arc;
872    /// # use arrow::datatypes::{Schema, DataType, Field};
873    /// # use datafusion_common::Result;
874    /// # fn main() -> Result<()> {
875    /// let example_schema = Arc::new(Schema::new(vec![
876    ///     Field::new("a", DataType::Int32, false),
877    ///     Field::new("b", DataType::Int32, false),
878    ///     Field::new("c", DataType::Int32, false),
879    /// ]));
880    /// let table_source = Arc::new(LogicalTableSource::new(example_schema));
881    /// let left_table = table_source.clone();
882    /// let right_table = table_source.clone();
883    ///
884    /// let right_plan = LogicalPlanBuilder::scan("right", right_table, None)?.build()?;
885    ///
886    /// // Form the expression `(left.a != right.a)` AND `(left.b != right.b)`
887    /// let exprs = vec![
888    ///     col("left.a").eq(col("right.a")),
889    ///     col("left.b").not_eq(col("right.b"))
890    ///  ];
891    ///
892    /// // Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
893    /// // finding all pairs of rows from `left` and `right` where
894    /// // where `a = a2` and `b != b2`.
895    /// let plan = LogicalPlanBuilder::scan("left", left_table, None)?
896    ///     .join_on(right_plan, JoinType::Inner, exprs)?
897    ///     .build()?;
898    /// # Ok(())
899    /// # }
900    /// ```
901    pub fn join_on(
902        self,
903        right: LogicalPlan,
904        join_type: JoinType,
905        on_exprs: impl IntoIterator<Item = Expr>,
906    ) -> Result<Self> {
907        let filter = on_exprs.into_iter().reduce(Expr::and);
908
909        self.join_detailed(
910            right,
911            join_type,
912            (Vec::<Column>::new(), Vec::<Column>::new()),
913            filter,
914            false,
915        )
916    }
917
918    pub(crate) fn normalize(
919        plan: &LogicalPlan,
920        column: impl Into<Column>,
921    ) -> Result<Column> {
922        let column = column.into();
923        if column.relation.is_some() {
924            // column is already normalized
925            return Ok(column);
926        }
927
928        let schema = plan.schema();
929        let fallback_schemas = plan.fallback_normalize_schemas();
930        let using_columns = plan.using_columns()?;
931        column.normalize_with_schemas_and_ambiguity_check(
932            &[&[schema], &fallback_schemas],
933            &using_columns,
934        )
935    }
936
937    /// Apply a join with on constraint and specified null equality.
938    ///
939    /// The behavior is the same as [`join`](Self::join) except that it allows
940    /// specifying the null equality behavior.
941    ///
942    /// If `null_equals_null=true`, rows where both join keys are `null` will be
943    /// emitted. Otherwise rows where either or both join keys are `null` will be
944    /// omitted.
945    pub fn join_detailed(
946        self,
947        right: LogicalPlan,
948        join_type: JoinType,
949        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
950        filter: Option<Expr>,
951        null_equals_null: bool,
952    ) -> Result<Self> {
953        if join_keys.0.len() != join_keys.1.len() {
954            return plan_err!("left_keys and right_keys were not the same length");
955        }
956
957        let filter = if let Some(expr) = filter {
958            let filter = normalize_col_with_schemas_and_ambiguity_check(
959                expr,
960                &[&[self.schema(), right.schema()]],
961                &[],
962            )?;
963            Some(filter)
964        } else {
965            None
966        };
967
968        let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
969            join_keys
970                .0
971                .into_iter()
972                .zip(join_keys.1)
973                .map(|(l, r)| {
974                    let l = l.into();
975                    let r = r.into();
976
977                    match (&l.relation, &r.relation) {
978                        (Some(lr), Some(rr)) => {
979                            let l_is_left =
980                                self.plan.schema().field_with_qualified_name(lr, &l.name);
981                            let l_is_right =
982                                right.schema().field_with_qualified_name(lr, &l.name);
983                            let r_is_left =
984                                self.plan.schema().field_with_qualified_name(rr, &r.name);
985                            let r_is_right =
986                                right.schema().field_with_qualified_name(rr, &r.name);
987
988                            match (l_is_left, l_is_right, r_is_left, r_is_right) {
989                                (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
990                                (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
991                                _ => (
992                                    Self::normalize(&self.plan, l),
993                                    Self::normalize(&right, r),
994                                ),
995                            }
996                        }
997                        (Some(lr), None) => {
998                            let l_is_left =
999                                self.plan.schema().field_with_qualified_name(lr, &l.name);
1000                            let l_is_right =
1001                                right.schema().field_with_qualified_name(lr, &l.name);
1002
1003                            match (l_is_left, l_is_right) {
1004                                (Ok(_), _) => (Ok(l), Self::normalize(&right, r)),
1005                                (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
1006                                _ => (
1007                                    Self::normalize(&self.plan, l),
1008                                    Self::normalize(&right, r),
1009                                ),
1010                            }
1011                        }
1012                        (None, Some(rr)) => {
1013                            let r_is_left =
1014                                self.plan.schema().field_with_qualified_name(rr, &r.name);
1015                            let r_is_right =
1016                                right.schema().field_with_qualified_name(rr, &r.name);
1017
1018                            match (r_is_left, r_is_right) {
1019                                (Ok(_), _) => (Ok(r), Self::normalize(&right, l)),
1020                                (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
1021                                _ => (
1022                                    Self::normalize(&self.plan, l),
1023                                    Self::normalize(&right, r),
1024                                ),
1025                            }
1026                        }
1027                        (None, None) => {
1028                            let mut swap = false;
1029                            let left_key = Self::normalize(&self.plan, l.clone())
1030                                .or_else(|_| {
1031                                    swap = true;
1032                                    Self::normalize(&right, l)
1033                                });
1034                            if swap {
1035                                (Self::normalize(&self.plan, r), left_key)
1036                            } else {
1037                                (left_key, Self::normalize(&right, r))
1038                            }
1039                        }
1040                    }
1041                })
1042                .unzip();
1043
1044        let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1045        let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1046
1047        let on = left_keys
1048            .into_iter()
1049            .zip(right_keys)
1050            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
1051            .collect();
1052        let join_schema =
1053            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
1054
1055        Ok(Self::new(LogicalPlan::Join(Join {
1056            left: self.plan,
1057            right: Arc::new(right),
1058            on,
1059            filter,
1060            join_type,
1061            join_constraint: JoinConstraint::On,
1062            schema: DFSchemaRef::new(join_schema),
1063            null_equals_null,
1064        })))
1065    }
1066
1067    /// Apply a join with using constraint, which duplicates all join columns in output schema.
1068    pub fn join_using(
1069        self,
1070        right: LogicalPlan,
1071        join_type: JoinType,
1072        using_keys: Vec<impl Into<Column> + Clone>,
1073    ) -> Result<Self> {
1074        let left_keys: Vec<Column> = using_keys
1075            .clone()
1076            .into_iter()
1077            .map(|c| Self::normalize(&self.plan, c))
1078            .collect::<Result<_>>()?;
1079        let right_keys: Vec<Column> = using_keys
1080            .into_iter()
1081            .map(|c| Self::normalize(&right, c))
1082            .collect::<Result<_>>()?;
1083
1084        let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys).collect();
1085        let join_schema =
1086            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
1087        let mut join_on: Vec<(Expr, Expr)> = vec![];
1088        let mut filters: Option<Expr> = None;
1089        for (l, r) in &on {
1090            if self.plan.schema().has_column(l)
1091                && right.schema().has_column(r)
1092                && can_hash(self.plan.schema().field_from_column(l)?.data_type())
1093            {
1094                join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
1095            } else if self.plan.schema().has_column(l)
1096                && right.schema().has_column(r)
1097                && can_hash(self.plan.schema().field_from_column(r)?.data_type())
1098            {
1099                join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
1100            } else {
1101                let expr = binary_expr(
1102                    Expr::Column(l.clone()),
1103                    Operator::Eq,
1104                    Expr::Column(r.clone()),
1105                );
1106                match filters {
1107                    None => filters = Some(expr),
1108                    Some(filter_expr) => filters = Some(and(expr, filter_expr)),
1109                }
1110            }
1111        }
1112
1113        if join_on.is_empty() {
1114            let join = Self::from(self.plan).cross_join(right)?;
1115            join.filter(filters.ok_or_else(|| {
1116                DataFusionError::Internal("filters should not be None here".to_string())
1117            })?)
1118        } else {
1119            Ok(Self::new(LogicalPlan::Join(Join {
1120                left: self.plan,
1121                right: Arc::new(right),
1122                on: join_on,
1123                filter: filters,
1124                join_type,
1125                join_constraint: JoinConstraint::Using,
1126                schema: DFSchemaRef::new(join_schema),
1127                null_equals_null: false,
1128            })))
1129        }
1130    }
1131
1132    /// Apply a cross join
1133    pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
1134        let join_schema =
1135            build_join_schema(self.plan.schema(), right.schema(), &JoinType::Inner)?;
1136        Ok(Self::new(LogicalPlan::Join(Join {
1137            left: self.plan,
1138            right: Arc::new(right),
1139            on: vec![],
1140            filter: None,
1141            join_type: JoinType::Inner,
1142            join_constraint: JoinConstraint::On,
1143            null_equals_null: false,
1144            schema: DFSchemaRef::new(join_schema),
1145        })))
1146    }
1147
1148    /// Repartition
1149    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
1150        Ok(Self::new(LogicalPlan::Repartition(Repartition {
1151            input: self.plan,
1152            partitioning_scheme,
1153        })))
1154    }
1155
1156    /// Apply a window functions to extend the schema
1157    pub fn window(
1158        self,
1159        window_expr: impl IntoIterator<Item = impl Into<Expr>>,
1160    ) -> Result<Self> {
1161        let window_expr = normalize_cols(window_expr, &self.plan)?;
1162        validate_unique_names("Windows", &window_expr)?;
1163        Ok(Self::new(LogicalPlan::Window(Window::try_new(
1164            window_expr,
1165            self.plan,
1166        )?)))
1167    }
1168
1169    /// Apply an aggregate: grouping on the `group_expr` expressions
1170    /// and calculating `aggr_expr` aggregates for each distinct
1171    /// value of the `group_expr`;
1172    pub fn aggregate(
1173        self,
1174        group_expr: impl IntoIterator<Item = impl Into<Expr>>,
1175        aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
1176    ) -> Result<Self> {
1177        let group_expr = normalize_cols(group_expr, &self.plan)?;
1178        let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
1179
1180        let group_expr = if self.options.add_implicit_group_by_exprs {
1181            add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?
1182        } else {
1183            group_expr
1184        };
1185
1186        Aggregate::try_new(self.plan, group_expr, aggr_expr)
1187            .map(LogicalPlan::Aggregate)
1188            .map(Self::new)
1189    }
1190
1191    /// Create an expression to represent the explanation of the plan
1192    ///
1193    /// if `analyze` is true, runs the actual plan and produces
1194    /// information about metrics during run.
1195    ///
1196    /// if `verbose` is true, prints out additional details.
1197    pub fn explain(self, verbose: bool, analyze: bool) -> Result<Self> {
1198        let schema = LogicalPlan::explain_schema();
1199        let schema = schema.to_dfschema_ref()?;
1200
1201        if analyze {
1202            Ok(Self::new(LogicalPlan::Analyze(Analyze {
1203                verbose,
1204                input: self.plan,
1205                schema,
1206            })))
1207        } else {
1208            let stringified_plans =
1209                vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
1210
1211            Ok(Self::new(LogicalPlan::Explain(Explain {
1212                verbose,
1213                plan: self.plan,
1214                stringified_plans,
1215                schema,
1216                logical_optimization_succeeded: false,
1217            })))
1218        }
1219    }
1220
1221    /// Process intersect set operator
1222    pub fn intersect(
1223        left_plan: LogicalPlan,
1224        right_plan: LogicalPlan,
1225        is_all: bool,
1226    ) -> Result<LogicalPlan> {
1227        LogicalPlanBuilder::intersect_or_except(
1228            left_plan,
1229            right_plan,
1230            JoinType::LeftSemi,
1231            is_all,
1232        )
1233    }
1234
1235    /// Process except set operator
1236    pub fn except(
1237        left_plan: LogicalPlan,
1238        right_plan: LogicalPlan,
1239        is_all: bool,
1240    ) -> Result<LogicalPlan> {
1241        LogicalPlanBuilder::intersect_or_except(
1242            left_plan,
1243            right_plan,
1244            JoinType::LeftAnti,
1245            is_all,
1246        )
1247    }
1248
1249    /// Process intersect or except
1250    fn intersect_or_except(
1251        left_plan: LogicalPlan,
1252        right_plan: LogicalPlan,
1253        join_type: JoinType,
1254        is_all: bool,
1255    ) -> Result<LogicalPlan> {
1256        let left_len = left_plan.schema().fields().len();
1257        let right_len = right_plan.schema().fields().len();
1258
1259        if left_len != right_len {
1260            return plan_err!(
1261                "INTERSECT/EXCEPT query must have the same number of columns. Left is {left_len} and right is {right_len}."
1262            );
1263        }
1264
1265        let join_keys = left_plan
1266            .schema()
1267            .fields()
1268            .iter()
1269            .zip(right_plan.schema().fields().iter())
1270            .map(|(left_field, right_field)| {
1271                (
1272                    (Column::from_name(left_field.name())),
1273                    (Column::from_name(right_field.name())),
1274                )
1275            })
1276            .unzip();
1277        if is_all {
1278            LogicalPlanBuilder::from(left_plan)
1279                .join_detailed(right_plan, join_type, join_keys, None, true)?
1280                .build()
1281        } else {
1282            LogicalPlanBuilder::from(left_plan)
1283                .distinct()?
1284                .join_detailed(right_plan, join_type, join_keys, None, true)?
1285                .build()
1286        }
1287    }
1288
1289    /// Build the plan
1290    pub fn build(self) -> Result<LogicalPlan> {
1291        Ok(Arc::unwrap_or_clone(self.plan))
1292    }
1293
1294    /// Apply a join with both explicit equijoin and non equijoin predicates.
1295    ///
1296    /// Note this is a low level API that requires identifying specific
1297    /// predicate types. Most users should use  [`join_on`](Self::join_on) that
1298    /// automatically identifies predicates appropriately.
1299    ///
1300    /// `equi_exprs` defines equijoin predicates, of the form `l = r)` for each
1301    /// `(l, r)` tuple. `l`, the first element of the tuple, must only refer
1302    /// to columns from the existing input. `r`, the second element of the tuple,
1303    /// must only refer to columns from the right input.
1304    ///
1305    /// `filter` contains any other other filter expression to apply during the
1306    /// join. Note that `equi_exprs` predicates are evaluated more efficiently
1307    /// than the filter expressions, so they are preferred.
1308    pub fn join_with_expr_keys(
1309        self,
1310        right: LogicalPlan,
1311        join_type: JoinType,
1312        equi_exprs: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
1313        filter: Option<Expr>,
1314    ) -> Result<Self> {
1315        if equi_exprs.0.len() != equi_exprs.1.len() {
1316            return plan_err!("left_keys and right_keys were not the same length");
1317        }
1318
1319        let join_key_pairs = equi_exprs
1320            .0
1321            .into_iter()
1322            .zip(equi_exprs.1)
1323            .map(|(l, r)| {
1324                let left_key = l.into();
1325                let right_key = r.into();
1326                let mut left_using_columns  = HashSet::new();
1327                expr_to_columns(&left_key, &mut left_using_columns)?;
1328                let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check(
1329                    left_key,
1330                    &[&[self.plan.schema()]],
1331                    &[],
1332                )?;
1333
1334                let mut right_using_columns = HashSet::new();
1335                expr_to_columns(&right_key, &mut right_using_columns)?;
1336                let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check(
1337                    right_key,
1338                    &[&[right.schema()]],
1339                    &[],
1340                )?;
1341
1342                // find valid equijoin
1343                find_valid_equijoin_key_pair(
1344                        &normalized_left_key,
1345                        &normalized_right_key,
1346                        self.plan.schema(),
1347                        right.schema(),
1348                    )?.ok_or_else(||
1349                        plan_datafusion_err!(
1350                            "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
1351                        ))
1352            })
1353            .collect::<Result<Vec<_>>>()?;
1354
1355        let join_schema =
1356            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
1357
1358        Ok(Self::new(LogicalPlan::Join(Join {
1359            left: self.plan,
1360            right: Arc::new(right),
1361            on: join_key_pairs,
1362            filter,
1363            join_type,
1364            join_constraint: JoinConstraint::On,
1365            schema: DFSchemaRef::new(join_schema),
1366            null_equals_null: false,
1367        })))
1368    }
1369
1370    /// Unnest the given column.
1371    pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
1372        unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
1373    }
1374
1375    /// Unnest the given column given [`UnnestOptions`]
1376    pub fn unnest_column_with_options(
1377        self,
1378        column: impl Into<Column>,
1379        options: UnnestOptions,
1380    ) -> Result<Self> {
1381        unnest_with_options(
1382            Arc::unwrap_or_clone(self.plan),
1383            vec![column.into()],
1384            options,
1385        )
1386        .map(Self::new)
1387    }
1388
1389    /// Unnest the given columns with the given [`UnnestOptions`]
1390    pub fn unnest_columns_with_options(
1391        self,
1392        columns: Vec<Column>,
1393        options: UnnestOptions,
1394    ) -> Result<Self> {
1395        unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
1396            .map(Self::new)
1397    }
1398}
1399
1400impl From<LogicalPlan> for LogicalPlanBuilder {
1401    fn from(plan: LogicalPlan) -> Self {
1402        LogicalPlanBuilder::new(plan)
1403    }
1404}
1405
1406impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
1407    fn from(plan: Arc<LogicalPlan>) -> Self {
1408        LogicalPlanBuilder::new_from_arc(plan)
1409    }
1410}
1411
1412/// Container used when building fields for a `VALUES` node.
1413#[derive(Default)]
1414struct ValuesFields {
1415    inner: Vec<Field>,
1416}
1417
1418impl ValuesFields {
1419    pub fn new() -> Self {
1420        Self::default()
1421    }
1422
1423    pub fn push(&mut self, data_type: DataType, nullable: bool) {
1424        // Naming follows the convention described here:
1425        // https://www.postgresql.org/docs/current/queries-values.html
1426        let name = format!("column{}", self.inner.len() + 1);
1427        self.inner.push(Field::new(name, data_type, nullable));
1428    }
1429
1430    pub fn into_fields(self) -> Fields {
1431        self.inner.into()
1432    }
1433}
1434
1435pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
1436    let mut name_map = HashMap::new();
1437    fields
1438        .into_iter()
1439        .map(|field| {
1440            let counter = name_map.entry(field.name().to_string()).or_insert(0);
1441            *counter += 1;
1442            if *counter > 1 {
1443                let new_name = format!("{}:{}", field.name(), *counter - 1);
1444                Field::new(new_name, field.data_type().clone(), field.is_nullable())
1445            } else {
1446                field.as_ref().clone()
1447            }
1448        })
1449        .collect()
1450}
1451
1452fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
1453    let mut table_references = schema
1454        .iter()
1455        .filter_map(|(qualifier, _)| qualifier)
1456        .collect::<Vec<_>>();
1457    table_references.dedup();
1458    let table_reference = if table_references.len() == 1 {
1459        table_references.pop().cloned()
1460    } else {
1461        None
1462    };
1463
1464    (
1465        table_reference,
1466        Arc::new(Field::new("mark", DataType::Boolean, false)),
1467    )
1468}
1469
1470/// Creates a schema for a join operation.
1471/// The fields from the left side are first
1472pub fn build_join_schema(
1473    left: &DFSchema,
1474    right: &DFSchema,
1475    join_type: &JoinType,
1476) -> Result<DFSchema> {
1477    fn nullify_fields<'a>(
1478        fields: impl Iterator<Item = (Option<&'a TableReference>, &'a Arc<Field>)>,
1479    ) -> Vec<(Option<TableReference>, Arc<Field>)> {
1480        fields
1481            .map(|(q, f)| {
1482                // TODO: find a good way to do that
1483                let field = f.as_ref().clone().with_nullable(true);
1484                (q.cloned(), Arc::new(field))
1485            })
1486            .collect()
1487    }
1488
1489    let right_fields = right.iter();
1490    let left_fields = left.iter();
1491
1492    let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = match join_type {
1493        JoinType::Inner => {
1494            // left then right
1495            let left_fields = left_fields
1496                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1497                .collect::<Vec<_>>();
1498            let right_fields = right_fields
1499                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1500                .collect::<Vec<_>>();
1501            left_fields.into_iter().chain(right_fields).collect()
1502        }
1503        JoinType::Left => {
1504            // left then right, right set to nullable in case of not matched scenario
1505            let left_fields = left_fields
1506                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1507                .collect::<Vec<_>>();
1508            left_fields
1509                .into_iter()
1510                .chain(nullify_fields(right_fields))
1511                .collect()
1512        }
1513        JoinType::Right => {
1514            // left then right, left set to nullable in case of not matched scenario
1515            let right_fields = right_fields
1516                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1517                .collect::<Vec<_>>();
1518            nullify_fields(left_fields)
1519                .into_iter()
1520                .chain(right_fields)
1521                .collect()
1522        }
1523        JoinType::Full => {
1524            // left then right, all set to nullable in case of not matched scenario
1525            nullify_fields(left_fields)
1526                .into_iter()
1527                .chain(nullify_fields(right_fields))
1528                .collect()
1529        }
1530        JoinType::LeftSemi | JoinType::LeftAnti => {
1531            // Only use the left side for the schema
1532            left_fields
1533                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1534                .collect()
1535        }
1536        JoinType::LeftMark => left_fields
1537            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1538            .chain(once(mark_field(right)))
1539            .collect(),
1540        JoinType::RightSemi | JoinType::RightAnti => {
1541            // Only use the right side for the schema
1542            right_fields
1543                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1544                .collect()
1545        }
1546    };
1547    let func_dependencies = left.functional_dependencies().join(
1548        right.functional_dependencies(),
1549        join_type,
1550        left.fields().len(),
1551    );
1552    let metadata = left
1553        .metadata()
1554        .clone()
1555        .into_iter()
1556        .chain(right.metadata().clone())
1557        .collect();
1558    let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
1559    dfschema.with_functional_dependencies(func_dependencies)
1560}
1561
1562/// Add additional "synthetic" group by expressions based on functional
1563/// dependencies.
1564///
1565/// For example, if we are grouping on `[c1]`, and we know from
1566/// functional dependencies that column `c1` determines `c2`, this function
1567/// adds `c2` to the group by list.
1568///
1569/// This allows MySQL style selects like
1570/// `SELECT col FROM t WHERE pk = 5` if col is unique
1571pub fn add_group_by_exprs_from_dependencies(
1572    mut group_expr: Vec<Expr>,
1573    schema: &DFSchemaRef,
1574) -> Result<Vec<Expr>> {
1575    // Names of the fields produced by the GROUP BY exprs for example, `GROUP BY
1576    // c1 + 1` produces an output field named `"c1 + 1"`
1577    let mut group_by_field_names = group_expr
1578        .iter()
1579        .map(|e| e.schema_name().to_string())
1580        .collect::<Vec<_>>();
1581
1582    if let Some(target_indices) =
1583        get_target_functional_dependencies(schema, &group_by_field_names)
1584    {
1585        for idx in target_indices {
1586            let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
1587            let expr_name = expr.schema_name().to_string();
1588            if !group_by_field_names.contains(&expr_name) {
1589                group_by_field_names.push(expr_name);
1590                group_expr.push(expr);
1591            }
1592        }
1593    }
1594    Ok(group_expr)
1595}
1596
1597/// Errors if one or more expressions have equal names.
1598pub fn validate_unique_names<'a>(
1599    node_name: &str,
1600    expressions: impl IntoIterator<Item = &'a Expr>,
1601) -> Result<()> {
1602    let mut unique_names = HashMap::new();
1603
1604    expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
1605        let name = expr.schema_name().to_string();
1606        match unique_names.get(&name) {
1607            None => {
1608                unique_names.insert(name, (position, expr));
1609                Ok(())
1610            },
1611            Some((existing_position, existing_expr)) => {
1612                plan_err!("{node_name} require unique expression names \
1613                             but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
1614                             at position {position} have the same name. Consider aliasing (\"AS\") one of them."
1615                            )
1616            }
1617        }
1618    })
1619}
1620
1621/// Union two [`LogicalPlan`]s.
1622///
1623/// Constructs the UNION plan, but does not perform type-coercion. Therefore the
1624/// subtree expressions will not be properly typed until the optimizer pass.
1625///
1626/// If a properly typed UNION plan is needed, refer to [`TypeCoercionRewriter::coerce_union`]
1627/// or alternatively, merge the union input schema using [`coerce_union_schema`] and
1628/// apply the expression rewrite with [`coerce_plan_expr_for_schema`].
1629///
1630/// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
1631/// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
1632pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1633    Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1634        Arc::new(left_plan),
1635        Arc::new(right_plan),
1636    ])?))
1637}
1638
1639/// Like [`union`], but combine rows from different tables by name, rather than
1640/// by position.
1641pub fn union_by_name(
1642    left_plan: LogicalPlan,
1643    right_plan: LogicalPlan,
1644) -> Result<LogicalPlan> {
1645    Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
1646        Arc::new(left_plan),
1647        Arc::new(right_plan),
1648    ])?))
1649}
1650
1651/// Create Projection
1652/// # Errors
1653/// This function errors under any of the following conditions:
1654/// * Two or more expressions have the same name
1655/// * An invalid expression is used (e.g. a `sort` expression)
1656pub fn project(
1657    plan: LogicalPlan,
1658    expr: impl IntoIterator<Item = impl Into<Expr>>,
1659) -> Result<LogicalPlan> {
1660    project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
1661}
1662
1663/// Create Projection. Similar to project except that the expressions
1664/// passed in have a flag to indicate if that expression requires
1665/// validation (normalize & columnize) (true) or not (false)
1666/// # Errors
1667/// This function errors under any of the following conditions:
1668/// * Two or more expressions have the same name
1669/// * An invalid expression is used (e.g. a `sort` expression)
1670fn project_with_validation(
1671    plan: LogicalPlan,
1672    expr: impl IntoIterator<Item = (impl Into<Expr>, bool)>,
1673) -> Result<LogicalPlan> {
1674    let mut projected_expr = vec![];
1675    for (e, validate) in expr {
1676        let e = e.into();
1677        match e {
1678            #[expect(deprecated)]
1679            Expr::Wildcard { .. } => projected_expr.push(e),
1680            _ => {
1681                if validate {
1682                    projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1683                } else {
1684                    projected_expr.push(e)
1685                }
1686            }
1687        }
1688    }
1689    validate_unique_names("Projections", projected_expr.iter())?;
1690
1691    Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
1692}
1693
1694/// Create a SubqueryAlias to wrap a LogicalPlan.
1695pub fn subquery_alias(
1696    plan: LogicalPlan,
1697    alias: impl Into<TableReference>,
1698) -> Result<LogicalPlan> {
1699    SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias)
1700}
1701
1702/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
1703/// This is mostly used for testing and documentation.
1704pub fn table_scan(
1705    name: Option<impl Into<TableReference>>,
1706    table_schema: &Schema,
1707    projection: Option<Vec<usize>>,
1708) -> Result<LogicalPlanBuilder> {
1709    table_scan_with_filters(name, table_schema, projection, vec![])
1710}
1711
1712/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
1713/// and inlined filters.
1714/// This is mostly used for testing and documentation.
1715pub fn table_scan_with_filters(
1716    name: Option<impl Into<TableReference>>,
1717    table_schema: &Schema,
1718    projection: Option<Vec<usize>>,
1719    filters: Vec<Expr>,
1720) -> Result<LogicalPlanBuilder> {
1721    let table_source = table_source(table_schema);
1722    let name = name
1723        .map(|n| n.into())
1724        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1725    LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
1726}
1727
1728/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
1729/// filters, and inlined fetch.
1730/// This is mostly used for testing and documentation.
1731pub fn table_scan_with_filter_and_fetch(
1732    name: Option<impl Into<TableReference>>,
1733    table_schema: &Schema,
1734    projection: Option<Vec<usize>>,
1735    filters: Vec<Expr>,
1736    fetch: Option<usize>,
1737) -> Result<LogicalPlanBuilder> {
1738    let table_source = table_source(table_schema);
1739    let name = name
1740        .map(|n| n.into())
1741        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1742    LogicalPlanBuilder::scan_with_filters_fetch(
1743        name,
1744        table_source,
1745        projection,
1746        filters,
1747        fetch,
1748    )
1749}
1750
1751pub fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
1752    let table_schema = Arc::new(table_schema.clone());
1753    Arc::new(LogicalTableSource {
1754        table_schema,
1755        constraints: Default::default(),
1756    })
1757}
1758
1759pub fn table_source_with_constraints(
1760    table_schema: &Schema,
1761    constraints: Constraints,
1762) -> Arc<dyn TableSource> {
1763    let table_schema = Arc::new(table_schema.clone());
1764    Arc::new(LogicalTableSource {
1765        table_schema,
1766        constraints,
1767    })
1768}
1769
1770/// Wrap projection for a plan, if the join keys contains normal expression.
1771pub fn wrap_projection_for_join_if_necessary(
1772    join_keys: &[Expr],
1773    input: LogicalPlan,
1774) -> Result<(LogicalPlan, Vec<Column>, bool)> {
1775    let input_schema = input.schema();
1776    let alias_join_keys: Vec<Expr> = join_keys
1777        .iter()
1778        .map(|key| {
1779            // The display_name() of cast expression will ignore the cast info, and show the inner expression name.
1780            // If we do not add alias, it will throw same field name error in the schema when adding projection.
1781            // For example:
1782            //    input scan : [a, b, c],
1783            //    join keys: [cast(a as int)]
1784            //
1785            //  then a and cast(a as int) will use the same field name - `a` in projection schema.
1786            //  https://github.com/apache/datafusion/issues/4478
1787            if matches!(key, Expr::Cast(_)) || matches!(key, Expr::TryCast(_)) {
1788                let alias = format!("{key}");
1789                key.clone().alias(alias)
1790            } else {
1791                key.clone()
1792            }
1793        })
1794        .collect::<Vec<_>>();
1795
1796    let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
1797    let plan = if need_project {
1798        // Include all columns from the input and extend them with the join keys
1799        let mut projection = input_schema
1800            .columns()
1801            .into_iter()
1802            .map(Expr::Column)
1803            .collect::<Vec<_>>();
1804        let join_key_items = alias_join_keys
1805            .iter()
1806            .flat_map(|expr| expr.try_as_col().is_none().then_some(expr))
1807            .cloned()
1808            .collect::<HashSet<Expr>>();
1809        projection.extend(join_key_items);
1810
1811        LogicalPlanBuilder::from(input)
1812            .project(projection)?
1813            .build()?
1814    } else {
1815        input
1816    };
1817
1818    let join_on = alias_join_keys
1819        .into_iter()
1820        .map(|key| {
1821            if let Some(col) = key.try_as_col() {
1822                Ok(col.clone())
1823            } else {
1824                let name = key.schema_name().to_string();
1825                Ok(Column::from_name(name))
1826            }
1827        })
1828        .collect::<Result<Vec<_>>>()?;
1829
1830    Ok((plan, join_on, need_project))
1831}
1832
1833/// Basic TableSource implementation intended for use in tests and documentation. It is expected
1834/// that users will provide their own TableSource implementations or use DataFusion's
1835/// DefaultTableSource.
1836pub struct LogicalTableSource {
1837    table_schema: SchemaRef,
1838    constraints: Constraints,
1839}
1840
1841impl LogicalTableSource {
1842    /// Create a new LogicalTableSource
1843    pub fn new(table_schema: SchemaRef) -> Self {
1844        Self {
1845            table_schema,
1846            constraints: Constraints::default(),
1847        }
1848    }
1849
1850    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
1851        self.constraints = constraints;
1852        self
1853    }
1854}
1855
1856impl TableSource for LogicalTableSource {
1857    fn as_any(&self) -> &dyn Any {
1858        self
1859    }
1860
1861    fn schema(&self) -> SchemaRef {
1862        Arc::clone(&self.table_schema)
1863    }
1864
1865    fn constraints(&self) -> Option<&Constraints> {
1866        Some(&self.constraints)
1867    }
1868
1869    fn supports_filters_pushdown(
1870        &self,
1871        filters: &[&Expr],
1872    ) -> Result<Vec<TableProviderFilterPushDown>> {
1873        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
1874    }
1875}
1876
1877/// Create a [`LogicalPlan::Unnest`] plan
1878pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
1879    unnest_with_options(input, columns, UnnestOptions::default())
1880}
1881
1882// Get the data type of a multi-dimensional type after unnesting it
1883// with a given depth
1884fn get_unnested_list_datatype_recursive(
1885    data_type: &DataType,
1886    depth: usize,
1887) -> Result<DataType> {
1888    match data_type {
1889        DataType::List(field)
1890        | DataType::FixedSizeList(field, _)
1891        | DataType::LargeList(field) => {
1892            if depth == 1 {
1893                return Ok(field.data_type().clone());
1894            }
1895            return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
1896        }
1897        _ => {}
1898    };
1899
1900    internal_err!("trying to unnest on invalid data type {:?}", data_type)
1901}
1902
1903pub fn get_struct_unnested_columns(
1904    col_name: &String,
1905    inner_fields: &Fields,
1906) -> Vec<Column> {
1907    inner_fields
1908        .iter()
1909        .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
1910        .collect()
1911}
1912
1913// Based on data type, either struct or a variant of list
1914// return a set of columns as the result of unnesting
1915// the input columns.
1916// For example, given a column with name "a",
1917// - List(Element) returns ["a"] with data type Element
1918// - Struct(field1, field2) returns ["a.field1","a.field2"]
1919// For list data type, an argument depth is used to specify
1920// the recursion level
1921pub fn get_unnested_columns(
1922    col_name: &String,
1923    data_type: &DataType,
1924    depth: usize,
1925) -> Result<Vec<(Column, Arc<Field>)>> {
1926    let mut qualified_columns = Vec::with_capacity(1);
1927
1928    match data_type {
1929        DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
1930            let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
1931            let new_field = Arc::new(Field::new(
1932                col_name, data_type,
1933                // Unnesting may produce NULLs even if the list is not null.
1934                // For example: unnest([1], []) -> 1, null
1935                true,
1936            ));
1937            let column = Column::from_name(col_name);
1938            // let column = Column::from((None, &new_field));
1939            qualified_columns.push((column, new_field));
1940        }
1941        DataType::Struct(fields) => {
1942            qualified_columns.extend(fields.iter().map(|f| {
1943                let new_name = format!("{}.{}", col_name, f.name());
1944                let column = Column::from_name(&new_name);
1945                let new_field = f.as_ref().clone().with_name(new_name);
1946                // let column = Column::from((None, &f));
1947                (column, Arc::new(new_field))
1948            }))
1949        }
1950        _ => {
1951            return internal_err!(
1952                "trying to unnest on invalid data type {:?}",
1953                data_type
1954            );
1955        }
1956    };
1957    Ok(qualified_columns)
1958}
1959
1960/// Create a [`LogicalPlan::Unnest`] plan with options
1961/// This function receive a list of columns to be unnested
1962/// because multiple unnest can be performed on the same column (e.g unnest with different depth)
1963/// The new schema will contains post-unnest fields replacing the original field
1964///
1965/// For example:
1966/// Input schema as
1967/// ```text
1968/// +---------------------+-------------------+
1969/// | col1                | col2              |
1970/// +---------------------+-------------------+
1971/// | Struct(INT64,INT32) | List(List(Int64)) |
1972/// +---------------------+-------------------+
1973/// ```
1974///
1975///
1976///
1977/// Then unnesting columns with:
1978/// - (col1,Struct)
1979/// - (col2,List(\[depth=1,depth=2\]))
1980///
1981/// will generate a new schema as
1982/// ```text
1983/// +---------+---------+---------------------+---------------------+
1984/// | col1.c0 | col1.c1 | unnest_col2_depth_1 | unnest_col2_depth_2 |
1985/// +---------+---------+---------------------+---------------------+
1986/// | Int64   | Int32   | List(Int64)         |  Int64              |
1987/// +---------+---------+---------------------+---------------------+
1988/// ```
1989pub fn unnest_with_options(
1990    input: LogicalPlan,
1991    columns_to_unnest: Vec<Column>,
1992    options: UnnestOptions,
1993) -> Result<LogicalPlan> {
1994    let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
1995    let mut struct_columns = vec![];
1996    let indices_to_unnest = columns_to_unnest
1997        .iter()
1998        .map(|c| Ok((input.schema().index_of_column(c)?, c)))
1999        .collect::<Result<HashMap<usize, &Column>>>()?;
2000
2001    let input_schema = input.schema();
2002
2003    let mut dependency_indices = vec![];
2004    // Transform input schema into new schema
2005    // Given this comprehensive example
2006    //
2007    // input schema:
2008    // 1.col1_unnest_placeholder: list[list[int]],
2009    // 2.col1: list[list[int]]
2010    // 3.col2: list[int]
2011    // with unnest on unnest(col1,depth=2), unnest(col1,depth=1) and unnest(col2,depth=1)
2012    // output schema:
2013    // 1.unnest_col1_depth_2: int
2014    // 2.unnest_col1_depth_1: list[int]
2015    // 3.col1: list[list[int]]
2016    // 4.unnest_col2_depth_1: int
2017    // Meaning the placeholder column will be replaced by its unnested variation(s), note
2018    // the plural.
2019    let fields = input_schema
2020        .iter()
2021        .enumerate()
2022        .map(|(index, (original_qualifier, original_field))| {
2023            match indices_to_unnest.get(&index) {
2024                Some(column_to_unnest) => {
2025                    let recursions_on_column = options
2026                        .recursions
2027                        .iter()
2028                        .filter(|p| -> bool { &p.input_column == *column_to_unnest })
2029                        .collect::<Vec<_>>();
2030                    let mut transformed_columns = recursions_on_column
2031                        .iter()
2032                        .map(|r| {
2033                            list_columns.push((
2034                                index,
2035                                ColumnUnnestList {
2036                                    output_column: r.output_column.clone(),
2037                                    depth: r.depth,
2038                                },
2039                            ));
2040                            Ok(get_unnested_columns(
2041                                &r.output_column.name,
2042                                original_field.data_type(),
2043                                r.depth,
2044                            )?
2045                            .into_iter()
2046                            .next()
2047                            .unwrap()) // because unnesting a list column always result into one result
2048                        })
2049                        .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
2050                    if transformed_columns.is_empty() {
2051                        transformed_columns = get_unnested_columns(
2052                            &column_to_unnest.name,
2053                            original_field.data_type(),
2054                            1,
2055                        )?;
2056                        match original_field.data_type() {
2057                            DataType::Struct(_) => {
2058                                struct_columns.push(index);
2059                            }
2060                            DataType::List(_)
2061                            | DataType::FixedSizeList(_, _)
2062                            | DataType::LargeList(_) => {
2063                                list_columns.push((
2064                                    index,
2065                                    ColumnUnnestList {
2066                                        output_column: Column::from_name(
2067                                            &column_to_unnest.name,
2068                                        ),
2069                                        depth: 1,
2070                                    },
2071                                ));
2072                            }
2073                            _ => {}
2074                        };
2075                    }
2076
2077                    // new columns dependent on the same original index
2078                    dependency_indices
2079                        .extend(std::iter::repeat(index).take(transformed_columns.len()));
2080                    Ok(transformed_columns
2081                        .iter()
2082                        .map(|(col, field)| (col.relation.to_owned(), field.to_owned()))
2083                        .collect())
2084                }
2085                None => {
2086                    dependency_indices.push(index);
2087                    Ok(vec![(
2088                        original_qualifier.cloned(),
2089                        Arc::clone(original_field),
2090                    )])
2091                }
2092            }
2093        })
2094        .collect::<Result<Vec<_>>>()?
2095        .into_iter()
2096        .flatten()
2097        .collect::<Vec<_>>();
2098
2099    let metadata = input_schema.metadata().clone();
2100    let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
2101    // We can use the existing functional dependencies:
2102    let deps = input_schema.functional_dependencies().clone();
2103    let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
2104
2105    Ok(LogicalPlan::Unnest(Unnest {
2106        input: Arc::new(input),
2107        exec_columns: columns_to_unnest,
2108        list_type_columns: list_columns,
2109        struct_type_columns: struct_columns,
2110        dependency_indices,
2111        schema,
2112        options,
2113    }))
2114}
2115
2116#[cfg(test)]
2117mod tests {
2118    use super::*;
2119    use crate::logical_plan::StringifiedPlan;
2120    use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};
2121
2122    use crate::test::function_stub::sum;
2123    use datafusion_common::{Constraint, RecursionUnnestOption, SchemaError};
2124
2125    #[test]
2126    fn plan_builder_simple() -> Result<()> {
2127        let plan =
2128            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2129                .filter(col("state").eq(lit("CO")))?
2130                .project(vec![col("id")])?
2131                .build()?;
2132
2133        let expected = "Projection: employee_csv.id\
2134        \n  Filter: employee_csv.state = Utf8(\"CO\")\
2135        \n    TableScan: employee_csv projection=[id, state]";
2136
2137        assert_eq!(expected, format!("{plan}"));
2138
2139        Ok(())
2140    }
2141
2142    #[test]
2143    fn plan_builder_schema() {
2144        let schema = employee_schema();
2145        let projection = None;
2146        let plan =
2147            LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection)
2148                .unwrap();
2149        let expected = DFSchema::try_from_qualified_schema(
2150            TableReference::bare("employee_csv"),
2151            &schema,
2152        )
2153        .unwrap();
2154        assert_eq!(&expected, plan.schema().as_ref());
2155
2156        // Note scan of "EMPLOYEE_CSV" is treated as a SQL identifier
2157        // (and thus normalized to "employee"csv") as well
2158        let projection = None;
2159        let plan =
2160            LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection)
2161                .unwrap();
2162        assert_eq!(&expected, plan.schema().as_ref());
2163    }
2164
2165    #[test]
2166    fn plan_builder_empty_name() {
2167        let schema = employee_schema();
2168        let projection = None;
2169        let err =
2170            LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err();
2171        assert_eq!(
2172            err.strip_backtrace(),
2173            "Error during planning: table_name cannot be empty"
2174        );
2175    }
2176
2177    #[test]
2178    fn plan_builder_sort() -> Result<()> {
2179        let plan =
2180            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2181                .sort(vec![
2182                    expr::Sort::new(col("state"), true, true),
2183                    expr::Sort::new(col("salary"), false, false),
2184                ])?
2185                .build()?;
2186
2187        let expected = "Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST\
2188        \n  TableScan: employee_csv projection=[state, salary]";
2189
2190        assert_eq!(expected, format!("{plan}"));
2191
2192        Ok(())
2193    }
2194
2195    #[test]
2196    fn plan_builder_union() -> Result<()> {
2197        let plan =
2198            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2199
2200        let plan = plan
2201            .clone()
2202            .union(plan.clone().build()?)?
2203            .union(plan.clone().build()?)?
2204            .union(plan.build()?)?
2205            .build()?;
2206
2207        let expected = "Union\
2208        \n  Union\
2209        \n    Union\
2210        \n      TableScan: employee_csv projection=[state, salary]\
2211        \n      TableScan: employee_csv projection=[state, salary]\
2212        \n    TableScan: employee_csv projection=[state, salary]\
2213        \n  TableScan: employee_csv projection=[state, salary]";
2214
2215        assert_eq!(expected, format!("{plan}"));
2216
2217        Ok(())
2218    }
2219
2220    #[test]
2221    fn plan_builder_union_distinct() -> Result<()> {
2222        let plan =
2223            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2224
2225        let plan = plan
2226            .clone()
2227            .union_distinct(plan.clone().build()?)?
2228            .union_distinct(plan.clone().build()?)?
2229            .union_distinct(plan.build()?)?
2230            .build()?;
2231
2232        let expected = "\
2233        Distinct:\
2234        \n  Union\
2235        \n    Distinct:\
2236        \n      Union\
2237        \n        Distinct:\
2238        \n          Union\
2239        \n            TableScan: employee_csv projection=[state, salary]\
2240        \n            TableScan: employee_csv projection=[state, salary]\
2241        \n        TableScan: employee_csv projection=[state, salary]\
2242        \n    TableScan: employee_csv projection=[state, salary]";
2243
2244        assert_eq!(expected, format!("{plan}"));
2245
2246        Ok(())
2247    }
2248
2249    #[test]
2250    fn plan_builder_simple_distinct() -> Result<()> {
2251        let plan =
2252            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2253                .filter(col("state").eq(lit("CO")))?
2254                .project(vec![col("id")])?
2255                .distinct()?
2256                .build()?;
2257
2258        let expected = "\
2259        Distinct:\
2260        \n  Projection: employee_csv.id\
2261        \n    Filter: employee_csv.state = Utf8(\"CO\")\
2262        \n      TableScan: employee_csv projection=[id, state]";
2263
2264        assert_eq!(expected, format!("{plan}"));
2265
2266        Ok(())
2267    }
2268
2269    #[test]
2270    fn exists_subquery() -> Result<()> {
2271        let foo = test_table_scan_with_name("foo")?;
2272        let bar = test_table_scan_with_name("bar")?;
2273
2274        let subquery = LogicalPlanBuilder::from(foo)
2275            .project(vec![col("a")])?
2276            .filter(col("a").eq(col("bar.a")))?
2277            .build()?;
2278
2279        let outer_query = LogicalPlanBuilder::from(bar)
2280            .project(vec![col("a")])?
2281            .filter(exists(Arc::new(subquery)))?
2282            .build()?;
2283
2284        let expected = "Filter: EXISTS (<subquery>)\
2285        \n  Subquery:\
2286        \n    Filter: foo.a = bar.a\
2287        \n      Projection: foo.a\
2288        \n        TableScan: foo\
2289        \n  Projection: bar.a\
2290        \n    TableScan: bar";
2291        assert_eq!(expected, format!("{outer_query}"));
2292
2293        Ok(())
2294    }
2295
2296    #[test]
2297    fn filter_in_subquery() -> Result<()> {
2298        let foo = test_table_scan_with_name("foo")?;
2299        let bar = test_table_scan_with_name("bar")?;
2300
2301        let subquery = LogicalPlanBuilder::from(foo)
2302            .project(vec![col("a")])?
2303            .filter(col("a").eq(col("bar.a")))?
2304            .build()?;
2305
2306        // SELECT a FROM bar WHERE a IN (SELECT a FROM foo WHERE a = bar.a)
2307        let outer_query = LogicalPlanBuilder::from(bar)
2308            .project(vec![col("a")])?
2309            .filter(in_subquery(col("a"), Arc::new(subquery)))?
2310            .build()?;
2311
2312        let expected = "Filter: bar.a IN (<subquery>)\
2313        \n  Subquery:\
2314        \n    Filter: foo.a = bar.a\
2315        \n      Projection: foo.a\
2316        \n        TableScan: foo\
2317        \n  Projection: bar.a\
2318        \n    TableScan: bar";
2319        assert_eq!(expected, format!("{outer_query}"));
2320
2321        Ok(())
2322    }
2323
2324    #[test]
2325    fn select_scalar_subquery() -> Result<()> {
2326        let foo = test_table_scan_with_name("foo")?;
2327        let bar = test_table_scan_with_name("bar")?;
2328
2329        let subquery = LogicalPlanBuilder::from(foo)
2330            .project(vec![col("b")])?
2331            .filter(col("a").eq(col("bar.a")))?
2332            .build()?;
2333
2334        // SELECT (SELECT a FROM foo WHERE a = bar.a) FROM bar
2335        let outer_query = LogicalPlanBuilder::from(bar)
2336            .project(vec![scalar_subquery(Arc::new(subquery))])?
2337            .build()?;
2338
2339        let expected = "Projection: (<subquery>)\
2340        \n  Subquery:\
2341        \n    Filter: foo.a = bar.a\
2342        \n      Projection: foo.b\
2343        \n        TableScan: foo\
2344        \n  TableScan: bar";
2345        assert_eq!(expected, format!("{outer_query}"));
2346
2347        Ok(())
2348    }
2349
2350    #[test]
2351    fn projection_non_unique_names() -> Result<()> {
2352        let plan = table_scan(
2353            Some("employee_csv"),
2354            &employee_schema(),
2355            // project id and first_name by column index
2356            Some(vec![0, 1]),
2357        )?
2358        // two columns with the same name => error
2359        .project(vec![col("id"), col("first_name").alias("id")]);
2360
2361        match plan {
2362            Err(DataFusionError::SchemaError(
2363                SchemaError::AmbiguousReference {
2364                    field:
2365                        Column {
2366                            relation: Some(TableReference::Bare { table }),
2367                            name,
2368                            spans: _,
2369                        },
2370                },
2371                _,
2372            )) => {
2373                assert_eq!(*"employee_csv", *table);
2374                assert_eq!("id", &name);
2375                Ok(())
2376            }
2377            _ => plan_err!("Plan should have returned an DataFusionError::SchemaError"),
2378        }
2379    }
2380
2381    fn employee_schema() -> Schema {
2382        Schema::new(vec![
2383            Field::new("id", DataType::Int32, false),
2384            Field::new("first_name", DataType::Utf8, false),
2385            Field::new("last_name", DataType::Utf8, false),
2386            Field::new("state", DataType::Utf8, false),
2387            Field::new("salary", DataType::Int32, false),
2388        ])
2389    }
2390
2391    #[test]
2392    fn stringified_plan() {
2393        let stringified_plan =
2394            StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
2395        assert!(stringified_plan.should_display(true));
2396        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
2397
2398        let stringified_plan =
2399            StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
2400        assert!(stringified_plan.should_display(true));
2401        assert!(stringified_plan.should_display(false)); // display in non verbose mode too
2402
2403        let stringified_plan =
2404            StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
2405        assert!(stringified_plan.should_display(true));
2406        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
2407
2408        let stringified_plan =
2409            StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
2410        assert!(stringified_plan.should_display(true));
2411        assert!(stringified_plan.should_display(false)); // display in non verbose mode
2412
2413        let stringified_plan = StringifiedPlan::new(
2414            PlanType::OptimizedLogicalPlan {
2415                optimizer_name: "random opt pass".into(),
2416            },
2417            "...the plan...",
2418        );
2419        assert!(stringified_plan.should_display(true));
2420        assert!(!stringified_plan.should_display(false));
2421    }
2422
2423    fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
2424        let schema = Schema::new(vec![
2425            Field::new("a", DataType::UInt32, false),
2426            Field::new("b", DataType::UInt32, false),
2427            Field::new("c", DataType::UInt32, false),
2428        ]);
2429        table_scan(Some(name), &schema, None)?.build()
2430    }
2431
2432    #[test]
2433    fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
2434        let plan1 =
2435            table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?;
2436        let plan2 =
2437            table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?;
2438
2439        let expected = "Error during planning: INTERSECT/EXCEPT query must have the same number of columns. \
2440         Left is 1 and right is 2.";
2441        let err_msg1 =
2442            LogicalPlanBuilder::intersect(plan1.build()?, plan2.build()?, true)
2443                .unwrap_err();
2444
2445        assert_eq!(err_msg1.strip_backtrace(), expected);
2446
2447        Ok(())
2448    }
2449
2450    #[test]
2451    fn plan_builder_unnest() -> Result<()> {
2452        // Cannot unnest on a scalar column
2453        let err = nested_table_scan("test_table")?
2454            .unnest_column("scalar")
2455            .unwrap_err();
2456        assert!(err
2457            .to_string()
2458            .starts_with("Internal error: trying to unnest on invalid data type UInt32"));
2459
2460        // Unnesting the strings list.
2461        let plan = nested_table_scan("test_table")?
2462            .unnest_column("strings")?
2463            .build()?;
2464
2465        let expected = "\
2466        Unnest: lists[test_table.strings|depth=1] structs[]\
2467        \n  TableScan: test_table";
2468        assert_eq!(expected, format!("{plan}"));
2469
2470        // Check unnested field is a scalar
2471        let field = plan.schema().field_with_name(None, "strings").unwrap();
2472        assert_eq!(&DataType::Utf8, field.data_type());
2473
2474        // Unnesting the singular struct column result into 2 new columns for each subfield
2475        let plan = nested_table_scan("test_table")?
2476            .unnest_column("struct_singular")?
2477            .build()?;
2478
2479        let expected = "\
2480        Unnest: lists[] structs[test_table.struct_singular]\
2481        \n  TableScan: test_table";
2482        assert_eq!(expected, format!("{plan}"));
2483
2484        for field_name in &["a", "b"] {
2485            // Check unnested struct field is a scalar
2486            let field = plan
2487                .schema()
2488                .field_with_name(None, &format!("struct_singular.{}", field_name))
2489                .unwrap();
2490            assert_eq!(&DataType::UInt32, field.data_type());
2491        }
2492
2493        // Unnesting multiple fields in separate plans
2494        let plan = nested_table_scan("test_table")?
2495            .unnest_column("strings")?
2496            .unnest_column("structs")?
2497            .unnest_column("struct_singular")?
2498            .build()?;
2499
2500        let expected = "\
2501        Unnest: lists[] structs[test_table.struct_singular]\
2502        \n  Unnest: lists[test_table.structs|depth=1] structs[]\
2503        \n    Unnest: lists[test_table.strings|depth=1] structs[]\
2504        \n      TableScan: test_table";
2505        assert_eq!(expected, format!("{plan}"));
2506
2507        // Check unnested struct list field should be a struct.
2508        let field = plan.schema().field_with_name(None, "structs").unwrap();
2509        assert!(matches!(field.data_type(), DataType::Struct(_)));
2510
2511        // Unnesting multiple fields at the same time, using infer syntax
2512        let cols = vec!["strings", "structs", "struct_singular"]
2513            .into_iter()
2514            .map(|c| c.into())
2515            .collect();
2516
2517        let plan = nested_table_scan("test_table")?
2518            .unnest_columns_with_options(cols, UnnestOptions::default())?
2519            .build()?;
2520
2521        let expected = "\
2522        Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]\
2523        \n  TableScan: test_table";
2524        assert_eq!(expected, format!("{plan}"));
2525
2526        // Unnesting missing column should fail.
2527        let plan = nested_table_scan("test_table")?.unnest_column("missing");
2528        assert!(plan.is_err());
2529
2530        // Simultaneously unnesting a list (with different depth) and a struct column
2531        let plan = nested_table_scan("test_table")?
2532            .unnest_columns_with_options(
2533                vec!["stringss".into(), "struct_singular".into()],
2534                UnnestOptions::default()
2535                    .with_recursions(RecursionUnnestOption {
2536                        input_column: "stringss".into(),
2537                        output_column: "stringss_depth_1".into(),
2538                        depth: 1,
2539                    })
2540                    .with_recursions(RecursionUnnestOption {
2541                        input_column: "stringss".into(),
2542                        output_column: "stringss_depth_2".into(),
2543                        depth: 2,
2544                    }),
2545            )?
2546            .build()?;
2547
2548        let expected = "\
2549        Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]\
2550        \n  TableScan: test_table";
2551        assert_eq!(expected, format!("{plan}"));
2552
2553        // Check output columns has correct type
2554        let field = plan
2555            .schema()
2556            .field_with_name(None, "stringss_depth_1")
2557            .unwrap();
2558        assert_eq!(
2559            &DataType::new_list(DataType::Utf8, false),
2560            field.data_type()
2561        );
2562        let field = plan
2563            .schema()
2564            .field_with_name(None, "stringss_depth_2")
2565            .unwrap();
2566        assert_eq!(&DataType::Utf8, field.data_type());
2567        // unnesting struct is still correct
2568        for field_name in &["a", "b"] {
2569            let field = plan
2570                .schema()
2571                .field_with_name(None, &format!("struct_singular.{}", field_name))
2572                .unwrap();
2573            assert_eq!(&DataType::UInt32, field.data_type());
2574        }
2575
2576        Ok(())
2577    }
2578
2579    fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
2580        // Create a schema with a scalar field, a list of strings, a list of structs
2581        // and a singular struct
2582        let struct_field_in_list = Field::new_struct(
2583            "item",
2584            vec![
2585                Field::new("a", DataType::UInt32, false),
2586                Field::new("b", DataType::UInt32, false),
2587            ],
2588            false,
2589        );
2590        let string_field = Field::new_list_field(DataType::Utf8, false);
2591        let strings_field = Field::new_list("item", string_field.clone(), false);
2592        let schema = Schema::new(vec![
2593            Field::new("scalar", DataType::UInt32, false),
2594            Field::new_list("strings", string_field, false),
2595            Field::new_list("structs", struct_field_in_list, false),
2596            Field::new(
2597                "struct_singular",
2598                DataType::Struct(Fields::from(vec![
2599                    Field::new("a", DataType::UInt32, false),
2600                    Field::new("b", DataType::UInt32, false),
2601                ])),
2602                false,
2603            ),
2604            Field::new_list("stringss", strings_field, false),
2605        ]);
2606
2607        table_scan(Some(table_name), &schema, None)
2608    }
2609
2610    #[test]
2611    fn test_union_after_join() -> Result<()> {
2612        let values = vec![vec![lit(1)]];
2613
2614        let left = LogicalPlanBuilder::values(values.clone())?
2615            .alias("left")?
2616            .build()?;
2617        let right = LogicalPlanBuilder::values(values)?
2618            .alias("right")?
2619            .build()?;
2620
2621        let join = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
2622
2623        let _ = LogicalPlanBuilder::from(join.clone())
2624            .union(join)?
2625            .build()?;
2626
2627        Ok(())
2628    }
2629
2630    #[test]
2631    fn test_change_redundant_column() -> Result<()> {
2632        let t1_field_1 = Field::new("a", DataType::Int32, false);
2633        let t2_field_1 = Field::new("a", DataType::Int32, false);
2634        let t2_field_3 = Field::new("a", DataType::Int32, false);
2635        let t1_field_2 = Field::new("b", DataType::Int32, false);
2636        let t2_field_2 = Field::new("b", DataType::Int32, false);
2637
2638        let field_vec = vec![t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3];
2639        let remove_redundant = change_redundant_column(&Fields::from(field_vec));
2640
2641        assert_eq!(
2642            remove_redundant,
2643            vec![
2644                Field::new("a", DataType::Int32, false),
2645                Field::new("a:1", DataType::Int32, false),
2646                Field::new("b", DataType::Int32, false),
2647                Field::new("b:1", DataType::Int32, false),
2648                Field::new("a:2", DataType::Int32, false),
2649            ]
2650        );
2651        Ok(())
2652    }
2653
2654    #[test]
2655    fn plan_builder_from_logical_plan() -> Result<()> {
2656        let plan =
2657            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2658                .sort(vec![
2659                    expr::Sort::new(col("state"), true, true),
2660                    expr::Sort::new(col("salary"), false, false),
2661                ])?
2662                .build()?;
2663
2664        let plan_expected = format!("{plan}");
2665        let plan_builder: LogicalPlanBuilder = Arc::new(plan).into();
2666        assert_eq!(plan_expected, format!("{}", plan_builder.plan));
2667
2668        Ok(())
2669    }
2670
2671    #[test]
2672    fn plan_builder_aggregate_without_implicit_group_by_exprs() -> Result<()> {
2673        let constraints =
2674            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2675        let table_source = table_source_with_constraints(&employee_schema(), constraints);
2676
2677        let plan =
2678            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2679                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2680                .build()?;
2681
2682        let expected =
2683            "Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]]\
2684        \n  TableScan: employee_csv projection=[id, state, salary]";
2685        assert_eq!(expected, format!("{plan}"));
2686
2687        Ok(())
2688    }
2689
2690    #[test]
2691    fn plan_builder_aggregate_with_implicit_group_by_exprs() -> Result<()> {
2692        let constraints =
2693            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2694        let table_source = table_source_with_constraints(&employee_schema(), constraints);
2695
2696        let options =
2697            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
2698        let plan =
2699            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2700                .with_options(options)
2701                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2702                .build()?;
2703
2704        let expected =
2705            "Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]]\
2706        \n  TableScan: employee_csv projection=[id, state, salary]";
2707        assert_eq!(expected, format!("{plan}"));
2708
2709        Ok(())
2710    }
2711}