polars_plan/plans/optimizer/mod.rs

1use polars_core::prelude::*;
2
3use crate::prelude::*;
4
5mod cache_states;
6mod delay_rechunk;
7
8mod cluster_with_columns;
9mod collapse_and_project;
10mod collapse_joins;
11mod collect_members;
12mod count_star;
13#[cfg(feature = "cse")]
14mod cse;
15mod flatten_union;
16#[cfg(feature = "fused")]
17mod fused;
18mod join_utils;
19mod predicate_pushdown;
20mod projection_pushdown;
21mod set_order;
22mod simplify_expr;
23mod slice_pushdown_expr;
24mod slice_pushdown_lp;
25mod stack_opt;
26
27use collapse_and_project::SimpleProjectionAndCollapse;
28use delay_rechunk::DelayRechunk;
29use polars_core::config::verbose;
30use polars_io::predicates::PhysicalIoExpr;
31pub use predicate_pushdown::PredicatePushDown;
32pub use projection_pushdown::ProjectionPushDown;
33pub use simplify_expr::{SimplifyBooleanRule, SimplifyExprRule};
34use slice_pushdown_lp::SlicePushDown;
35pub use stack_opt::{OptimizationRule, StackOptimizer};
36
37use self::flatten_union::FlattenUnionRule;
38use self::set_order::set_order_flags;
39pub use crate::frame::{AllowedOptimizations, OptFlags};
40pub use crate::plans::conversion::type_coercion::TypeCoercionRule;
41use crate::plans::optimizer::count_star::CountStar;
42#[cfg(feature = "cse")]
43use crate::plans::optimizer::cse::prune_unused_caches;
44#[cfg(feature = "cse")]
45use crate::plans::optimizer::cse::CommonSubExprOptimizer;
46use crate::plans::optimizer::predicate_pushdown::ExprEval;
47#[cfg(feature = "cse")]
48use crate::plans::visitor::*;
49use crate::prelude::optimizer::collect_members::MemberCollector;
50
51pub trait Optimize {
52    fn optimize(&self, logical_plan: DslPlan) -> PolarsResult<DslPlan>;
53}
54
55// arbitrary constant to reduce reallocation.
56const HASHMAP_SIZE: usize = 16;
57
58pub(crate) fn init_hashmap<K, V>(max_len: Option<usize>) -> PlHashMap<K, V> {
59    PlHashMap::with_capacity(std::cmp::min(max_len.unwrap_or(HASHMAP_SIZE), HASHMAP_SIZE))
60}
61
62pub fn optimize(
63    logical_plan: DslPlan,
64    mut opt_state: OptFlags,
65    lp_arena: &mut Arena<IR>,
66    expr_arena: &mut Arena<AExpr>,
67    scratch: &mut Vec<Node>,
68    expr_eval: ExprEval<'_>,
69) -> PolarsResult<Node> {
70    #[allow(dead_code)]
71    let verbose = verbose();
72
73    // Gradually fill the rules passed to the optimizer
74    let opt = StackOptimizer {};
75    let mut rules: Vec<Box<dyn OptimizationRule>> = Vec::with_capacity(8);
76
77    // Unset CSE
78    // This can be turned on again during ir-conversion.
79    #[allow(clippy::eq_op)]
80    #[cfg(feature = "cse")]
81    if opt_state.contains(OptFlags::EAGER) {
82        opt_state &= !(OptFlags::COMM_SUBEXPR_ELIM | OptFlags::COMM_SUBEXPR_ELIM);
83    }
84    let mut lp_top = to_alp(logical_plan, expr_arena, lp_arena, &mut opt_state)?;
85
86    // Don't run optimizations that don't make sense on a single node.
87    // This keeps eager execution more snappy.
88    #[cfg(feature = "cse")]
89    let comm_subplan_elim = opt_state.contains(OptFlags::COMM_SUBPLAN_ELIM);
90
91    #[cfg(feature = "cse")]
92    let comm_subexpr_elim = opt_state.contains(OptFlags::COMM_SUBEXPR_ELIM);
93    #[cfg(not(feature = "cse"))]
94    let comm_subexpr_elim = false;
95
96    #[allow(unused_variables)]
97    let agg_scan_projection =
98        opt_state.contains(OptFlags::FILE_CACHING) && !opt_state.streaming() && !opt_state.eager();
99
100    // During debug we check if the optimizations have not modified the final schema.
101    #[cfg(debug_assertions)]
102    let prev_schema = lp_arena.get(lp_top).schema(lp_arena).into_owned();
103
104    // Collect members for optimizations that need it.
105    let mut members = MemberCollector::new();
106    if !opt_state.eager() && (comm_subexpr_elim || opt_state.projection_pushdown()) {
107        members.collect(lp_top, lp_arena, expr_arena)
108    }
109
110    // Run before slice pushdown
111    if opt_state.contains(OptFlags::CHECK_ORDER_OBSERVE)
112        && members.has_group_by | members.has_sort | members.has_distinct
113    {
114        set_order_flags(lp_top, lp_arena, expr_arena, scratch);
115    }
116
117    if opt_state.simplify_expr() {
118        #[cfg(feature = "fused")]
119        rules.push(Box::new(fused::FusedArithmetic {}));
120    }
121
122    #[cfg(feature = "cse")]
123    let _cse_plan_changed = if comm_subplan_elim
124        && members.has_joins_or_unions
125        && members.has_duplicate_scans()
126        && !members.has_cache
127    {
128        if verbose {
129            eprintln!("found multiple sources; run comm_subplan_elim")
130        }
131        let (lp, changed, cid2c) = cse::elim_cmn_subplans(lp_top, lp_arena, expr_arena);
132
133        prune_unused_caches(lp_arena, cid2c);
134
135        lp_top = lp;
136        members.has_cache |= changed;
137        changed
138    } else {
139        false
140    };
141    #[cfg(not(feature = "cse"))]
142    let _cse_plan_changed = false;
143
144    // Should be run before predicate pushdown.
145    if opt_state.projection_pushdown() {
146        let mut projection_pushdown_opt = ProjectionPushDown::new();
147        let alp = lp_arena.take(lp_top);
148        let alp = projection_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
149        lp_arena.replace(lp_top, alp);
150
151        if projection_pushdown_opt.is_count_star {
152            let mut count_star_opt = CountStar::new();
153            count_star_opt.optimize_plan(lp_arena, expr_arena, lp_top)?;
154        }
155    }
156
157    if opt_state.predicate_pushdown() {
158        let mut predicate_pushdown_opt = PredicatePushDown::new(expr_eval);
159        let alp = lp_arena.take(lp_top);
160        let alp = predicate_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
161        lp_arena.replace(lp_top, alp);
162    }
163
164    if opt_state.cluster_with_columns() {
165        cluster_with_columns::optimize(lp_top, lp_arena, expr_arena)
166    }
167
168    // Make sure it is after predicate pushdown
169    if opt_state.collapse_joins() && members.has_filter_with_join_input {
170        collapse_joins::optimize(lp_top, lp_arena, expr_arena)
171    }
172
173    // Make sure its before slice pushdown.
174    if opt_state.fast_projection() {
175        rules.push(Box::new(SimpleProjectionAndCollapse::new(
176            opt_state.eager(),
177        )));
178    }
179
180    if !opt_state.eager() {
181        rules.push(Box::new(DelayRechunk::new()));
182    }
183
184    if opt_state.slice_pushdown() {
185        let mut slice_pushdown_opt =
186            SlicePushDown::new(opt_state.streaming(), opt_state.new_streaming());
187        let alp = lp_arena.take(lp_top);
188        let alp = slice_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
189
190        lp_arena.replace(lp_top, alp);
191
192        // Expressions use the stack optimizer.
193        rules.push(Box::new(slice_pushdown_opt));
194    }
195    // This optimization removes branches, so we must do it when type coercion
196    // is completed.
197    if opt_state.simplify_expr() {
198        rules.push(Box::new(SimplifyBooleanRule {}));
199    }
200
201    if !opt_state.eager() {
202        rules.push(Box::new(FlattenUnionRule {}));
203    }
204
205    lp_top = opt.optimize_loop(&mut rules, expr_arena, lp_arena, lp_top)?;
206
207    if members.has_joins_or_unions && members.has_cache && _cse_plan_changed {
208        // We only want to run this on cse inserted caches
209        cache_states::set_cache_states(lp_top, lp_arena, expr_arena, scratch, expr_eval, verbose)?;
210    }
211
212    // This one should run (nearly) last as this modifies the projections
213    #[cfg(feature = "cse")]
214    if comm_subexpr_elim && !members.has_ext_context {
215        let mut optimizer = CommonSubExprOptimizer::new();
216        let alp_node = IRNode::new(lp_top);
217
218        lp_top = try_with_ir_arena(lp_arena, expr_arena, |arena| {
219            let rewritten = alp_node.rewrite(&mut optimizer, arena)?;
220            Ok(rewritten.node())
221        })?;
222    }
223
224    // During debug we check if the optimizations have not modified the final schema.
225    #[cfg(debug_assertions)]
226    {
227        // only check by names because we may supercast types.
228        assert_eq!(
229            prev_schema.iter_names().collect::<Vec<_>>(),
230            lp_arena
231                .get(lp_top)
232                .schema(lp_arena)
233                .iter_names()
234                .collect::<Vec<_>>()
235        );
236    };
237
238    Ok(lp_top)
239}