1use std::fmt::Debug;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use datafusion_expr::registry::FunctionRegistry;
25use datafusion_expr::{assert_expected_schema, InvariantLevel};
26use log::{debug, warn};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::instant::Instant;
31use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
32use datafusion_common::{internal_err, DFSchema, DataFusionError, HashSet, Result};
33use datafusion_expr::logical_plan::LogicalPlan;
34
35use crate::common_subexpr_eliminate::CommonSubexprEliminate;
36use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
37use crate::eliminate_cross_join::EliminateCrossJoin;
38use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr;
39use crate::eliminate_filter::EliminateFilter;
40use crate::eliminate_group_by_constant::EliminateGroupByConstant;
41use crate::eliminate_join::EliminateJoin;
42use crate::eliminate_limit::EliminateLimit;
43use crate::eliminate_nested_union::EliminateNestedUnion;
44use crate::eliminate_one_union::EliminateOneUnion;
45use crate::eliminate_outer_join::EliminateOuterJoin;
46use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
47use crate::filter_null_join_keys::FilterNullJoinKeys;
48use crate::optimize_projections::OptimizeProjections;
49use crate::plan_signature::LogicalPlanSignature;
50use crate::propagate_empty_relation::PropagateEmptyRelation;
51use crate::push_down_filter::PushDownFilter;
52use crate::push_down_limit::PushDownLimit;
53use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
54use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
55use crate::simplify_expressions::SimplifyExpressions;
56use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
57use crate::utils::log_plan;
58
59pub trait OptimizerRule: Debug {
72 fn name(&self) -> &str;
74
75 fn apply_order(&self) -> Option<ApplyOrder> {
80 None
81 }
82
83 #[deprecated(since = "47.0.0", note = "This method is no longer used")]
85 fn supports_rewrite(&self) -> bool {
86 true
87 }
88
89 fn rewrite(
92 &self,
93 _plan: LogicalPlan,
94 _config: &dyn OptimizerConfig,
95 ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
96 internal_err!("rewrite is not implemented for {}", self.name())
97 }
98}
99
100pub trait OptimizerConfig {
102 fn query_execution_start_time(&self) -> DateTime<Utc>;
105
106 fn alias_generator(&self) -> &Arc<AliasGenerator>;
108
109 fn options(&self) -> &ConfigOptions;
110
111 fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
112 None
113 }
114}
115
116#[derive(Debug)]
119pub struct OptimizerContext {
120 query_execution_start_time: DateTime<Utc>,
123
124 alias_generator: Arc<AliasGenerator>,
126
127 options: ConfigOptions,
128}
129
130impl OptimizerContext {
131 pub fn new() -> Self {
133 let mut options = ConfigOptions::default();
134 options.optimizer.filter_null_join_keys = true;
135
136 Self {
137 query_execution_start_time: Utc::now(),
138 alias_generator: Arc::new(AliasGenerator::new()),
139 options,
140 }
141 }
142
143 pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
145 self.options.optimizer.filter_null_join_keys = filter_null_keys;
146 self
147 }
148
149 pub fn with_query_execution_start_time(
152 mut self,
153 query_execution_tart_time: DateTime<Utc>,
154 ) -> Self {
155 self.query_execution_start_time = query_execution_tart_time;
156 self
157 }
158
159 pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
162 self.options.optimizer.skip_failed_rules = b;
163 self
164 }
165
166 pub fn with_max_passes(mut self, v: u8) -> Self {
168 self.options.optimizer.max_passes = v as usize;
169 self
170 }
171}
172
173impl Default for OptimizerContext {
174 fn default() -> Self {
176 Self::new()
177 }
178}
179
180impl OptimizerConfig for OptimizerContext {
181 fn query_execution_start_time(&self) -> DateTime<Utc> {
182 self.query_execution_start_time
183 }
184
185 fn alias_generator(&self) -> &Arc<AliasGenerator> {
186 &self.alias_generator
187 }
188
189 fn options(&self) -> &ConfigOptions {
190 &self.options
191 }
192}
193
194#[derive(Clone, Debug)]
196pub struct Optimizer {
197 pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
199}
200
/// When the optimizer invokes an [`OptimizerRule`] while walking a plan.
///
/// Used by rules whose [`OptimizerRule::apply_order`] returns `Some`.
/// `Eq` and `Hash` are derived alongside `PartialEq` because equality is
/// total for this fieldless enum; this also lets it be used as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ApplyOrder {
    /// Apply the rule to a node before visiting its children.
    TopDown,
    /// Apply the rule to a node after visiting its children.
    BottomUp,
}
212
213impl Default for Optimizer {
214 fn default() -> Self {
215 Self::new()
216 }
217}
218
219impl Optimizer {
220 pub fn new() -> Self {
222 let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
223 Arc::new(EliminateNestedUnion::new()),
224 Arc::new(SimplifyExpressions::new()),
225 Arc::new(ReplaceDistinctWithAggregate::new()),
226 Arc::new(EliminateJoin::new()),
227 Arc::new(DecorrelatePredicateSubquery::new()),
228 Arc::new(ScalarSubqueryToJoin::new()),
229 Arc::new(ExtractEquijoinPredicate::new()),
230 Arc::new(EliminateDuplicatedExpr::new()),
231 Arc::new(EliminateFilter::new()),
232 Arc::new(EliminateCrossJoin::new()),
233 Arc::new(EliminateLimit::new()),
234 Arc::new(PropagateEmptyRelation::new()),
235 Arc::new(EliminateOneUnion::new()),
237 Arc::new(FilterNullJoinKeys::default()),
238 Arc::new(EliminateOuterJoin::new()),
239 Arc::new(PushDownLimit::new()),
241 Arc::new(PushDownFilter::new()),
242 Arc::new(SingleDistinctToGroupBy::new()),
243 Arc::new(EliminateGroupByConstant::new()),
246 Arc::new(CommonSubexprEliminate::new()),
247 Arc::new(OptimizeProjections::new()),
248 ];
249
250 Self::with_rules(rules)
251 }
252
253 pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
255 Self { rules }
256 }
257}
258
259struct Rewriter<'a> {
261 apply_order: ApplyOrder,
262 rule: &'a dyn OptimizerRule,
263 config: &'a dyn OptimizerConfig,
264}
265
266impl<'a> Rewriter<'a> {
267 fn new(
268 apply_order: ApplyOrder,
269 rule: &'a dyn OptimizerRule,
270 config: &'a dyn OptimizerConfig,
271 ) -> Self {
272 Self {
273 apply_order,
274 rule,
275 config,
276 }
277 }
278}
279
280impl TreeNodeRewriter for Rewriter<'_> {
281 type Node = LogicalPlan;
282
283 fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
284 if self.apply_order == ApplyOrder::TopDown {
285 {
286 self.rule.rewrite(node, self.config)
287 }
288 } else {
289 Ok(Transformed::no(node))
290 }
291 }
292
293 fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
294 if self.apply_order == ApplyOrder::BottomUp {
295 {
296 self.rule.rewrite(node, self.config)
297 }
298 } else {
299 Ok(Transformed::no(node))
300 }
301 }
302}
303
304impl Optimizer {
305 pub fn optimize<F>(
308 &self,
309 plan: LogicalPlan,
310 config: &dyn OptimizerConfig,
311 mut observer: F,
312 ) -> Result<LogicalPlan>
313 where
314 F: FnMut(&LogicalPlan, &dyn OptimizerRule),
315 {
316 plan.check_invariants(InvariantLevel::Executable)
318 .map_err(|e| e.context("Invalid input plan before LP Optimizers"))?;
319
320 let start_time = Instant::now();
321 let options = config.options();
322 let mut new_plan = plan;
323
324 let mut previous_plans = HashSet::with_capacity(16);
325 previous_plans.insert(LogicalPlanSignature::new(&new_plan));
326
327 let starting_schema = Arc::clone(new_plan.schema());
328
329 let mut i = 0;
330 while i < options.optimizer.max_passes {
331 log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
332
333 for rule in &self.rules {
334 let prev_plan = options
337 .optimizer
338 .skip_failed_rules
339 .then(|| new_plan.clone());
340
341 let starting_schema = Arc::clone(new_plan.schema());
342
343 let result = match rule.apply_order() {
344 Some(apply_order) => new_plan.rewrite_with_subqueries(
346 &mut Rewriter::new(apply_order, rule.as_ref(), config),
347 ),
348 None => {
350 rule.rewrite(new_plan, config)
351 },
352 }
353 .and_then(|tnr| {
354 assert_valid_optimization(&tnr.data, &starting_schema)
356 .map_err(|e| e.context(format!("Check optimizer-specific invariants after optimizer rule: {}", rule.name())))?;
357
358 #[cfg(debug_assertions)]
360 tnr.data.check_invariants(InvariantLevel::Executable)
361 .map_err(|e| e.context(format!("Invalid (non-executable) plan after Optimizer rule: {}", rule.name())))?;
362
363 Ok(tnr)
364 });
365
366 match (result, prev_plan) {
368 (
370 Ok(Transformed {
371 data, transformed, ..
372 }),
373 _,
374 ) => {
375 new_plan = data;
376 observer(&new_plan, rule.as_ref());
377 if transformed {
378 log_plan(rule.name(), &new_plan);
379 } else {
380 debug!(
381 "Plan unchanged by optimizer rule '{}' (pass {})",
382 rule.name(),
383 i
384 );
385 }
386 }
387 (Err(e), Some(orig_plan)) => {
390 warn!(
394 "Skipping optimizer rule '{}' due to unexpected error: {}",
395 rule.name(),
396 e
397 );
398 new_plan = orig_plan;
399 }
400 (Err(e), None) => {
402 return Err(e.context(format!(
403 "Optimizer rule '{}' failed",
404 rule.name()
405 )));
406 }
407 }
408 }
409 log_plan(&format!("Optimized plan (pass {i})"), &new_plan);
410
411 let plan_is_fresh =
413 previous_plans.insert(LogicalPlanSignature::new(&new_plan));
414 if !plan_is_fresh {
415 debug!("optimizer pass {} did not make changes", i);
417 break;
418 }
419 i += 1;
420 }
421
422 assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
424 e.context("Check optimizer-specific invariants after all passes")
425 })?;
426
427 new_plan
429 .check_invariants(InvariantLevel::Executable)
430 .map_err(|e| {
431 e.context("Invalid (non-executable) plan after LP Optimizers")
432 })?;
433
434 log_plan("Final optimized plan", &new_plan);
435 debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
436 Ok(new_plan)
437 }
438}
439
440fn assert_valid_optimization(
445 plan: &LogicalPlan,
446 prev_schema: &Arc<DFSchema>,
447) -> Result<()> {
448 assert_expected_schema(prev_schema, plan)?;
451
452 Ok(())
453}
454
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use datafusion_common::tree_node::Transformed;
    use datafusion_common::{
        assert_contains, plan_err, DFSchema, DFSchemaRef, DataFusionError, Result,
    };
    use datafusion_expr::logical_plan::EmptyRelation;
    use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, Projection};

    use crate::optimizer::Optimizer;
    use crate::test::test_table_scan;
    use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};

    use super::ApplyOrder;

    /// An empty relation (no rows, empty schema), used as input by several tests.
    fn empty_relation() -> LogicalPlan {
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        })
    }

    #[test]
    fn skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        opt.optimize(empty_relation(), &config, &observe).unwrap();
    }

    #[test]
    fn no_skip_failing_rule() {
        let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let err = opt
            .optimize(empty_relation(), &config, &observe)
            .unwrap_err();
        assert_eq!(
            "Optimizer rule 'bad rule' failed\ncaused by\n\
             Error during planning: rule failed",
            err.strip_backtrace()
        );
    }

    #[test]
    fn generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);
        let err = opt
            .optimize(empty_relation(), &config, &observe)
            .unwrap_err();

        // The rule swaps an empty-schema plan for a table scan, which the
        // per-rule schema check must reject.
        assert_contains!(
            err.strip_backtrace(),
            "Failed due to a difference in schemas: original schema: DFSchema"
        );
    }

    #[test]
    fn skip_generate_different_schema() {
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(true);
        opt.optimize(empty_relation(), &config, &observe).unwrap();
    }

    #[test]
    fn generate_same_schema_different_metadata() -> Result<()> {
        // The rule replaces the plan with a table scan whose schema differs
        // from the input plan's only in field metadata; the optimizer must
        // accept that.
        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
        let config = OptimizerContext::new().with_skip_failing_rules(false);

        let input = Arc::new(test_table_scan()?);
        let input_schema = Arc::clone(input.schema());

        let plan = LogicalPlan::Projection(Projection::try_new_with_schema(
            vec![col("a"), col("b"), col("c")],
            input,
            add_metadata_to_fields(input_schema.as_ref()),
        )?);

        // The schemas differ (metadata included) before optimizing...
        assert_ne!(plan.schema().as_ref(), input_schema.as_ref());
        let optimized_plan = opt.optimize(plan, &config, &observe)?;
        // ...and the result is the metadata-free table scan.
        assert_eq!(optimized_plan.schema().as_ref(), input_schema.as_ref());

        Ok(())
    }

    #[test]
    fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
        // Rotating three expressions returns to the initial order after three
        // passes. The optimizer must then notice the repeated plan and stop.
        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);
        let config = OptimizerContext::new().with_max_passes(16);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])? // so the schema check does not fail
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan.clone(), &config, |p, _| plans.push(p.clone()))?;

        // One observation per pass: [2,3,1], [3,1,2], [1,2,3].
        assert_eq!(3, plans.len());

        // The last rotation restores the initial plan.
        assert_eq!(initial_plan, final_plan);

        Ok(())
    }

    #[test]
    fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
        // Reversing first and then rotating enters a cycle that does not
        // include the initial plan. The optimizer must still detect the
        // repeat and stop.
        let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);
        let config = OptimizerContext::new().with_max_passes(16);

        let initial_plan = LogicalPlanBuilder::empty(false)
            .project([lit(1), lit(2), lit(3)])?
            .project([lit(100)])? // so the schema check does not fail
            .build()?;

        let mut plans: Vec<LogicalPlan> = Vec::new();
        let final_plan =
            opt.optimize(initial_plan, &config, |p, _| plans.push(p.clone()))?;

        // One observation per pass: [3,2,1], [2,1,3], [1,3,2], [3,2,1].
        assert_eq!(4, plans.len());

        // The cycle closes on the plan produced by the first pass.
        assert_eq!(plans[0], final_plan);

        Ok(())
    }

    /// Returns a copy of `schema` where field `i` carries the metadata
    /// `{"key": "value {i}"}`. Schema-level metadata is copied unchanged.
    fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
        let new_fields = schema
            .iter()
            .enumerate()
            .map(|(i, (qualifier, field))| {
                let metadata =
                    [("key".into(), format!("value {i}"))].into_iter().collect();

                let new_arrow_field = field.as_ref().clone().with_metadata(metadata);
                (qualifier.cloned(), Arc::new(new_arrow_field))
            })
            .collect::<Vec<_>>();

        let new_metadata = schema.metadata().clone();
        Arc::new(DFSchema::new_with_metadata(new_fields, new_metadata).unwrap())
    }

    /// No-op observer for tests that do not inspect intermediate plans.
    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

    /// Rule whose `rewrite` always fails with a planning error.
    #[derive(Default, Debug)]
    struct BadRule {}

    impl OptimizerRule for BadRule {
        fn name(&self) -> &str {
            "bad rule"
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
            plan_err!("rule failed")
        }
    }

    /// Rule that replaces any plan with `test_table_scan()`. Used to produce
    /// a schema that differs from the input's.
    #[derive(Default, Debug)]
    struct GetTableScanRule {}

    impl OptimizerRule for GetTableScanRule {
        fn name(&self) -> &str {
            "get table_scan rule"
        }

        fn rewrite(
            &self,
            _plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let table_scan = test_table_scan()?;
            Ok(Transformed::yes(
                LogicalPlanBuilder::from(table_scan).build()?,
            ))
        }
    }

    /// Top-down rule that rotates the expressions of every projection with at
    /// least two expressions left by one.
    ///
    /// If `reverse_on_first_pass` starts as `true`, the first projection it
    /// rewrites is reversed instead, and the flag is then cleared.
    #[derive(Default, Debug)]
    struct RotateProjectionRule {
        // `Mutex` because `rewrite` only gets `&self` but must clear the flag.
        reverse_on_first_pass: Mutex<bool>,
    }

    impl RotateProjectionRule {
        fn new(reverse_on_first_pass: bool) -> Self {
            Self {
                reverse_on_first_pass: Mutex::new(reverse_on_first_pass),
            }
        }
    }

    impl OptimizerRule for RotateProjectionRule {
        fn name(&self) -> &str {
            "rotate_projection"
        }

        fn apply_order(&self) -> Option<ApplyOrder> {
            Some(ApplyOrder::TopDown)
        }

        fn rewrite(
            &self,
            plan: LogicalPlan,
            _config: &dyn OptimizerConfig,
        ) -> Result<Transformed<LogicalPlan>> {
            let projection = match plan {
                LogicalPlan::Projection(p) if p.expr.len() >= 2 => p,
                _ => return Ok(Transformed::no(plan)),
            };

            let mut exprs = projection.expr.clone();

            let mut reverse = self.reverse_on_first_pass.lock().unwrap();
            if *reverse {
                exprs.reverse();
                *reverse = false;
            } else {
                exprs.rotate_left(1);
            }

            Ok(Transformed::yes(LogicalPlan::Projection(
                Projection::try_new(exprs, Arc::clone(&projection.input))?,
            )))
        }
    }
}