datafusion_optimizer/
optimizer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`Optimizer`] and [`OptimizerRule`]
19
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use datafusion_expr::registry::FunctionRegistry;
25use datafusion_expr::{assert_expected_schema, InvariantLevel};
26use log::{debug, warn};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::instant::Instant;
31use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
32use datafusion_common::{internal_err, DFSchema, DataFusionError, HashSet, Result};
33use datafusion_expr::logical_plan::LogicalPlan;
34
35use crate::common_subexpr_eliminate::CommonSubexprEliminate;
36use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
37use crate::eliminate_cross_join::EliminateCrossJoin;
38use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr;
39use crate::eliminate_filter::EliminateFilter;
40use crate::eliminate_group_by_constant::EliminateGroupByConstant;
41use crate::eliminate_join::EliminateJoin;
42use crate::eliminate_limit::EliminateLimit;
43use crate::eliminate_nested_union::EliminateNestedUnion;
44use crate::eliminate_one_union::EliminateOneUnion;
45use crate::eliminate_outer_join::EliminateOuterJoin;
46use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
47use crate::filter_null_join_keys::FilterNullJoinKeys;
48use crate::optimize_projections::OptimizeProjections;
49use crate::plan_signature::LogicalPlanSignature;
50use crate::propagate_empty_relation::PropagateEmptyRelation;
51use crate::push_down_filter::PushDownFilter;
52use crate::push_down_limit::PushDownLimit;
53use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
54use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
55use crate::simplify_expressions::SimplifyExpressions;
56use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
57use crate::utils::log_plan;
58
/// `OptimizerRule`s transform one [`LogicalPlan`] into another which
/// computes the same results, but in a potentially more efficient
/// way. If there are no suitable transformations for the input plan,
/// the optimizer should simply return it unmodified.
///
/// To change the semantics of a `LogicalPlan`, see [`AnalyzerRule`]
///
/// Use [`SessionState::add_optimizer_rule`] to register additional
/// `OptimizerRule`s.
///
/// [`AnalyzerRule`]: crate::analyzer::AnalyzerRule
/// [`SessionState::add_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_optimizer_rule
pub trait OptimizerRule: Debug {
    /// A human readable name for this optimizer rule
    fn name(&self) -> &str;

    /// How should the rule be applied by the optimizer? See comments on
    /// [`ApplyOrder`] for details.
    ///
    /// If this returns `None` (the default), the rule must handle recursion
    /// itself.
    fn apply_order(&self) -> Option<ApplyOrder> {
        None
    }

    /// Does this rule support rewriting owned plans (rather than by reference)?
    ///
    /// The default implementation returns `true`.
    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
    fn supports_rewrite(&self) -> bool {
        true
    }

    /// Try to rewrite `plan` to an optimized form, returning `Transformed::yes`
    /// if the plan was rewritten and `Transformed::no` if it was not.
    ///
    /// # Errors
    ///
    /// The default implementation always returns an internal error naming
    /// this rule, so implementors are expected to override it.
    fn rewrite(
        &self,
        _plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
        internal_err!("rewrite is not implemented for {}", self.name())
    }
}
99
/// Options to control the DataFusion Optimizer.
pub trait OptimizerConfig {
    /// Return the time at which the query execution started. This
    /// time is used as the value for now()
    fn query_execution_start_time(&self) -> DateTime<Utc>;

    /// Return alias generator used to generate unique aliases for subqueries
    fn alias_generator(&self) -> &Arc<AliasGenerator>;

    /// Return the [`ConfigOptions`] in effect for this optimization run
    fn options(&self) -> &ConfigOptions;

    /// Return the [`FunctionRegistry`] available to optimizer rules, if any.
    ///
    /// The default implementation returns `None`.
    fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
        None
    }
}
115
/// A standalone [`OptimizerConfig`] that can be used independently
/// of DataFusion's config management
#[derive(Debug)]
pub struct OptimizerContext {
    /// Query execution start time that can be used to rewrite
    /// expressions such as `now()` to use a literal value instead
    query_execution_start_time: DateTime<Utc>,

    /// Alias generator used to generate unique aliases for subqueries
    alias_generator: Arc<AliasGenerator>,

    /// Configuration options exposed through [`OptimizerConfig::options`]
    options: ConfigOptions,
}
129
130impl OptimizerContext {
131    /// Create optimizer config
132    pub fn new() -> Self {
133        let mut options = ConfigOptions::default();
134        options.optimizer.filter_null_join_keys = true;
135
136        Self {
137            query_execution_start_time: Utc::now(),
138            alias_generator: Arc::new(AliasGenerator::new()),
139            options,
140        }
141    }
142
143    /// Specify whether to enable the filter_null_keys rule
144    pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
145        self.options.optimizer.filter_null_join_keys = filter_null_keys;
146        self
147    }
148
149    /// Specify whether the optimizer should skip rules that produce
150    /// errors, or fail the query
151    pub fn with_query_execution_start_time(
152        mut self,
153        query_execution_tart_time: DateTime<Utc>,
154    ) -> Self {
155        self.query_execution_start_time = query_execution_tart_time;
156        self
157    }
158
159    /// Specify whether the optimizer should skip rules that produce
160    /// errors, or fail the query
161    pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
162        self.options.optimizer.skip_failed_rules = b;
163        self
164    }
165
166    /// Specify how many times to attempt to optimize the plan
167    pub fn with_max_passes(mut self, v: u8) -> Self {
168        self.options.optimizer.max_passes = v as usize;
169        self
170    }
171}
172
173impl Default for OptimizerContext {
174    /// Create optimizer config
175    fn default() -> Self {
176        Self::new()
177    }
178}
179
/// Exposes the values stored in [`OptimizerContext`] to the optimizer.
///
/// `function_registry` is not overridden, so the trait default applies.
impl OptimizerConfig for OptimizerContext {
    /// Returns the stored query execution start time
    fn query_execution_start_time(&self) -> DateTime<Utc> {
        self.query_execution_start_time
    }

    /// Returns a reference to the stored alias generator
    fn alias_generator(&self) -> &Arc<AliasGenerator> {
        &self.alias_generator
    }

    /// Returns a reference to the stored configuration options
    fn options(&self) -> &ConfigOptions {
        &self.options
    }
}
193
/// A rule-based optimizer.
///
/// Holds the list of [`OptimizerRule`]s that [`Optimizer::optimize`] applies,
/// in order, on each pass.
#[derive(Clone, Debug)]
pub struct Optimizer {
    /// All optimizer rules to apply
    pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
200
/// Specifies how recursion for an `OptimizerRule` should be handled.
///
/// Rules report this through [`OptimizerRule::apply_order`], which returns
/// an `Option<ApplyOrder>`:
///
/// * `Some(apply_order)`: The Optimizer will recursively apply the rule to the plan.
/// * `None`: the rule must handle any required recursion itself.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ApplyOrder {
    /// Apply the rule to the node before its inputs
    TopDown,
    /// Apply the rule to the node after its inputs
    BottomUp,
}
212
213impl Default for Optimizer {
214    fn default() -> Self {
215        Self::new()
216    }
217}
218
219impl Optimizer {
220    /// Create a new optimizer using the recommended list of rules
221    pub fn new() -> Self {
222        let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
223            Arc::new(EliminateNestedUnion::new()),
224            Arc::new(SimplifyExpressions::new()),
225            Arc::new(ReplaceDistinctWithAggregate::new()),
226            Arc::new(EliminateJoin::new()),
227            Arc::new(DecorrelatePredicateSubquery::new()),
228            Arc::new(ScalarSubqueryToJoin::new()),
229            Arc::new(ExtractEquijoinPredicate::new()),
230            Arc::new(EliminateDuplicatedExpr::new()),
231            Arc::new(EliminateFilter::new()),
232            Arc::new(EliminateCrossJoin::new()),
233            Arc::new(EliminateLimit::new()),
234            Arc::new(PropagateEmptyRelation::new()),
235            // Must be after PropagateEmptyRelation
236            Arc::new(EliminateOneUnion::new()),
237            Arc::new(FilterNullJoinKeys::default()),
238            Arc::new(EliminateOuterJoin::new()),
239            // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit
240            Arc::new(PushDownLimit::new()),
241            Arc::new(PushDownFilter::new()),
242            Arc::new(SingleDistinctToGroupBy::new()),
243            // The previous optimizations added expressions and projections,
244            // that might benefit from the following rules
245            Arc::new(EliminateGroupByConstant::new()),
246            Arc::new(CommonSubexprEliminate::new()),
247            Arc::new(OptimizeProjections::new()),
248        ];
249
250        Self::with_rules(rules)
251    }
252
253    /// Create a new optimizer with the given rules
254    pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
255        Self { rules }
256    }
257}
258
259/// Recursively rewrites LogicalPlans
260struct Rewriter<'a> {
261    apply_order: ApplyOrder,
262    rule: &'a dyn OptimizerRule,
263    config: &'a dyn OptimizerConfig,
264}
265
266impl<'a> Rewriter<'a> {
267    fn new(
268        apply_order: ApplyOrder,
269        rule: &'a dyn OptimizerRule,
270        config: &'a dyn OptimizerConfig,
271    ) -> Self {
272        Self {
273            apply_order,
274            rule,
275            config,
276        }
277    }
278}
279
280impl TreeNodeRewriter for Rewriter<'_> {
281    type Node = LogicalPlan;
282
283    fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
284        if self.apply_order == ApplyOrder::TopDown {
285            {
286                self.rule.rewrite(node, self.config)
287            }
288        } else {
289            Ok(Transformed::no(node))
290        }
291    }
292
293    fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
294        if self.apply_order == ApplyOrder::BottomUp {
295            {
296                self.rule.rewrite(node, self.config)
297            }
298        } else {
299            Ok(Transformed::no(node))
300        }
301    }
302}
303
impl Optimizer {
    /// Optimizes the logical plan by applying optimizer rules, and
    /// invoking observer function after each call
    ///
    /// The rules in `self.rules` are applied in order, for at most
    /// `config.options().optimizer.max_passes` passes. Looping stops early
    /// once a pass ends on a plan whose signature has already been seen
    /// (including the input plan).
    ///
    /// `observer` is called with the resulting plan and the rule after every
    /// rule that completes successfully, whether or not it changed the plan.
    /// It is not called for a rule that failed and was skipped.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// * the input plan or the final plan fails
    ///   `check_invariants(InvariantLevel::Executable)`,
    /// * the final plan fails the schema check against the input plan's schema,
    /// * a rule fails (or its output fails the per-rule checks) while
    ///   `optimizer.skip_failed_rules` is disabled.
    pub fn optimize<F>(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig,
        mut observer: F,
    ) -> Result<LogicalPlan>
    where
        F: FnMut(&LogicalPlan, &dyn OptimizerRule),
    {
        // verify LP is valid, before the first LP optimizer pass.
        plan.check_invariants(InvariantLevel::Executable)
            .map_err(|e| e.context("Invalid input plan before LP Optimizers"))?;

        let start_time = Instant::now();
        let options = config.options();
        let mut new_plan = plan;

        // Signatures of every plan seen at a pass boundary, used to detect
        // when the optimizer has returned to an earlier plan
        let mut previous_plans = HashSet::with_capacity(16);
        previous_plans.insert(LogicalPlanSignature::new(&new_plan));

        // Schema of the input plan; checked against the final plan after all passes
        let starting_schema = Arc::clone(new_plan.schema());

        let mut i = 0;
        while i < options.optimizer.max_passes {
            log_plan(&format!("Optimizer input (pass {i})"), &new_plan);

            for rule in &self.rules {
                // If skipping failed rules, copy plan before attempting to rewrite
                // as rewriting is destructive
                let prev_plan = options
                    .optimizer
                    .skip_failed_rules
                    .then(|| new_plan.clone());

                // Shadows the outer `starting_schema`: this is the schema just
                // before this rule runs, used for the per-rule check below
                let starting_schema = Arc::clone(new_plan.schema());

                let result = match rule.apply_order() {
                    // optimizer handles recursion
                    Some(apply_order) => new_plan.rewrite_with_subqueries(
                        &mut Rewriter::new(apply_order, rule.as_ref(), config),
                    ),
                    // rule handles recursion itself
                    None => {
                        rule.rewrite(new_plan, config)
                    },
                }
                .and_then(|tnr| {
                    // run optimizer-specific invariant checks after each optimizer rule is applied
                    assert_valid_optimization(&tnr.data, &starting_schema)
                        .map_err(|e| e.context(format!("Check optimizer-specific invariants after optimizer rule: {}", rule.name())))?;

                    // run LP invariant checks only in debug mode for performance reasons
                    #[cfg(debug_assertions)]
                    tnr.data.check_invariants(InvariantLevel::Executable)
                        .map_err(|e| e.context(format!("Invalid (non-executable) plan after Optimizer rule: {}", rule.name())))?;

                    Ok(tnr)
                });

                // Handle results
                match (result, prev_plan) {
                    // OptimizerRule was successful
                    (
                        Ok(Transformed {
                            data, transformed, ..
                        }),
                        _,
                    ) => {
                        new_plan = data;
                        observer(&new_plan, rule.as_ref());
                        if transformed {
                            log_plan(rule.name(), &new_plan);
                        } else {
                            debug!(
                                "Plan unchanged by optimizer rule '{}' (pass {})",
                                rule.name(),
                                i
                            );
                        }
                    }
                    // OptimizerRule was unsuccessful, but skipped failed rules is on
                    // so use the previous plan
                    (Err(e), Some(orig_plan)) => {
                        // Note to future readers: if you see this warning it signals a
                        // bug in the DataFusion optimizer. Please consider filing a ticket
                        // https://github.com/apache/datafusion
                        warn!(
                            "Skipping optimizer rule '{}' due to unexpected error: {}",
                            rule.name(),
                            e
                        );
                        new_plan = orig_plan;
                    }
                    // OptimizerRule was unsuccessful, but skipped failed rules is off, return error
                    (Err(e), None) => {
                        return Err(e.context(format!(
                            "Optimizer rule '{}' failed",
                            rule.name()
                        )));
                    }
                }
            }
            log_plan(&format!("Optimized plan (pass {i})"), &new_plan);

            // HashSet::insert returns whether the value was newly inserted.
            let plan_is_fresh =
                previous_plans.insert(LogicalPlanSignature::new(&new_plan));
            if !plan_is_fresh {
                // this plan was already seen, so no need to continue trying to optimize
                debug!("optimizer pass {} did not make changes", i);
                break;
            }
            i += 1;
        }

        // verify that the optimizer passes only mutated what was permitted.
        assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
            e.context("Check optimizer-specific invariants after all passes")
        })?;

        // verify LP is valid, after the last optimizer pass.
        new_plan
            .check_invariants(InvariantLevel::Executable)
            .map_err(|e| {
                e.context("Invalid (non-executable) plan after LP Optimizers")
            })?;

        log_plan("Final optimized plan", &new_plan);
        debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
        Ok(new_plan)
    }
}
439
440/// These are invariants which should hold true before and after [`LogicalPlan`] optimization.
441///
442/// This differs from [`LogicalPlan::check_invariants`], which addresses if a singular
443/// LogicalPlan is valid. Instead this address if the optimization was valid based upon permitted changes.
444fn assert_valid_optimization(
445    plan: &LogicalPlan,
446    prev_schema: &Arc<DFSchema>,
447) -> Result<()> {
448    // verify invariant: optimizer passes should not change the schema if the schema can't be cast from the previous schema.
449    // Refer to <https://datafusion.apache.org/contributor-guide/specification/invariants.html#logical-schema-is-invariant-under-logical-optimization>
450    assert_expected_schema(prev_schema, plan)?;
451
452    Ok(())
453}
454
455#[cfg(test)]
456mod tests {
457    use std::sync::{Arc, Mutex};
458
459    use datafusion_common::tree_node::Transformed;
460    use datafusion_common::{
461        assert_contains, plan_err, DFSchema, DFSchemaRef, DataFusionError, Result,
462    };
463    use datafusion_expr::logical_plan::EmptyRelation;
464    use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, Projection};
465
466    use crate::optimizer::Optimizer;
467    use crate::test::test_table_scan;
468    use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};
469
470    use super::ApplyOrder;
471
472    #[test]
473    fn skip_failing_rule() {
474        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
475        let config = OptimizerContext::new().with_skip_failing_rules(true);
476        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
477            produce_one_row: false,
478            schema: Arc::new(DFSchema::empty()),
479        });
480        opt.optimize(plan, &config, &observe).unwrap();
481    }
482
483    #[test]
484    fn no_skip_failing_rule() {
485        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
486        let config = OptimizerContext::new().with_skip_failing_rules(false);
487        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
488            produce_one_row: false,
489            schema: Arc::new(DFSchema::empty()),
490        });
491        let err = opt.optimize(plan, &config, &observe).unwrap_err();
492        assert_eq!(
493            "Optimizer rule 'bad rule' failed\ncaused by\n\
494            Error during planning: rule failed",
495            err.strip_backtrace()
496        );
497    }
498
499    #[test]
500    fn generate_different_schema() {
501        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
502        let config = OptimizerContext::new().with_skip_failing_rules(false);
503        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
504            produce_one_row: false,
505            schema: Arc::new(DFSchema::empty()),
506        });
507        let err = opt.optimize(plan, &config, &observe).unwrap_err();
508
509        // Simplify assert to check the error message contains the expected message
510        assert_contains!(
511            err.strip_backtrace(),
512            "Failed due to a difference in schemas: original schema: DFSchema"
513        );
514    }
515
516    #[test]
517    fn skip_generate_different_schema() {
518        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
519        let config = OptimizerContext::new().with_skip_failing_rules(true);
520        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
521            produce_one_row: false,
522            schema: Arc::new(DFSchema::empty()),
523        });
524        opt.optimize(plan, &config, &observe).unwrap();
525    }
526
527    #[test]
528    fn generate_same_schema_different_metadata() -> Result<()> {
529        // if the plan creates more metadata than previously (because
530        // some wrapping functions are removed, etc) do not error
531        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
532        let config = OptimizerContext::new().with_skip_failing_rules(false);
533
534        let input = Arc::new(test_table_scan()?);
535        let input_schema = Arc::clone(input.schema());
536
537        let plan = LogicalPlan::Projection(Projection::try_new_with_schema(
538            vec![col("a"), col("b"), col("c")],
539            input,
540            add_metadata_to_fields(input_schema.as_ref()),
541        )?);
542
543        // optimizing should be ok, but the schema will have changed  (no metadata)
544        assert_ne!(plan.schema().as_ref(), input_schema.as_ref());
545        let optimized_plan = opt.optimize(plan, &config, &observe)?;
546        // metadata was removed
547        assert_eq!(optimized_plan.schema().as_ref(), input_schema.as_ref());
548        Ok(())
549    }
550
551    #[test]
552    fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
553        // Run a goofy optimizer, which rotates projection columns
554        // [1, 2, 3] -> [2, 3, 1] -> [3, 1, 2] -> [1, 2, 3]
555
556        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);
557        let config = OptimizerContext::new().with_max_passes(16);
558
559        let initial_plan = LogicalPlanBuilder::empty(false)
560            .project([lit(1), lit(2), lit(3)])?
561            .project([lit(100)])? // to not trigger changed schema error
562            .build()?;
563
564        let mut plans: Vec<LogicalPlan> = Vec::new();
565        let final_plan =
566            opt.optimize(initial_plan.clone(), &config, |p, _| plans.push(p.clone()))?;
567
568        // initial_plan is not observed, so we have 3 plans
569        assert_eq!(3, plans.len());
570
571        // we got again the initial_plan with [1, 2, 3]
572        assert_eq!(initial_plan, final_plan);
573
574        Ok(())
575    }
576
577    #[test]
578    fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
579        // Run a goofy optimizer, which reverses and rotates projection columns
580        // [1, 2, 3] -> [3, 2, 1] -> [2, 1, 3] -> [1, 3, 2] -> [3, 2, 1]
581
582        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);
583        let config = OptimizerContext::new().with_max_passes(16);
584
585        let initial_plan = LogicalPlanBuilder::empty(false)
586            .project([lit(1), lit(2), lit(3)])?
587            .project([lit(100)])? // to not trigger changed schema error
588            .build()?;
589
590        let mut plans: Vec<LogicalPlan> = Vec::new();
591        let final_plan =
592            opt.optimize(initial_plan, &config, |p, _| plans.push(p.clone()))?;
593
594        // initial_plan is not observed, so we have 4 plans
595        assert_eq!(4, plans.len());
596
597        // we got again the plan with [3, 2, 1]
598        assert_eq!(plans[0], final_plan);
599
600        Ok(())
601    }
602
603    fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
604        let new_fields = schema
605            .iter()
606            .enumerate()
607            .map(|(i, (qualifier, field))| {
608                let metadata =
609                    [("key".into(), format!("value {i}"))].into_iter().collect();
610
611                let new_arrow_field = field.as_ref().clone().with_metadata(metadata);
612                (qualifier.cloned(), Arc::new(new_arrow_field))
613            })
614            .collect::<Vec<_>>();
615
616        let new_metadata = schema.metadata().clone();
617        Arc::new(DFSchema::new_with_metadata(new_fields, new_metadata).unwrap())
618    }
619
    /// No-op observer for tests that do not inspect intermediate plans
    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
621
622    #[derive(Default, Debug)]
623    struct BadRule {}
624
625    impl OptimizerRule for BadRule {
626        fn name(&self) -> &str {
627            "bad rule"
628        }
629
630        fn supports_rewrite(&self) -> bool {
631            true
632        }
633
634        fn rewrite(
635            &self,
636            _plan: LogicalPlan,
637            _config: &dyn OptimizerConfig,
638        ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
639            plan_err!("rule failed")
640        }
641    }
642
643    /// Replaces whatever plan with a single table scan
644    #[derive(Default, Debug)]
645    struct GetTableScanRule {}
646
647    impl OptimizerRule for GetTableScanRule {
648        fn name(&self) -> &str {
649            "get table_scan rule"
650        }
651
652        fn supports_rewrite(&self) -> bool {
653            true
654        }
655
656        fn rewrite(
657            &self,
658            _plan: LogicalPlan,
659            _config: &dyn OptimizerConfig,
660        ) -> Result<Transformed<LogicalPlan>> {
661            let table_scan = test_table_scan()?;
662            Ok(Transformed::yes(
663                LogicalPlanBuilder::from(table_scan).build()?,
664            ))
665        }
666    }
667
668    /// A goofy rule doing rotation of columns in all projections.
669    ///
670    /// Useful to test cycle detection.
671    #[derive(Default, Debug)]
672    struct RotateProjectionRule {
673        // reverse exprs instead of rotating on the first pass
674        reverse_on_first_pass: Mutex<bool>,
675    }
676
677    impl RotateProjectionRule {
678        fn new(reverse_on_first_pass: bool) -> Self {
679            Self {
680                reverse_on_first_pass: Mutex::new(reverse_on_first_pass),
681            }
682        }
683    }
684
685    impl OptimizerRule for RotateProjectionRule {
686        fn name(&self) -> &str {
687            "rotate_projection"
688        }
689
690        fn apply_order(&self) -> Option<ApplyOrder> {
691            Some(ApplyOrder::TopDown)
692        }
693
694        fn supports_rewrite(&self) -> bool {
695            true
696        }
697
698        fn rewrite(
699            &self,
700            plan: LogicalPlan,
701            _config: &dyn OptimizerConfig,
702        ) -> Result<Transformed<LogicalPlan>> {
703            let projection = match plan {
704                LogicalPlan::Projection(p) if p.expr.len() >= 2 => p,
705                _ => return Ok(Transformed::no(plan)),
706            };
707
708            let mut exprs = projection.expr.clone();
709
710            let mut reverse = self.reverse_on_first_pass.lock().unwrap();
711            if *reverse {
712                exprs.reverse();
713                *reverse = false;
714            } else {
715                exprs.rotate_left(1);
716            }
717
718            Ok(Transformed::yes(LogicalPlan::Projection(
719                Projection::try_new(exprs, Arc::clone(&projection.input))?,
720            )))
721        }
722    }
723}