datafusion_optimizer/optimize_projections/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`OptimizeProjections`] identifies and eliminates unused columns
19
20mod required_indices;
21
22use crate::optimizer::ApplyOrder;
23use crate::{OptimizerConfig, OptimizerRule};
24use std::collections::HashSet;
25use std::sync::Arc;
26
27use datafusion_common::{
28    get_required_group_by_exprs_indices, internal_datafusion_err, internal_err, Column,
29    HashMap, JoinType, Result,
30};
31use datafusion_expr::expr::Alias;
32use datafusion_expr::Unnest;
33use datafusion_expr::{
34    logical_plan::LogicalPlan, projection_schema, Aggregate, Distinct, Expr, Projection,
35    TableScan, Window,
36};
37
38use crate::optimize_projections::required_indices::RequiredIndices;
39use crate::utils::NamePreserver;
40use datafusion_common::tree_node::{
41    Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
42};
43
/// Optimizer rule to prune unnecessary columns from intermediate schemas
/// inside the [`LogicalPlan`]. This rule:
/// - Removes unnecessary columns that do not appear in the output and/or are
///   not used during any computation step.
/// - Adds projections to reduce the number of columns flowing into operators
///   that benefit from a smaller memory footprint at their inputs.
/// - Removes unnecessary [`LogicalPlan::Projection`]s from the [`LogicalPlan`].
///
/// `OptimizeProjections` is an optimizer rule that identifies and eliminates
/// columns from a logical plan that are not used by downstream operations.
/// This can improve query performance and reduce unnecessary data processing.
///
/// The rule analyzes the input logical plan, determines the necessary column
/// indices, and then removes any unnecessary columns. It also removes any
/// unnecessary projections from the plan tree.
#[derive(Default, Debug)]
pub struct OptimizeProjections {}

impl OptimizeProjections {
    /// Creates a new [`OptimizeProjections`] optimizer rule.
    pub fn new() -> Self {
        Self {}
    }
}
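
// Illustrative sketch of the effect of this rule (the builder calls mirror the
// tests at the bottom of this file, which scan a table `test` with columns
// `a`, `b` and `c`):
//
//     // SELECT a FROM (SELECT a, b, c FROM test)
//     let plan = LogicalPlanBuilder::from(test_table_scan()?)
//         .project(vec![col("a"), col("b"), col("c")])?
//         .project(vec![col("a")])?
//         .build()?;
//
//     // After `OptimizeProjections`, the redundant projections are removed
//     // and the scan reads only the required column:
//     //
//     //     TableScan: test projection=[a]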
68
69impl OptimizerRule for OptimizeProjections {
70    fn name(&self) -> &str {
71        "optimize_projections"
72    }
73
74    fn apply_order(&self) -> Option<ApplyOrder> {
75        None
76    }
77
78    fn supports_rewrite(&self) -> bool {
79        true
80    }
81
82    fn rewrite(
83        &self,
84        plan: LogicalPlan,
85        config: &dyn OptimizerConfig,
86    ) -> Result<Transformed<LogicalPlan>> {
87        // All output fields are necessary:
88        let indices = RequiredIndices::new_for_all_exprs(&plan);
89        optimize_projections(plan, config, indices)
90    }
91}
92
/// Removes unnecessary columns (e.g. columns that do not appear in the output
/// schema and/or are not used during any computation step such as expression
/// evaluation) from the logical plan and its inputs.
///
/// # Parameters
///
/// - `plan`: The input `LogicalPlan` to optimize.
/// - `config`: A reference to the optimizer configuration.
/// - `indices`: The column indices required by downstream (parent) plan nodes.
///
/// # Returns
///
/// A `Result` object with the following semantics:
///
/// - `Ok(Transformed::yes(LogicalPlan))`: An optimized `LogicalPlan` without
///   unnecessary columns.
/// - `Ok(Transformed::no(LogicalPlan))`: The given logical plan did not
///   require any change.
/// - `Err(error)`: An error occurred during the optimization process.
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn optimize_projections(
    plan: LogicalPlan,
    config: &dyn OptimizerConfig,
    indices: RequiredIndices,
) -> Result<Transformed<LogicalPlan>> {
118    // Recursively rewrite any nodes that may be able to avoid computation given
119    // their parents' required indices.
120    match plan {
121        LogicalPlan::Projection(proj) => {
122            return merge_consecutive_projections(proj)?.transform_data(|proj| {
123                rewrite_projection_given_requirements(proj, config, &indices)
124            })
125        }
126        LogicalPlan::Aggregate(aggregate) => {
            // Split parent requirements into GROUP BY and aggregate sections:
128            let n_group_exprs = aggregate.group_expr_len()?;
129            // Offset aggregate indices so that they point to valid indices at
130            // `aggregate.aggr_expr`:
131            let (group_by_reqs, aggregate_reqs) = indices.split_off(n_group_exprs);
132
133            // Get absolutely necessary GROUP BY fields:
134            let group_by_expr_existing = aggregate
135                .group_expr
136                .iter()
137                .map(|group_by_expr| group_by_expr.schema_name().to_string())
138                .collect::<Vec<_>>();
139
140            let new_group_bys = if let Some(simplest_groupby_indices) =
141                get_required_group_by_exprs_indices(
142                    aggregate.input.schema(),
143                    &group_by_expr_existing,
144                ) {
145                // Some of the fields in the GROUP BY may be required by the
146                // parent even if these fields are unnecessary in terms of
147                // functional dependency.
148                group_by_reqs
149                    .append(&simplest_groupby_indices)
150                    .get_at_indices(&aggregate.group_expr)
151            } else {
152                aggregate.group_expr
153            };
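            // For example, for `GROUP BY id, name` where `id` functionally
            // determines `name` (e.g. `id` is a primary key), only `id` is
            // strictly necessary for grouping; `name` is retained only if the
            // parent still requires it.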
154
155            // Only use the absolutely necessary aggregate expressions required
156            // by the parent:
157            let mut new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr);
158
159            // Aggregations always need at least one aggregate expression.
160            // With a nested count, we don't require any column as input, but
161            // still need to create a correct aggregate, which may be optimized
162            // out later. As an example, consider the following query:
163            //
164            // SELECT count(*) FROM (SELECT count(*) FROM [...])
165            //
166            // which always returns 1.
167            if new_aggr_expr.is_empty()
168                && new_group_bys.is_empty()
169                && !aggregate.aggr_expr.is_empty()
170            {
                // Keep only the first of the original aggregate expressions;
                // `resize_with` only truncates here (the vector is non-empty),
                // so the `unreachable!` filler closure is never invoked.
                new_aggr_expr = aggregate.aggr_expr;
                new_aggr_expr.resize_with(1, || unreachable!());
174            }
175
176            let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter());
177            let schema = aggregate.input.schema();
178            let necessary_indices =
179                RequiredIndices::new().with_exprs(schema, all_exprs_iter);
180            let necessary_exprs = necessary_indices.get_required_exprs(schema);
181
182            return optimize_projections(
183                Arc::unwrap_or_clone(aggregate.input),
184                config,
185                necessary_indices,
186            )?
187            .transform_data(|aggregate_input| {
188                // Simplify the input of the aggregation by adding a projection so
189                // that its input only contains absolutely necessary columns for
190                // the aggregate expressions. Note that necessary_indices refer to
191                // fields in `aggregate.input.schema()`.
192                add_projection_on_top_if_helpful(aggregate_input, necessary_exprs)
193            })?
194            .map_data(|aggregate_input| {
195                // Create a new aggregate plan with the updated input and only the
196                // absolutely necessary fields:
197                Aggregate::try_new(
198                    Arc::new(aggregate_input),
199                    new_group_bys,
200                    new_aggr_expr,
201                )
202                .map(LogicalPlan::Aggregate)
203            });
204        }
205        LogicalPlan::Window(window) => {
206            let input_schema = Arc::clone(window.input.schema());
            // Split parent requirements into child and window expression sections:
208            let n_input_fields = input_schema.fields().len();
209            // Offset window expression indices so that they point to valid
210            // indices at `window.window_expr`:
211            let (child_reqs, window_reqs) = indices.split_off(n_input_fields);
212
213            // Only use window expressions that are absolutely necessary according
214            // to parent requirements:
215            let new_window_expr = window_reqs.get_at_indices(&window.window_expr);
216
217            // Get all the required column indices at the input, either by the
218            // parent or window expression requirements.
219            let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr);
220
221            return optimize_projections(
222                Arc::unwrap_or_clone(window.input),
223                config,
224                required_indices.clone(),
225            )?
226            .transform_data(|window_child| {
227                if new_window_expr.is_empty() {
228                    // When no window expression is necessary, use the input directly:
229                    Ok(Transformed::no(window_child))
230                } else {
231                    // Calculate required expressions at the input of the window.
232                    // Please note that we use `input_schema`, because `required_indices`
233                    // refers to that schema
234                    let required_exprs =
235                        required_indices.get_required_exprs(&input_schema);
236                    let window_child =
237                        add_projection_on_top_if_helpful(window_child, required_exprs)?
238                            .data;
239                    Window::try_new(new_window_expr, Arc::new(window_child))
240                        .map(LogicalPlan::Window)
241                        .map(Transformed::yes)
242                }
243            });
244        }
245        LogicalPlan::TableScan(table_scan) => {
246            let TableScan {
247                table_name,
248                source,
249                projection,
250                filters,
251                fetch,
252                projected_schema: _,
253            } = table_scan;
254
            // Map the required indices, which refer to the projected schema,
            // back to indices in the original schema (the one with all fields):
            let projection = match &projection {
                Some(projection) => indices.into_mapped_indices(|idx| projection[idx]),
                None => indices.into_inner(),
            };
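            // For example, if the existing scan projection is `[2, 4]` and the
            // parent requires index `1` of the projected schema, the new scan
            // projection becomes `[4]`.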
261            return TableScan::try_new(
262                table_name,
263                source,
264                Some(projection),
265                filters,
266                fetch,
267            )
268            .map(LogicalPlan::TableScan)
269            .map(Transformed::yes);
270        }
271        // Other node types are handled below
272        _ => {}
273    };
274
275    // For other plan node types, calculate indices for columns they use and
276    // try to rewrite their children
277    let mut child_required_indices: Vec<RequiredIndices> = match &plan {
278        LogicalPlan::Sort(_)
279        | LogicalPlan::Filter(_)
280        | LogicalPlan::Repartition(_)
281        | LogicalPlan::Union(_)
282        | LogicalPlan::SubqueryAlias(_)
283        | LogicalPlan::Distinct(Distinct::On(_)) => {
284            // Pass index requirements from the parent as well as column indices
285            // that appear in this plan's expressions to its child. All these
286            // operators benefit from "small" inputs, so the projection_beneficial
287            // flag is `true`.
288            plan.inputs()
289                .into_iter()
290                .map(|input| {
291                    indices
292                        .clone()
293                        .with_projection_beneficial()
294                        .with_plan_exprs(&plan, input.schema())
295                })
296                .collect::<Result<_>>()?
297        }
298        LogicalPlan::Limit(_) => {
299            // Pass index requirements from the parent as well as column indices
300            // that appear in this plan's expressions to its child. These operators
301            // do not benefit from "small" inputs, so the projection_beneficial
302            // flag is `false`.
303            plan.inputs()
304                .into_iter()
305                .map(|input| indices.clone().with_plan_exprs(&plan, input.schema()))
306                .collect::<Result<_>>()?
307        }
308        LogicalPlan::Copy(_)
309        | LogicalPlan::Ddl(_)
310        | LogicalPlan::Dml(_)
311        | LogicalPlan::Explain(_)
312        | LogicalPlan::Analyze(_)
313        | LogicalPlan::Subquery(_)
314        | LogicalPlan::Statement(_)
315        | LogicalPlan::Distinct(Distinct::All(_)) => {
            // These plans require all their fields, and their children should
            // be treated as final plans -- otherwise, we may have a schema
            // mismatch.
            // TODO: For some subquery variants (e.g. a subquery arising from an
            //       EXISTS expression), we may not need to require all indices.
321            plan.inputs()
322                .into_iter()
323                .map(RequiredIndices::new_for_all_exprs)
324                .collect()
325        }
326        LogicalPlan::Extension(extension) => {
327            let Some(necessary_children_indices) =
328                extension.node.necessary_children_exprs(indices.indices())
329            else {
                // Requirements from the parent cannot be routed down to the user-defined logical plan safely
331                return Ok(Transformed::no(plan));
332            };
333            let children = extension.node.inputs();
334            if children.len() != necessary_children_indices.len() {
335                return internal_err!("Inconsistent length between children and necessary children indices. \
336                Make sure `.necessary_children_exprs` implementation of the `UserDefinedLogicalNode` is \
337                consistent with actual children length for the node.");
338            }
339            children
340                .into_iter()
341                .zip(necessary_children_indices)
342                .map(|(child, necessary_indices)| {
343                    RequiredIndices::new_from_indices(necessary_indices)
344                        .with_plan_exprs(&plan, child.schema())
345                })
346                .collect::<Result<Vec<_>>>()?
347        }
348        LogicalPlan::EmptyRelation(_)
349        | LogicalPlan::RecursiveQuery(_)
350        | LogicalPlan::Values(_)
351        | LogicalPlan::DescribeTable(_) => {
352            // These operators have no inputs, so stop the optimization process.
353            return Ok(Transformed::no(plan));
354        }
355        LogicalPlan::Join(join) => {
356            let left_len = join.left.schema().fields().len();
357            let (left_req_indices, right_req_indices) =
358                split_join_requirements(left_len, indices, &join.join_type);
359            let left_indices =
360                left_req_indices.with_plan_exprs(&plan, join.left.schema())?;
361            let right_indices =
362                right_req_indices.with_plan_exprs(&plan, join.right.schema())?;
363            // Joins benefit from "small" input tables (lower memory usage).
364            // Therefore, each child benefits from projection:
365            vec![
366                left_indices.with_projection_beneficial(),
367                right_indices.with_projection_beneficial(),
368            ]
369        }
370        // these nodes are explicitly rewritten in the match statement above
371        LogicalPlan::Projection(_)
372        | LogicalPlan::Aggregate(_)
373        | LogicalPlan::Window(_)
374        | LogicalPlan::TableScan(_) => {
375            return internal_err!(
376                "OptimizeProjection: should have handled in the match statement above"
377            );
378        }
379        LogicalPlan::Unnest(Unnest {
380            dependency_indices, ..
381        }) => {
382            vec![RequiredIndices::new_from_indices(
383                dependency_indices.clone(),
384            )]
385        }
386    };
387
388    // Required indices are currently ordered (child0, child1, ...)
389    // but the loop pops off the last element, so we need to reverse the order
390    child_required_indices.reverse();
391    if child_required_indices.len() != plan.inputs().len() {
392        return internal_err!(
393            "OptimizeProjection: child_required_indices length mismatch with plan inputs"
394        );
395    }
396
397    // Rewrite children of the plan
398    let transformed_plan = plan.map_children(|child| {
399        let required_indices = child_required_indices.pop().ok_or_else(|| {
400            internal_datafusion_err!(
401                "Unexpected number of required_indices in OptimizeProjections rule"
402            )
403        })?;
404
405        let projection_beneficial = required_indices.projection_beneficial();
406        let project_exprs = required_indices.get_required_exprs(child.schema());
407
408        optimize_projections(child, config, required_indices)?.transform_data(
409            |new_input| {
410                if projection_beneficial {
411                    add_projection_on_top_if_helpful(new_input, project_exprs)
412                } else {
413                    Ok(Transformed::no(new_input))
414                }
415            },
416        )
417    })?;
418
419    // If any of the children are transformed, we need to potentially update the plan's schema
420    if transformed_plan.transformed {
421        transformed_plan.map_data(|plan| plan.recompute_schema())
422    } else {
423        Ok(transformed_plan)
424    }
425}
426
/// Merges consecutive projections.
///
/// Given a projection `proj`, this function attempts to merge it with the
/// projection directly below it (if any). Merging is skipped when a
/// non-trivial expression of the lower projection is referenced more than
/// once by `proj`: keeping the projections separate then acts as a caching
/// mechanism, so the non-trivial computation happens only once.
///
/// # Parameters
///
/// * `proj` - The `Projection` to merge.
///
/// # Returns
///
/// A `Result` object with the following semantics:
///
/// - `Ok(Transformed::yes(Projection))`: Merge was beneficial and successful.
///   Contains the merged projection.
/// - `Ok(Transformed::no(Projection))`: Merge was not performed (not
///   beneficial or not possible).
/// - `Err(error)`: An error occurred during the function call.
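///
/// # Example
///
/// An illustrative case where the merge is skipped (column names are made up):
///
/// ```text
/// Projection: sum AS x, sum AS y
/// --Projection: a + b AS sum
/// ----Source(a, b)
/// ```
///
/// Here `sum` is non-trivial and referenced twice by the upper projection, so
/// merging would compute `a + b` twice; the projections are left as they are.
/// See <https://github.com/apache/datafusion/issues/8296> for details.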
fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Projection>> {
448    let Projection {
449        expr,
450        input,
451        schema,
452        ..
453    } = proj;
454    let LogicalPlan::Projection(prev_projection) = input.as_ref() else {
455        return Projection::try_new_with_schema(expr, input, schema).map(Transformed::no);
456    };
457
458    // Count usages (referrals) of each projection expression in its input fields:
459    let mut column_referral_map = HashMap::<&Column, usize>::new();
460    expr.iter()
461        .for_each(|expr| expr.add_column_ref_counts(&mut column_referral_map));
462
    // If a non-trivial expression is referenced more than once, do not merge;
    // keeping the projections consecutive lets it be computed once and reused.
    // For details, see: https://github.com/apache/datafusion/issues/8296
466    if column_referral_map.into_iter().any(|(col, usage)| {
467        usage > 1
468            && !is_expr_trivial(
469                &prev_projection.expr
470                    [prev_projection.schema.index_of_column(col).unwrap()],
471            )
472    }) {
473        // no change
474        return Projection::try_new_with_schema(expr, input, schema).map(Transformed::no);
475    }
476
477    let LogicalPlan::Projection(prev_projection) = Arc::unwrap_or_clone(input) else {
478        // We know it is a `LogicalPlan::Projection` from check above
479        unreachable!();
480    };
481
482    // Try to rewrite the expressions in the current projection using the
483    // previous projection as input:
484    let name_preserver = NamePreserver::new_for_projection();
485    let mut original_names = vec![];
486    let new_exprs = expr.map_elements(|expr| {
487        original_names.push(name_preserver.save(&expr));
488
489        // do not rewrite top level Aliases (rewriter will remove all aliases within exprs)
490        match expr {
491            Expr::Alias(Alias {
492                expr,
493                relation,
494                name,
495                metadata,
496            }) => rewrite_expr(*expr, &prev_projection).map(|result| {
497                result.update_data(|expr| {
498                    Expr::Alias(Alias::new(expr, relation, name).with_metadata(metadata))
499                })
500            }),
501            e => rewrite_expr(e, &prev_projection),
502        }
503    })?;
504
505    // if the expressions could be rewritten, create a new projection with the
506    // new expressions
507    if new_exprs.transformed {
508        // Add any needed aliases back to the expressions
509        let new_exprs = new_exprs
510            .data
511            .into_iter()
512            .zip(original_names)
513            .map(|(expr, original_name)| original_name.restore(expr))
514            .collect::<Vec<_>>();
515        Projection::try_new(new_exprs, prev_projection.input).map(Transformed::yes)
516    } else {
517        // not rewritten, so put the projection back together
518        let input = Arc::new(LogicalPlan::Projection(prev_projection));
519        Projection::try_new_with_schema(new_exprs.data, input, schema)
520            .map(Transformed::no)
521    }
522}
523
// Check whether `expr` is trivial, i.e. it does not involve any computation.
fn is_expr_trivial(expr: &Expr) -> bool {
    matches!(expr, Expr::Column(_) | Expr::Literal(_))
}
528
/// Rewrites a projection expression using the projection before it (i.e. its input).
/// This is a subroutine of the [`merge_consecutive_projections`] function.
///
/// # Parameters
///
/// * `expr` - The expression to rewrite.
/// * `input` - A reference to the input of the projection expression (itself
///   a projection).
///
/// # Returns
///
/// A `Result` object with the following semantics:
///
/// - `Ok(Transformed::yes(Expr))`: The expression was rewritten in terms of
///   the input projection's expressions.
/// - `Ok(Transformed::no(Expr))`: The expression did not need rewriting and is
///   returned unchanged.
/// - `Err(error)`: An error occurred during the function call.
///
/// # Notes
/// This rewrite also removes any unnecessary layers of aliasing.
///
/// Without trimming, we can end up with unnecessary indirections inside expressions
/// during projection merges.
///
/// Consider:
///
/// ```text
/// Projection(a1 + b1 as sum1)
/// --Projection(a as a1, b as b1)
/// ----Source(a, b)
/// ```
///
/// After merge, we want to produce:
///
/// ```text
/// Projection(a + b as sum1)
/// --Source(a, b)
/// ```
///
/// Without trimming, we would end up with:
///
/// ```text
/// Projection((a as a1 + b as b1) as sum1)
/// --Source(a, b)
/// ```
fn rewrite_expr(expr: Expr, input: &Projection) -> Result<Transformed<Expr>> {
574    expr.transform_up(|expr| {
575        match expr {
576            //  remove any intermediate aliases
577            Expr::Alias(alias) => Ok(Transformed::yes(*alias.expr)),
578            Expr::Column(col) => {
579                // Find index of column:
580                let idx = input.schema.index_of_column(&col)?;
581                // get the corresponding unaliased input expression
582                //
583                // For example:
584                // * the input projection is [`a + b` as c, `d + e` as f]
585                // * the current column is an expression "f"
586                //
587                // return the expression `d + e` (not `d + e` as f)
588                let input_expr = input.expr[idx].clone().unalias_nested().data;
589                Ok(Transformed::yes(input_expr))
590            }
591            // Unsupported type for consecutive projection merge analysis.
592            _ => Ok(Transformed::no(expr)),
593        }
594    })
595}
596
/// Accumulates the outer-referenced columns used by the given expression,
/// `expr`.
///
/// # Parameters
///
/// * `expr` - The expression to analyze for outer-referenced columns.
/// * `columns` - A mutable reference to a `HashSet<&Column>` where detected
///   columns are collected.
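///
/// For example, for a predicate like `EXISTS (SELECT 1 FROM t WHERE t.x = outer.a)`
/// (written here informally), the outer-referenced column `a` is collected.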
fn outer_columns<'a>(expr: &'a Expr, columns: &mut HashSet<&'a Column>) {
606    // inspect_expr_pre doesn't handle subquery references, so find them explicitly
607    expr.apply(|expr| {
608        match expr {
609            Expr::OuterReferenceColumn(_, col) => {
610                columns.insert(col);
611            }
612            Expr::ScalarSubquery(subquery) => {
613                outer_columns_helper_multi(&subquery.outer_ref_columns, columns);
614            }
615            Expr::Exists(exists) => {
616                outer_columns_helper_multi(&exists.subquery.outer_ref_columns, columns);
617            }
618            Expr::InSubquery(insubquery) => {
619                outer_columns_helper_multi(
620                    &insubquery.subquery.outer_ref_columns,
621                    columns,
622                );
623            }
624            _ => {}
625        };
626        Ok(TreeNodeRecursion::Continue)
627    })
628    // unwrap: closure above never returns Err, so can not be Err here
629    .unwrap();
630}
631
/// A recursive subroutine that accumulates the outer-referenced columns used
/// by the given expressions (`exprs`).
///
/// # Parameters
///
/// * `exprs` - The expressions to analyze for outer-referenced columns.
/// * `columns` - A mutable reference to a `HashSet<&Column>` where detected
///   columns are collected.
fn outer_columns_helper_multi<'a, 'b>(
    exprs: impl IntoIterator<Item = &'a Expr>,
    columns: &'b mut HashSet<&'a Column>,
) {
    exprs.into_iter().for_each(|e| outer_columns(e, columns));
}
646
/// Splits requirement indices for a join into left and right children based on
/// the join type.
///
/// This function takes the length of the left child, the requirement indices,
/// and the type of join (e.g. `INNER`, `LEFT`, `RIGHT`) as arguments.
/// Depending on the join type, it divides the requirement indices into those
/// that apply to the left child and those that apply to the right child.
///
/// - For `INNER`, `LEFT`, `RIGHT` and `FULL` joins, the requirements are split
///   between left and right children. The right child indices are adjusted to
///   point to valid positions within the right child by subtracting the length
///   of the left child.
///
/// - For `LEFT ANTI`, `LEFT SEMI`, `RIGHT SEMI` and `RIGHT ANTI` joins, all
///   requirements are re-routed to either the left child or the right child
///   directly, depending on the join type.
///
/// # Parameters
///
/// * `left_len` - The length of the left child schema.
/// * `indices` - The requirement indices to split.
/// * `join_type` - The type of join (e.g. `INNER`, `LEFT`, `RIGHT`).
///
/// # Returns
///
/// A pair of `RequiredIndices`: the first element represents the requirements
/// for the left child, and the second element represents the requirements for
/// the right child. The indices are appropriately split and adjusted based on
/// the join type.
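///
/// # Example
///
/// An illustrative split (the numbers are made up):
///
/// ```text
/// left child width = 3, required indices = [1, 4], join type = INNER
///   => left requirements  = [1]
///      right requirements = [1]   (i.e. 4 - 3, relative to the right child)
/// ```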
fn split_join_requirements(
    left_len: usize,
    indices: RequiredIndices,
    join_type: &JoinType,
) -> (RequiredIndices, RequiredIndices) {
681    match join_type {
682        // In these cases requirements are split between left/right children:
683        JoinType::Inner
684        | JoinType::Left
685        | JoinType::Right
686        | JoinType::Full
687        | JoinType::LeftMark => {
688            // Decrease right side indices by `left_len` so that they point to valid
689            // positions within the right child:
690            indices.split_off(left_len)
691        }
692        // All requirements can be re-routed to left child directly.
693        JoinType::LeftAnti | JoinType::LeftSemi => (indices, RequiredIndices::new()),
694        // All requirements can be re-routed to right side directly.
695        // No need to change index, join schema is right child schema.
696        JoinType::RightSemi | JoinType::RightAnti => (RequiredIndices::new(), indices),
697    }
698}
699
/// Adds a projection on top of a logical plan if doing so reduces the number
/// of columns for the parent operator.
///
/// This function takes a `LogicalPlan` and a list of projection expressions.
/// If the projection reduces the number of columns in the plan, a new
/// `LogicalPlan` with the projection is created and returned wrapped in
/// `Transformed::yes`. If the projection does not reduce the number of
/// columns, the original plan is returned wrapped in `Transformed::no`.
///
/// # Parameters
///
/// * `plan` - The input `LogicalPlan` to potentially add a projection to.
/// * `project_exprs` - A list of expressions for the projection.
///
/// # Returns
///
/// A `Transformed<LogicalPlan>` indicating whether a projection was added.
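///
/// For example, if `plan` produces five columns and `project_exprs` contains
/// two expressions, a projection is added; if it contained five expressions,
/// the plan would be returned unchanged.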
fn add_projection_on_top_if_helpful(
    plan: LogicalPlan,
    project_exprs: Vec<Expr>,
) -> Result<Transformed<LogicalPlan>> {
721    // Make sure projection decreases the number of columns, otherwise it is unnecessary.
722    if project_exprs.len() >= plan.schema().fields().len() {
723        Ok(Transformed::no(plan))
724    } else {
725        Projection::try_new(project_exprs, Arc::new(plan))
726            .map(LogicalPlan::Projection)
727            .map(Transformed::yes)
728    }
729}
730
/// Rewrites the given projection according to the fields required by its
/// ancestors.
///
/// # Parameters
///
/// * `proj` - The original projection to rewrite.
/// * `config` - A reference to the optimizer configuration.
/// * `indices` - The indices of the columns required by the ancestors of the
///   given projection.
///
/// # Returns
///
/// A `Result` object with the following semantics:
///
/// - `Ok(Transformed<LogicalPlan>)`: The rewritten plan. If the projection is
///   no longer necessary, it is dropped and its (optimized) input is returned
///   instead.
/// - `Err(error)`: An error occurred during the function call.
fn rewrite_projection_given_requirements(
    proj: Projection,
    config: &dyn OptimizerConfig,
    indices: &RequiredIndices,
) -> Result<Transformed<LogicalPlan>> {
753    let Projection { expr, input, .. } = proj;
754
755    let exprs_used = indices.get_at_indices(&expr);
756
757    let required_indices =
758        RequiredIndices::new().with_exprs(input.schema(), exprs_used.iter());
759
760    // rewrite the children projection, and if they are changed rewrite the
761    // projection down
762    optimize_projections(Arc::unwrap_or_clone(input), config, required_indices)?
763        .transform_data(|input| {
764            if is_projection_unnecessary(&input, &exprs_used)? {
765                Ok(Transformed::yes(input))
766            } else {
767                Projection::try_new(exprs_used, Arc::new(input))
768                    .map(LogicalPlan::Projection)
769                    .map(Transformed::yes)
770            }
771        })
772}
773
/// A projection is unnecessary when:
/// - the input schema and the output schema of the projection are the same, and
/// - all projection expressions are either `Column`s or `Literal`s.
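///
/// For example, `SELECT a, b` on top of an input that already produces exactly
/// `(a, b)` is unnecessary, whereas `SELECT a + 1 AS a, b` is not, since it
/// computes a new value even though the resulting schema may look similar.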
fn is_projection_unnecessary(input: &LogicalPlan, proj_exprs: &[Expr]) -> Result<bool> {
    let proj_schema = projection_schema(input, proj_exprs)?;
    Ok(&proj_schema == input.schema() && proj_exprs.iter().all(is_expr_trivial))
}
781
782#[cfg(test)]
783mod tests {
784    use std::cmp::Ordering;
785    use std::collections::HashMap;
786    use std::fmt::Formatter;
787    use std::ops::Add;
788    use std::sync::Arc;
789    use std::vec;
790
791    use crate::optimize_projections::OptimizeProjections;
792    use crate::optimizer::Optimizer;
793    use crate::test::{
794        assert_fields_eq, assert_optimized_plan_eq, scan_empty, test_table_scan,
795        test_table_scan_fields, test_table_scan_with_name,
796    };
797    use crate::{OptimizerContext, OptimizerRule};
798    use arrow::datatypes::{DataType, Field, Schema};
799    use datafusion_common::{
800        Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference,
801    };
802    use datafusion_expr::ExprFunctionExt;
803    use datafusion_expr::{
804        binary_expr, build_join_schema,
805        builder::table_scan_with_filters,
806        col,
807        expr::{self, Cast},
808        lit,
809        logical_plan::{builder::LogicalPlanBuilder, table_scan},
810        not, try_cast, when, BinaryExpr, Expr, Extension, Like, LogicalPlan, Operator,
811        Projection, UserDefinedLogicalNodeCore, WindowFunctionDefinition,
812    };
813
814    use datafusion_functions_aggregate::count::count_udaf;
815    use datafusion_functions_aggregate::expr_fn::{count, max, min};
816    use datafusion_functions_aggregate::min_max::max_udaf;
817
818    fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> {
819        assert_optimized_plan_eq(Arc::new(OptimizeProjections::new()), plan, expected)
820    }
821
822    #[derive(Debug, Hash, PartialEq, Eq)]
823    struct NoOpUserDefined {
824        exprs: Vec<Expr>,
825        schema: DFSchemaRef,
826        input: Arc<LogicalPlan>,
827    }
828
829    impl NoOpUserDefined {
830        fn new(schema: DFSchemaRef, input: Arc<LogicalPlan>) -> Self {
831            Self {
832                exprs: vec![],
833                schema,
834                input,
835            }
836        }
837
838        fn with_exprs(mut self, exprs: Vec<Expr>) -> Self {
839            self.exprs = exprs;
840            self
841        }
842    }
843
844    // Manual implementation needed because of `schema` field. Comparison excludes this field.
845    impl PartialOrd for NoOpUserDefined {
846        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
847            match self.exprs.partial_cmp(&other.exprs) {
848                Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
849                cmp => cmp,
850            }
851        }
852    }
853
854    impl UserDefinedLogicalNodeCore for NoOpUserDefined {
855        fn name(&self) -> &str {
856            "NoOpUserDefined"
857        }
858
859        fn inputs(&self) -> Vec<&LogicalPlan> {
860            vec![&self.input]
861        }
862
863        fn schema(&self) -> &DFSchemaRef {
864            &self.schema
865        }
866
867        fn expressions(&self) -> Vec<Expr> {
868            self.exprs.clone()
869        }
870
871        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
872            write!(f, "NoOpUserDefined")
873        }
874
875        fn with_exprs_and_inputs(
876            &self,
877            exprs: Vec<Expr>,
878            mut inputs: Vec<LogicalPlan>,
879        ) -> Result<Self> {
880            Ok(Self {
881                exprs,
882                input: Arc::new(inputs.swap_remove(0)),
883                schema: Arc::clone(&self.schema),
884            })
885        }
886
887        fn necessary_children_exprs(
888            &self,
889            output_columns: &[usize],
890        ) -> Option<Vec<Vec<usize>>> {
            // Since the schema is unchanged, each output column simply requires
            // the corresponding column of the input.
            Some(vec![output_columns.to_vec()])
893        }
894
895        fn supports_limit_pushdown(&self) -> bool {
896            false // Disallow limit push-down by default
897        }
898    }
899
900    #[derive(Debug, Hash, PartialEq, Eq)]
901    struct UserDefinedCrossJoin {
902        exprs: Vec<Expr>,
903        schema: DFSchemaRef,
904        left_child: Arc<LogicalPlan>,
905        right_child: Arc<LogicalPlan>,
906    }
907
908    impl UserDefinedCrossJoin {
909        fn new(left_child: Arc<LogicalPlan>, right_child: Arc<LogicalPlan>) -> Self {
910            let left_schema = left_child.schema();
911            let right_schema = right_child.schema();
912            let schema = Arc::new(
913                build_join_schema(left_schema, right_schema, &JoinType::Inner).unwrap(),
914            );
915            Self {
916                exprs: vec![],
917                schema,
918                left_child,
919                right_child,
920            }
921        }
922    }
923
924    // Manual implementation needed because of `schema` field. Comparison excludes this field.
925    impl PartialOrd for UserDefinedCrossJoin {
926        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
927            match self.exprs.partial_cmp(&other.exprs) {
928                Some(Ordering::Equal) => {
929                    match self.left_child.partial_cmp(&other.left_child) {
930                        Some(Ordering::Equal) => {
931                            self.right_child.partial_cmp(&other.right_child)
932                        }
933                        cmp => cmp,
934                    }
935                }
936                cmp => cmp,
937            }
938        }
939    }
940
941    impl UserDefinedLogicalNodeCore for UserDefinedCrossJoin {
942        fn name(&self) -> &str {
943            "UserDefinedCrossJoin"
944        }
945
946        fn inputs(&self) -> Vec<&LogicalPlan> {
947            vec![&self.left_child, &self.right_child]
948        }
949
950        fn schema(&self) -> &DFSchemaRef {
951            &self.schema
952        }
953
954        fn expressions(&self) -> Vec<Expr> {
955            self.exprs.clone()
956        }
957
958        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
959            write!(f, "UserDefinedCrossJoin")
960        }
961
962        fn with_exprs_and_inputs(
963            &self,
964            exprs: Vec<Expr>,
965            mut inputs: Vec<LogicalPlan>,
966        ) -> Result<Self> {
967            assert_eq!(inputs.len(), 2);
968            Ok(Self {
969                exprs,
970                left_child: Arc::new(inputs.remove(0)),
971                right_child: Arc::new(inputs.remove(0)),
972                schema: Arc::clone(&self.schema),
973            })
974        }
975
976        fn necessary_children_exprs(
977            &self,
978            output_columns: &[usize],
979        ) -> Option<Vec<Vec<usize>>> {
980            let left_child_len = self.left_child.schema().fields().len();
981            let mut left_reqs = vec![];
982            let mut right_reqs = vec![];
983            for &out_idx in output_columns {
984                if out_idx < left_child_len {
985                    left_reqs.push(out_idx);
986                } else {
                    // Output indices at or beyond `left_child_len`
                    // come from the right child
                    right_reqs.push(out_idx - left_child_len)
990                }
991            }
992            Some(vec![left_reqs, right_reqs])
993        }
994
995        fn supports_limit_pushdown(&self) -> bool {
996            false // Disallow limit push-down by default
997        }
998    }
999
1000    #[test]
1001    fn merge_two_projection() -> Result<()> {
1002        let table_scan = test_table_scan()?;
1003        let plan = LogicalPlanBuilder::from(table_scan)
1004            .project(vec![col("a")])?
1005            .project(vec![binary_expr(lit(1), Operator::Plus, col("a"))])?
1006            .build()?;
1007
1008        let expected = "Projection: Int32(1) + test.a\
1009        \n  TableScan: test projection=[a]";
1010        assert_optimized_plan_equal(plan, expected)
1011    }
1012
1013    #[test]
1014    fn merge_three_projection() -> Result<()> {
1015        let table_scan = test_table_scan()?;
1016        let plan = LogicalPlanBuilder::from(table_scan)
1017            .project(vec![col("a"), col("b")])?
1018            .project(vec![col("a")])?
1019            .project(vec![binary_expr(lit(1), Operator::Plus, col("a"))])?
1020            .build()?;
1021
1022        let expected = "Projection: Int32(1) + test.a\
1023        \n  TableScan: test projection=[a]";
1024        assert_optimized_plan_equal(plan, expected)
1025    }
1026
1027    #[test]
1028    fn merge_alias() -> Result<()> {
1029        let table_scan = test_table_scan()?;
1030        let plan = LogicalPlanBuilder::from(table_scan)
1031            .project(vec![col("a")])?
1032            .project(vec![col("a").alias("alias")])?
1033            .build()?;
1034
1035        let expected = "Projection: test.a AS alias\
1036        \n  TableScan: test projection=[a]";
1037        assert_optimized_plan_equal(plan, expected)
1038    }
1039
1040    #[test]
1041    fn merge_nested_alias() -> Result<()> {
1042        let table_scan = test_table_scan()?;
1043        let plan = LogicalPlanBuilder::from(table_scan)
1044            .project(vec![col("a").alias("alias1").alias("alias2")])?
1045            .project(vec![col("alias2").alias("alias")])?
1046            .build()?;
1047
1048        let expected = "Projection: test.a AS alias\
1049        \n  TableScan: test projection=[a]";
1050        assert_optimized_plan_equal(plan, expected)
1051    }
1052
1053    #[test]
1054    fn test_nested_count() -> Result<()> {
1055        let schema = Schema::new(vec![Field::new("foo", DataType::Int32, false)]);
1056
1057        let groups: Vec<Expr> = vec![];
1058
1059        let plan = table_scan(TableReference::none(), &schema, None)
1060            .unwrap()
1061            .aggregate(groups.clone(), vec![count(lit(1))])
1062            .unwrap()
1063            .aggregate(groups, vec![count(lit(1))])
1064            .unwrap()
1065            .build()
1066            .unwrap();
1067
1068        let expected = "Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]]\
1069        \n  Projection: \
1070        \n    Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]]\
1071        \n      TableScan: ?table? projection=[]";
1072        assert_optimized_plan_equal(plan, expected)
1073    }
1074
1075    #[test]
1076    fn test_neg_push_down() -> Result<()> {
1077        let table_scan = test_table_scan()?;
1078        let plan = LogicalPlanBuilder::from(table_scan)
1079            .project(vec![-col("a")])?
1080            .build()?;
1081
1082        let expected = "Projection: (- test.a)\
1083        \n  TableScan: test projection=[a]";
1084        assert_optimized_plan_equal(plan, expected)
1085    }
1086
1087    #[test]
1088    fn test_is_null() -> Result<()> {
1089        let table_scan = test_table_scan()?;
1090        let plan = LogicalPlanBuilder::from(table_scan)
1091            .project(vec![col("a").is_null()])?
1092            .build()?;
1093
1094        let expected = "Projection: test.a IS NULL\
1095        \n  TableScan: test projection=[a]";
1096        assert_optimized_plan_equal(plan, expected)
1097    }
1098
1099    #[test]
1100    fn test_is_not_null() -> Result<()> {
1101        let table_scan = test_table_scan()?;
1102        let plan = LogicalPlanBuilder::from(table_scan)
1103            .project(vec![col("a").is_not_null()])?
1104            .build()?;
1105
1106        let expected = "Projection: test.a IS NOT NULL\
1107        \n  TableScan: test projection=[a]";
1108        assert_optimized_plan_equal(plan, expected)
1109    }
1110
1111    #[test]
1112    fn test_is_true() -> Result<()> {
1113        let table_scan = test_table_scan()?;
1114        let plan = LogicalPlanBuilder::from(table_scan)
1115            .project(vec![col("a").is_true()])?
1116            .build()?;
1117
1118        let expected = "Projection: test.a IS TRUE\
1119        \n  TableScan: test projection=[a]";
1120        assert_optimized_plan_equal(plan, expected)
1121    }
1122
1123    #[test]
1124    fn test_is_not_true() -> Result<()> {
1125        let table_scan = test_table_scan()?;
1126        let plan = LogicalPlanBuilder::from(table_scan)
1127            .project(vec![col("a").is_not_true()])?
1128            .build()?;
1129
1130        let expected = "Projection: test.a IS NOT TRUE\
1131        \n  TableScan: test projection=[a]";
1132        assert_optimized_plan_equal(plan, expected)
1133    }
1134
1135    #[test]
1136    fn test_is_false() -> Result<()> {
1137        let table_scan = test_table_scan()?;
1138        let plan = LogicalPlanBuilder::from(table_scan)
1139            .project(vec![col("a").is_false()])?
1140            .build()?;
1141
1142        let expected = "Projection: test.a IS FALSE\
1143        \n  TableScan: test projection=[a]";
1144        assert_optimized_plan_equal(plan, expected)
1145    }
1146
1147    #[test]
1148    fn test_is_not_false() -> Result<()> {
1149        let table_scan = test_table_scan()?;
1150        let plan = LogicalPlanBuilder::from(table_scan)
1151            .project(vec![col("a").is_not_false()])?
1152            .build()?;
1153
1154        let expected = "Projection: test.a IS NOT FALSE\
1155        \n  TableScan: test projection=[a]";
1156        assert_optimized_plan_equal(plan, expected)
1157    }
1158
1159    #[test]
1160    fn test_is_unknown() -> Result<()> {
1161        let table_scan = test_table_scan()?;
1162        let plan = LogicalPlanBuilder::from(table_scan)
1163            .project(vec![col("a").is_unknown()])?
1164            .build()?;
1165
1166        let expected = "Projection: test.a IS UNKNOWN\
1167        \n  TableScan: test projection=[a]";
1168        assert_optimized_plan_equal(plan, expected)
1169    }
1170
1171    #[test]
1172    fn test_is_not_unknown() -> Result<()> {
1173        let table_scan = test_table_scan()?;
1174        let plan = LogicalPlanBuilder::from(table_scan)
1175            .project(vec![col("a").is_not_unknown()])?
1176            .build()?;
1177
1178        let expected = "Projection: test.a IS NOT UNKNOWN\
1179        \n  TableScan: test projection=[a]";
1180        assert_optimized_plan_equal(plan, expected)
1181    }
1182
1183    #[test]
1184    fn test_not() -> Result<()> {
1185        let table_scan = test_table_scan()?;
1186        let plan = LogicalPlanBuilder::from(table_scan)
1187            .project(vec![not(col("a"))])?
1188            .build()?;
1189
1190        let expected = "Projection: NOT test.a\
1191        \n  TableScan: test projection=[a]";
1192        assert_optimized_plan_equal(plan, expected)
1193    }
1194
1195    #[test]
1196    fn test_try_cast() -> Result<()> {
1197        let table_scan = test_table_scan()?;
1198        let plan = LogicalPlanBuilder::from(table_scan)
1199            .project(vec![try_cast(col("a"), DataType::Float64)])?
1200            .build()?;
1201
1202        let expected = "Projection: TRY_CAST(test.a AS Float64)\
1203        \n  TableScan: test projection=[a]";
1204        assert_optimized_plan_equal(plan, expected)
1205    }
1206
1207    #[test]
1208    fn test_similar_to() -> Result<()> {
1209        let table_scan = test_table_scan()?;
1210        let expr = Box::new(col("a"));
1211        let pattern = Box::new(lit("[0-9]"));
1212        let similar_to_expr =
1213            Expr::SimilarTo(Like::new(false, expr, pattern, None, false));
1214        let plan = LogicalPlanBuilder::from(table_scan)
1215            .project(vec![similar_to_expr])?
1216            .build()?;
1217
1218        let expected = "Projection: test.a SIMILAR TO Utf8(\"[0-9]\")\
1219        \n  TableScan: test projection=[a]";
1220        assert_optimized_plan_equal(plan, expected)
1221    }
1222
1223    #[test]
1224    fn test_between() -> Result<()> {
1225        let table_scan = test_table_scan()?;
1226        let plan = LogicalPlanBuilder::from(table_scan)
1227            .project(vec![col("a").between(lit(1), lit(3))])?
1228            .build()?;
1229
1230        let expected = "Projection: test.a BETWEEN Int32(1) AND Int32(3)\
1231        \n  TableScan: test projection=[a]";
1232        assert_optimized_plan_equal(plan, expected)
1233    }
1234
1235    // Test Case expression
1236    #[test]
1237    fn test_case_merged() -> Result<()> {
1238        let table_scan = test_table_scan()?;
1239        let plan = LogicalPlanBuilder::from(table_scan)
1240            .project(vec![col("a"), lit(0).alias("d")])?
1241            .project(vec![
1242                col("a"),
1243                when(col("a").eq(lit(1)), lit(10))
1244                    .otherwise(col("d"))?
1245                    .alias("d"),
1246            ])?
1247            .build()?;
1248
1249        let expected = "Projection: test.a, CASE WHEN test.a = Int32(1) THEN Int32(10) ELSE Int32(0) END AS d\
1250        \n  TableScan: test projection=[a]";
1251        assert_optimized_plan_equal(plan, expected)
1252    }
1253
1254    // Test outer projection isn't discarded despite the same schema as inner
1255    // https://github.com/apache/datafusion/issues/8942
1256    #[test]
1257    fn test_derived_column() -> Result<()> {
1258        let table_scan = test_table_scan()?;
1259        let plan = LogicalPlanBuilder::from(table_scan)
1260            .project(vec![col("a").add(lit(1)).alias("a"), lit(0).alias("d")])?
1261            .project(vec![
1262                col("a"),
1263                when(col("a").eq(lit(1)), lit(10))
1264                    .otherwise(col("d"))?
1265                    .alias("d"),
1266            ])?
1267            .build()?;
1268
1269        let expected =
1270            "Projection: a, CASE WHEN a = Int32(1) THEN Int32(10) ELSE d END AS d\
1271        \n  Projection: test.a + Int32(1) AS a, Int32(0) AS d\
1272        \n    TableScan: test projection=[a]";
1273        assert_optimized_plan_equal(plan, expected)
1274    }
1275
    // Since only column `a` is referenced at the output, the scan should only contain projection=[a].
    // The user-defined node should be able to propagate the expressions required by its parent to its child.
1278    #[test]
1279    fn test_user_defined_logical_plan_node() -> Result<()> {
1280        let table_scan = test_table_scan()?;
1281        let custom_plan = LogicalPlan::Extension(Extension {
1282            node: Arc::new(NoOpUserDefined::new(
1283                Arc::clone(table_scan.schema()),
1284                Arc::new(table_scan.clone()),
1285            )),
1286        });
1287        let plan = LogicalPlanBuilder::from(custom_plan)
1288            .project(vec![col("a"), lit(0).alias("d")])?
1289            .build()?;
1290
1291        let expected = "Projection: test.a, Int32(0) AS d\
1292        \n  NoOpUserDefined\
1293        \n    TableScan: test projection=[a]";
1294        assert_optimized_plan_equal(plan, expected)
1295    }
1296
    // Only column `a` is referenced at the output. However, the user-defined node itself uses column `b`
    // during its operation. Hence, the scan should contain projection=[a, b].
    // The user-defined node should be able to propagate the expressions required by its parent, as well
    // as its own required expressions.
1301    #[test]
1302    fn test_user_defined_logical_plan_node2() -> Result<()> {
1303        let table_scan = test_table_scan()?;
1304        let exprs = vec![Expr::Column(Column::from_qualified_name("b"))];
1305        let custom_plan = LogicalPlan::Extension(Extension {
1306            node: Arc::new(
1307                NoOpUserDefined::new(
1308                    Arc::clone(table_scan.schema()),
1309                    Arc::new(table_scan.clone()),
1310                )
1311                .with_exprs(exprs),
1312            ),
1313        });
1314        let plan = LogicalPlanBuilder::from(custom_plan)
1315            .project(vec![col("a"), lit(0).alias("d")])?
1316            .build()?;
1317
1318        let expected = "Projection: test.a, Int32(0) AS d\
1319        \n  NoOpUserDefined\
1320        \n    TableScan: test projection=[a, b]";
1321        assert_optimized_plan_equal(plan, expected)
1322    }
1323
    // Only column `a` is referenced at the output. However, the user-defined node itself uses the
    // expression `b + c` during its operation. Hence, the scan should contain projection=[a, b, c].
    // The user-defined node should be able to propagate the expressions required by its parent, as well
    // as its own required expressions. These do not have to be plain columns; requirements stemming from
    // complex expressions should be propagated as well.
1329    #[test]
1330    fn test_user_defined_logical_plan_node3() -> Result<()> {
1331        let table_scan = test_table_scan()?;
1332        let left_expr = Expr::Column(Column::from_qualified_name("b"));
1333        let right_expr = Expr::Column(Column::from_qualified_name("c"));
1334        let binary_expr = Expr::BinaryExpr(BinaryExpr::new(
1335            Box::new(left_expr),
1336            Operator::Plus,
1337            Box::new(right_expr),
1338        ));
1339        let exprs = vec![binary_expr];
1340        let custom_plan = LogicalPlan::Extension(Extension {
1341            node: Arc::new(
1342                NoOpUserDefined::new(
1343                    Arc::clone(table_scan.schema()),
1344                    Arc::new(table_scan.clone()),
1345                )
1346                .with_exprs(exprs),
1347            ),
1348        });
1349        let plan = LogicalPlanBuilder::from(custom_plan)
1350            .project(vec![col("a"), lit(0).alias("d")])?
1351            .build()?;
1352
1353        let expected = "Projection: test.a, Int32(0) AS d\
1354        \n  NoOpUserDefined\
1355        \n    TableScan: test projection=[a, b, c]";
1356        assert_optimized_plan_equal(plan, expected)
1357    }
1358
    // Columns `l.a`, `l.c` and `r.a` are referenced at the output.
    // The user-defined node should be able to propagate the expressions required by its parent to its
    // children, even if it has multiple children.
    // The left child should have `projection=[a, c]`, and the right child should have `projection=[a]`.
1363    #[test]
1364    fn test_user_defined_logical_plan_node4() -> Result<()> {
1365        let left_table = test_table_scan_with_name("l")?;
1366        let right_table = test_table_scan_with_name("r")?;
1367        let custom_plan = LogicalPlan::Extension(Extension {
1368            node: Arc::new(UserDefinedCrossJoin::new(
1369                Arc::new(left_table),
1370                Arc::new(right_table),
1371            )),
1372        });
1373        let plan = LogicalPlanBuilder::from(custom_plan)
1374            .project(vec![col("l.a"), col("l.c"), col("r.a"), lit(0).alias("d")])?
1375            .build()?;
1376
1377        let expected = "Projection: l.a, l.c, r.a, Int32(0) AS d\
1378        \n  UserDefinedCrossJoin\
1379        \n    TableScan: l projection=[a, c]\
1380        \n    TableScan: r projection=[a]";
1381        assert_optimized_plan_equal(plan, expected)
1382    }
1383
1384    #[test]
1385    fn aggregate_no_group_by() -> Result<()> {
1386        let table_scan = test_table_scan()?;
1387
1388        let plan = LogicalPlanBuilder::from(table_scan)
1389            .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
1390            .build()?;
1391
1392        let expected = "Aggregate: groupBy=[[]], aggr=[[max(test.b)]]\
1393        \n  TableScan: test projection=[b]";
1394
1395        assert_optimized_plan_equal(plan, expected)
1396    }
1397
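    // Both the group-by column `c` and the aggregated column `b` are required, so the scan keeps projection=[b, c].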
1398    #[test]
1399    fn aggregate_group_by() -> Result<()> {
1400        let table_scan = test_table_scan()?;
1401
1402        let plan = LogicalPlanBuilder::from(table_scan)
1403            .aggregate(vec![col("c")], vec![max(col("b"))])?
1404            .build()?;
1405
1406        let expected = "Aggregate: groupBy=[[test.c]], aggr=[[max(test.b)]]\
1407        \n  TableScan: test projection=[b, c]";
1408
1409        assert_optimized_plan_equal(plan, expected)
1410    }
1411
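    // Requirements are pushed through a SubqueryAlias: only `b` and `c` are needed below the alias.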
1412    #[test]
1413    fn aggregate_group_by_with_table_alias() -> Result<()> {
1414        let table_scan = test_table_scan()?;
1415
1416        let plan = LogicalPlanBuilder::from(table_scan)
1417            .alias("a")?
1418            .aggregate(vec![col("c")], vec![max(col("b"))])?
1419            .build()?;
1420
1421        let expected = "Aggregate: groupBy=[[a.c]], aggr=[[max(a.b)]]\
1422        \n  SubqueryAlias: a\
1423        \n    TableScan: test projection=[b, c]";
1424
1425        assert_optimized_plan_equal(plan, expected)
1426    }
1427
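    // The filter needs `c` and the aggregate needs `b`; a projection on `b` is inserted above the filter so the
    // aggregate only receives the column it uses.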
1428    #[test]
1429    fn aggregate_no_group_by_with_filter() -> Result<()> {
1430        let table_scan = test_table_scan()?;
1431
1432        let plan = LogicalPlanBuilder::from(table_scan)
1433            .filter(col("c").gt(lit(1)))?
1434            .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
1435            .build()?;
1436
1437        let expected = "Aggregate: groupBy=[[]], aggr=[[max(test.b)]]\
1438        \n  Projection: test.b\
1439        \n    Filter: test.c > Int32(1)\
1440        \n      TableScan: test projection=[b, c]";
1441
1442        assert_optimized_plan_equal(plan, expected)
1443    }
1444
1445    #[test]
1446    fn aggregate_with_periods() -> Result<()> {
1447        let schema = Schema::new(vec![Field::new("tag.one", DataType::Utf8, false)]);
1448
1449        // Build a plan that looks as follows (note "tag.one" is a column named
1450        // "tag.one", not a column named "one" in a table named "tag"):
1451        //
1452        // Projection: tag.one
1453        //   Aggregate: groupBy=[], aggr=[max("tag.one") AS "tag.one"]
1454        //    TableScan
1455        let plan = table_scan(Some("m4"), &schema, None)?
1456            .aggregate(
1457                Vec::<Expr>::new(),
1458                vec![max(col(Column::new_unqualified("tag.one"))).alias("tag.one")],
1459            )?
1460            .project([col(Column::new_unqualified("tag.one"))])?
1461            .build()?;
1462
1463        let expected = "\
1464        Aggregate: groupBy=[[]], aggr=[[max(m4.tag.one) AS tag.one]]\
1465        \n  TableScan: m4 projection=[tag.one]";
1466
1467        assert_optimized_plan_equal(plan, expected)
1468    }
1469
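    // Two stacked projections over the same columns: the inner one is redundant and is removed, while the
    // reordering projection on top is kept.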
1470    #[test]
1471    fn redundant_project() -> Result<()> {
1472        let table_scan = test_table_scan()?;
1473
1474        let plan = LogicalPlanBuilder::from(table_scan)
1475            .project(vec![col("a"), col("b"), col("c")])?
1476            .project(vec![col("a"), col("c"), col("b")])?
1477            .build()?;
1478        let expected = "Projection: test.a, test.c, test.b\
1479        \n  TableScan: test projection=[a, b, c]";
1480
1481        assert_optimized_plan_equal(plan, expected)
1482    }
1483
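    // A bare table scan with a user-specified (reordered) projection is left untouched.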
1484    #[test]
1485    fn reorder_scan() -> Result<()> {
1486        let schema = Schema::new(test_table_scan_fields());
1487
1488        let plan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))?.build()?;
1489        let expected = "TableScan: test projection=[b, a, c]";
1490
1491        assert_optimized_plan_equal(plan, expected)
1492    }
1493
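    // A projection over a reordered scan narrows the scan to only the referenced columns while keeping the
    // scan's column order; the projection itself remains to restore the requested order.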
1494    #[test]
1495    fn reorder_scan_projection() -> Result<()> {
1496        let schema = Schema::new(test_table_scan_fields());
1497
1498        let plan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))?
1499            .project(vec![col("a"), col("b")])?
1500            .build()?;
1501        let expected = "Projection: test.a, test.b\
1502        \n  TableScan: test projection=[b, a]";
1503
1504        assert_optimized_plan_equal(plan, expected)
1505    }
1506
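    // A projection that merely reorders columns cannot be removed; all three columns remain required from the scan.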
1507    #[test]
1508    fn reorder_projection() -> Result<()> {
1509        let table_scan = test_table_scan()?;
1510
1511        let plan = LogicalPlanBuilder::from(table_scan)
1512            .project(vec![col("c"), col("b"), col("a")])?
1513            .build()?;
1514        let expected = "Projection: test.c, test.b, test.a\
1515        \n  TableScan: test projection=[a, b, c]";
1516
1517        assert_optimized_plan_equal(plan, expected)
1518    }
1519
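    // Reordering projections interleaved with filters cannot be eliminated; the plan is unchanged except that
    // the scan gains an explicit projection of all columns.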
1520    #[test]
1521    fn noncontinuous_redundant_projection() -> Result<()> {
1522        let table_scan = test_table_scan()?;
1523
1524        let plan = LogicalPlanBuilder::from(table_scan)
1525            .project(vec![col("c"), col("b"), col("a")])?
1526            .filter(col("c").gt(lit(1)))?
1527            .project(vec![col("c"), col("a"), col("b")])?
1528            .filter(col("b").gt(lit(1)))?
1529            .filter(col("a").gt(lit(1)))?
1530            .project(vec![col("a"), col("c"), col("b")])?
1531            .build()?;
1532        let expected = "Projection: test.a, test.c, test.b\
1533        \n  Filter: test.a > Int32(1)\
1534        \n    Filter: test.b > Int32(1)\
1535        \n      Projection: test.c, test.a, test.b\
1536        \n        Filter: test.c > Int32(1)\
1537        \n          Projection: test.c, test.b, test.a\
1538        \n            TableScan: test projection=[a, b, c]";
1539        assert_optimized_plan_equal(plan, expected)
1540    }
1541
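    // Projecting `a`, `b`, `c1` over the join makes the projection redundant once both scans are trimmed to
    // exactly those columns, so it is removed and the join becomes the plan root.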
1542    #[test]
1543    fn join_schema_trim_full_join_column_projection() -> Result<()> {
1544        let table_scan = test_table_scan()?;
1545
1546        let schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]);
1547        let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
1548
1549        let plan = LogicalPlanBuilder::from(table_scan)
1550            .join(table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
1551            .project(vec![col("a"), col("b"), col("c1")])?
1552            .build()?;
1553
1554        // make sure projections are pushed down to both table scans
1555        let expected = "Left Join: test.a = test2.c1\
1556        \n  TableScan: test projection=[a, b]\
1557        \n  TableScan: test2 projection=[c1]";
1558
1559        let optimized_plan = optimize(plan)?;
1560        let formatted_plan = format!("{optimized_plan}");
1561        assert_eq!(formatted_plan, expected);
1562
1563        // make sure the schema for the join node includes both join columns
1564        let optimized_join = optimized_plan;
1565        assert_eq!(
1566            **optimized_join.schema(),
1567            DFSchema::new_with_metadata(
1568                vec![
1569                    (
1570                        Some("test".into()),
1571                        Arc::new(Field::new("a", DataType::UInt32, false))
1572                    ),
1573                    (
1574                        Some("test".into()),
1575                        Arc::new(Field::new("b", DataType::UInt32, false))
1576                    ),
1577                    (
1578                        Some("test2".into()),
1579                        Arc::new(Field::new("c1", DataType::UInt32, true))
1580                    ),
1581                ],
1582                HashMap::new()
1583            )?,
1584        );
1585
1586        Ok(())
1587    }
1588
1589    #[test]
1590    fn join_schema_trim_partial_join_column_projection() -> Result<()> {
1591        // test that the right-side join column is pushed down even though it is not explicitly projected
1592
1593        let table_scan = test_table_scan()?;
1594
1595        let schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]);
1596        let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
1597
1598        let plan = LogicalPlanBuilder::from(table_scan)
1599            .join(table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
1600            // projecting the join column `a` should also push a `c1` projection down into the
1601            // test2 scan, even though `c1` is not referenced in the projection.
1602            .project(vec![col("a"), col("b")])?
1603            .build()?;
1604
1605        // make sure projections are pushed down to both table scans
1606        let expected = "Projection: test.a, test.b\
1607        \n  Left Join: test.a = test2.c1\
1608        \n    TableScan: test projection=[a, b]\
1609        \n    TableScan: test2 projection=[c1]";
1610
1611        let optimized_plan = optimize(plan)?;
1612        let formatted_plan = format!("{optimized_plan}");
1613        assert_eq!(formatted_plan, expected);
1614
1615        // make sure the schema for the join node includes both join columns
1616        let optimized_join = optimized_plan.inputs()[0];
1617        assert_eq!(
1618            **optimized_join.schema(),
1619            DFSchema::new_with_metadata(
1620                vec![
1621                    (
1622                        Some("test".into()),
1623                        Arc::new(Field::new("a", DataType::UInt32, false))
1624                    ),
1625                    (
1626                        Some("test".into()),
1627                        Arc::new(Field::new("b", DataType::UInt32, false))
1628                    ),
1629                    (
1630                        Some("test2".into()),
1631                        Arc::new(Field::new("c1", DataType::UInt32, true))
1632                    ),
1633                ],
1634                HashMap::new()
1635            )?,
1636        );
1637
1638        Ok(())
1639    }
1640
1641    #[test]
1642    fn join_schema_trim_using_join() -> Result<()> {
1643        // shared join columns from a USING join should be pushed down to both sides
1644
1645        let table_scan = test_table_scan()?;
1646
1647        let schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
1648        let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
1649
1650        let plan = LogicalPlanBuilder::from(table_scan)
1651            .join_using(table2_scan, JoinType::Left, vec!["a"])?
1652            .project(vec![col("a"), col("b")])?
1653            .build()?;
1654
1655        // make sure projections are pushed down to both table scans
1656        let expected = "Projection: test.a, test.b\
1657        \n  Left Join: Using test.a = test2.a\
1658        \n    TableScan: test projection=[a, b]\
1659        \n    TableScan: test2 projection=[a]";
1660
1661        let optimized_plan = optimize(plan)?;
1662        let formatted_plan = format!("{optimized_plan}");
1663        assert_eq!(formatted_plan, expected);
1664
1665        // make sure the schema for the join node includes both join columns
1666        let optimized_join = optimized_plan.inputs()[0];
1667        assert_eq!(
1668            **optimized_join.schema(),
1669            DFSchema::new_with_metadata(
1670                vec![
1671                    (
1672                        Some("test".into()),
1673                        Arc::new(Field::new("a", DataType::UInt32, false))
1674                    ),
1675                    (
1676                        Some("test".into()),
1677                        Arc::new(Field::new("b", DataType::UInt32, false))
1678                    ),
1679                    (
1680                        Some("test2".into()),
1681                        Arc::new(Field::new("a", DataType::UInt32, true))
1682                    ),
1683                ],
1684                HashMap::new()
1685            )?,
1686        );
1687
1688        Ok(())
1689    }
1690
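    // Only the column inside the CAST expression is required, so the scan is pruned to projection=[c].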
1691    #[test]
1692    fn cast() -> Result<()> {
1693        let table_scan = test_table_scan()?;
1694
1695        let projection = LogicalPlanBuilder::from(table_scan)
1696            .project(vec![Expr::Cast(Cast::new(
1697                Box::new(col("c")),
1698                DataType::Float64,
1699            ))])?
1700            .build()?;
1701
1702        let expected = "Projection: CAST(test.c AS Float64)\
1703        \n  TableScan: test projection=[c]";
1704
1705        assert_optimized_plan_equal(projection, expected)
1706    }
1707
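    // A projection of plain columns is folded into the table scan's projection and then removed entirely.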
1708    #[test]
1709    fn table_scan_projected_schema() -> Result<()> {
1710        let table_scan = test_table_scan()?;
1711        let plan = LogicalPlanBuilder::from(test_table_scan()?)
1712            .project(vec![col("a"), col("b")])?
1713            .build()?;
1714
1715        assert_eq!(3, table_scan.schema().fields().len());
1716        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1717        assert_fields_eq(&plan, vec!["a", "b"]);
1718
1719        let expected = "TableScan: test projection=[a, b]";
1720
1721        assert_optimized_plan_equal(plan, expected)
1722    }
1723
1724    #[test]
1725    fn table_scan_projected_schema_non_qualified_relation() -> Result<()> {
1726        let table_scan = test_table_scan()?;
1727        let input_schema = table_scan.schema();
1728        assert_eq!(3, input_schema.fields().len());
1729        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1730
1731        // Build the LogicalPlan directly (don't use PlanBuilder), so
1732        // that the Column references are unqualified (e.g. their
1733        // relation is `None`); PlanBuilder would resolve the expressions against the input schema.
1734        let expr = vec![col("test.a"), col("test.b")];
1735        let plan =
1736            LogicalPlan::Projection(Projection::try_new(expr, Arc::new(table_scan))?);
1737
1738        assert_fields_eq(&plan, vec!["a", "b"]);
1739
1740        let expected = "TableScan: test projection=[a, b]";
1741
1742        assert_optimized_plan_equal(plan, expected)
1743    }
1744
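    // Column pruning works through a Limit: only `c` and `a` are needed, so the scan is trimmed to
    // projection=[a, c] while the reordering projection above it is kept.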
1745    #[test]
1746    fn table_limit() -> Result<()> {
1747        let table_scan = test_table_scan()?;
1748        assert_eq!(3, table_scan.schema().fields().len());
1749        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1750
1751        let plan = LogicalPlanBuilder::from(table_scan)
1752            .project(vec![col("c"), col("a")])?
1753            .limit(0, Some(5))?
1754            .build()?;
1755
1756        assert_fields_eq(&plan, vec!["c", "a"]);
1757
1758        let expected = "Limit: skip=0, fetch=5\
1759        \n  Projection: test.c, test.a\
1760        \n    TableScan: test projection=[a, c]";
1761
1762        assert_optimized_plan_equal(plan, expected)
1763    }
1764
1765    #[test]
1766    fn table_scan_without_projection() -> Result<()> {
1767        let table_scan = test_table_scan()?;
1768        let plan = LogicalPlanBuilder::from(table_scan).build()?;
1769        // a scan without an explicit projection should be expanded to project all columns
1770        let expected = "TableScan: test projection=[a, b, c]";
1771        assert_optimized_plan_equal(plan, expected)
1772    }
1773
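    // A projection of pure literals needs no input columns at all; the scan ends up with an empty projection.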
1774    #[test]
1775    fn table_scan_with_literal_projection() -> Result<()> {
1776        let table_scan = test_table_scan()?;
1777        let plan = LogicalPlanBuilder::from(table_scan)
1778            .project(vec![lit(1_i64), lit(2_i64)])?
1779            .build()?;
1780        let expected = "Projection: Int64(1), Int64(2)\
1781                      \n  TableScan: test projection=[]";
1782        assert_optimized_plan_equal(plan, expected)
1783    }
1784
1785    /// tests that it removes unused columns in projections
1786    #[test]
1787    fn table_unused_column() -> Result<()> {
1788        let table_scan = test_table_scan()?;
1789        assert_eq!(3, table_scan.schema().fields().len());
1790        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1791
1792        // "b" from the first projection is never used downstream => remove it
1793        let plan = LogicalPlanBuilder::from(table_scan)
1794            .project(vec![col("c"), col("a"), col("b")])?
1795            .filter(col("c").gt(lit(1)))?
1796            .aggregate(vec![col("c")], vec![max(col("a"))])?
1797            .build()?;
1798
1799        assert_fields_eq(&plan, vec!["c", "max(test.a)"]);
1800
1801        let plan = optimize(plan).expect("failed to optimize plan");
1802        let expected = "\
1803        Aggregate: groupBy=[[test.c]], aggr=[[max(test.a)]]\
1804        \n  Filter: test.c > Int32(1)\
1805        \n    Projection: test.c, test.a\
1806        \n      TableScan: test projection=[a, c]";
1807
1808        assert_optimized_plan_equal(plan, expected)
1809    }
1810
1811    /// tests that it removes unneeded projections
1812    #[test]
1813    fn table_unused_projection() -> Result<()> {
1814        let table_scan = test_table_scan()?;
1815        assert_eq!(3, table_scan.schema().fields().len());
1816        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1817
1818        // there is no need for the first projection
1819        let plan = LogicalPlanBuilder::from(table_scan)
1820            .project(vec![col("b")])?
1821            .project(vec![lit(1).alias("a")])?
1822            .build()?;
1823
1824        assert_fields_eq(&plan, vec!["a"]);
1825
1826        let expected = "\
1827        Projection: Int32(1) AS a\
1828        \n  TableScan: test projection=[]";
1829
1830        assert_optimized_plan_equal(plan, expected)
1831    }
1832
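    // Columns used only by filters already pushed into the scan (full_filters) do not widen the projection:
    // the scan keeps projection=[] while retaining its full_filters.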
1833    #[test]
1834    fn table_full_filter_pushdown() -> Result<()> {
1835        let schema = Schema::new(test_table_scan_fields());
1836
1837        let table_scan = table_scan_with_filters(
1838            Some("test"),
1839            &schema,
1840            None,
1841            vec![col("b").eq(lit(1))],
1842        )?
1843        .build()?;
1844        assert_eq!(3, table_scan.schema().fields().len());
1845        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1846
1847        // there is no need for the first projection
1848        let plan = LogicalPlanBuilder::from(table_scan)
1849            .project(vec![col("b")])?
1850            .project(vec![lit(1).alias("a")])?
1851            .build()?;
1852
1853        assert_fields_eq(&plan, vec!["a"]);
1854
1855        let expected = "\
1856        Projection: Int32(1) AS a\
1857        \n  TableScan: test projection=[], full_filters=[b = Int32(1)]";
1858
1859        assert_optimized_plan_equal(plan, expected)
1860    }
1861
1862    /// tests that optimizing twice yields the same plan
1863    #[test]
1864    fn test_double_optimization() -> Result<()> {
1865        let table_scan = test_table_scan()?;
1866
1867        let plan = LogicalPlanBuilder::from(table_scan)
1868            .project(vec![col("b")])?
1869            .project(vec![lit(1).alias("a")])?
1870            .build()?;
1871
1872        let optimized_plan1 = optimize(plan).expect("failed to optimize plan");
1873        let optimized_plan2 =
1874            optimize(optimized_plan1.clone()).expect("failed to optimize plan");
1875
1876        let formatted_plan1 = format!("{optimized_plan1:?}");
1877        let formatted_plan2 = format!("{optimized_plan2:?}");
1878        assert_eq!(formatted_plan1, formatted_plan2);
1879        Ok(())
1880    }
1881
1882    /// tests that it removes an aggregate that is never used downstream
1883    #[test]
1884    fn table_unused_aggregate() -> Result<()> {
1885        let table_scan = test_table_scan()?;
1886        assert_eq!(3, table_scan.schema().fields().len());
1887        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1888
1889        // we never use "min(b)" => remove it
1890        let plan = LogicalPlanBuilder::from(table_scan)
1891            .aggregate(vec![col("a"), col("c")], vec![max(col("b")), min(col("b"))])?
1892            .filter(col("c").gt(lit(1)))?
1893            .project(vec![col("c"), col("a"), col("max(test.b)")])?
1894            .build()?;
1895
1896        assert_fields_eq(&plan, vec!["c", "a", "max(test.b)"]);
1897
1898        let expected = "Projection: test.c, test.a, max(test.b)\
1899        \n  Filter: test.c > Int32(1)\
1900        \n    Aggregate: groupBy=[[test.a, test.c]], aggr=[[max(test.b)]]\
1901        \n      TableScan: test projection=[a, b, c]";
1902
1903        assert_optimized_plan_equal(plan, expected)
1904    }
1905
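    // Columns referenced by an aggregate's FILTER clause (here `c`) count as required, so the scan keeps
    // projection=[a, b, c].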
1906    #[test]
1907    fn aggregate_filter_pushdown() -> Result<()> {
1908        let table_scan = test_table_scan()?;
1909        let aggr_with_filter = count_udaf()
1910            .call(vec![col("b")])
1911            .filter(col("c").gt(lit(42)))
1912            .build()?;
1913        let plan = LogicalPlanBuilder::from(table_scan)
1914            .aggregate(
1915                vec![col("a")],
1916                vec![count(col("b")), aggr_with_filter.alias("count2")],
1917            )?
1918            .build()?;
1919
1920        let expected = "Aggregate: groupBy=[[test.a]], aggr=[[count(test.b), count(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]\
1921        \n  TableScan: test projection=[a, b, c]";
1922
1923        assert_optimized_plan_equal(plan, expected)
1924    }
1925
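    // Requirements are not pushed below a Distinct: even though only `a` is projected at the top, the Distinct
    // still needs both of its input columns.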
1926    #[test]
1927    fn pushdown_through_distinct() -> Result<()> {
1928        let table_scan = test_table_scan()?;
1929
1930        let plan = LogicalPlanBuilder::from(table_scan)
1931            .project(vec![col("a"), col("b")])?
1932            .distinct()?
1933            .project(vec![col("a")])?
1934            .build()?;
1935
1936        let expected = "Projection: test.a\
1937        \n  Distinct:\
1938        \n    TableScan: test projection=[a, b]";
1939
1940        assert_optimized_plan_equal(plan, expected)
1941    }
1942
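    // Stacked window functions: a projection is inserted between the two WindowAggr nodes so the upper one only
    // sees `b` and the first window's output, and the scan is pruned to projection=[a, b].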
1943    #[test]
1944    fn test_window() -> Result<()> {
1945        let table_scan = test_table_scan()?;
1946
1947        let max1 = Expr::WindowFunction(expr::WindowFunction::new(
1948            WindowFunctionDefinition::AggregateUDF(max_udaf()),
1949            vec![col("test.a")],
1950        ))
1951        .partition_by(vec![col("test.b")])
1952        .build()
1953        .unwrap();
1954
1955        let max2 = Expr::WindowFunction(expr::WindowFunction::new(
1956            WindowFunctionDefinition::AggregateUDF(max_udaf()),
1957            vec![col("test.b")],
1958        ));
1959        let col1 = col(max1.schema_name().to_string());
1960        let col2 = col(max2.schema_name().to_string());
1961
1962        let plan = LogicalPlanBuilder::from(table_scan)
1963            .window(vec![max1])?
1964            .window(vec![max2])?
1965            .project(vec![col1, col2])?
1966            .build()?;
1967
1968        let expected = "Projection: max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, max(test.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
1969        \n  WindowAggr: windowExpr=[[max(test.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
1970        \n    Projection: test.b, max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
1971        \n      WindowAggr: windowExpr=[[max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
1972        \n        TableScan: test projection=[a, b]";
1973
1974        assert_optimized_plan_equal(plan, expected)
1975    }
1976
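    // No-op observer callback passed to `Optimizer::optimize`; these tests only inspect the returned plan.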
1977    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
1978
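    // Helper that runs only the `OptimizeProjections` rule with a default `OptimizerContext`.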
1979    fn optimize(plan: LogicalPlan) -> Result<LogicalPlan> {
1980        let optimizer = Optimizer::with_rules(vec![Arc::new(OptimizeProjections::new())]);
1981        let optimized_plan =
1982            optimizer.optimize(plan, &OptimizerContext::new(), observe)?;
1983        Ok(optimized_plan)
1984    }
1985}