polars_plan/plans/optimizer/
mod.rs1use polars_core::prelude::*;
2
3use crate::prelude::*;
4
5mod cache_states;
6mod delay_rechunk;
7
8mod cluster_with_columns;
9mod collapse_and_project;
10mod collapse_joins;
11mod collect_members;
12mod count_star;
13#[cfg(feature = "cse")]
14mod cse;
15mod flatten_union;
16#[cfg(feature = "fused")]
17mod fused;
18mod join_utils;
19mod predicate_pushdown;
20mod projection_pushdown;
21mod set_order;
22mod simplify_expr;
23mod slice_pushdown_expr;
24mod slice_pushdown_lp;
25mod stack_opt;
26
27use collapse_and_project::SimpleProjectionAndCollapse;
28use delay_rechunk::DelayRechunk;
29use polars_core::config::verbose;
30use polars_io::predicates::PhysicalIoExpr;
31pub use predicate_pushdown::PredicatePushDown;
32pub use projection_pushdown::ProjectionPushDown;
33pub use simplify_expr::{SimplifyBooleanRule, SimplifyExprRule};
34use slice_pushdown_lp::SlicePushDown;
35pub use stack_opt::{OptimizationRule, StackOptimizer};
36
37use self::flatten_union::FlattenUnionRule;
38use self::set_order::set_order_flags;
39pub use crate::frame::{AllowedOptimizations, OptFlags};
40pub use crate::plans::conversion::type_coercion::TypeCoercionRule;
41use crate::plans::optimizer::count_star::CountStar;
42#[cfg(feature = "cse")]
43use crate::plans::optimizer::cse::prune_unused_caches;
44#[cfg(feature = "cse")]
45use crate::plans::optimizer::cse::CommonSubExprOptimizer;
46use crate::plans::optimizer::predicate_pushdown::ExprEval;
47#[cfg(feature = "cse")]
48use crate::plans::visitor::*;
49use crate::prelude::optimizer::collect_members::MemberCollector;
50
51pub trait Optimize {
52 fn optimize(&self, logical_plan: DslPlan) -> PolarsResult<DslPlan>;
53}
54
/// Largest initial capacity that [`init_hashmap`] will ever request.
const HASHMAP_SIZE: usize = 16;

/// Creates an empty [`PlHashMap`] with a bounded initial capacity.
///
/// The requested capacity is `max_len` when it is given and smaller than
/// [`HASHMAP_SIZE`]; in every other case (no hint, or a hint of
/// `HASHMAP_SIZE` or more) it is `HASHMAP_SIZE`.
pub(crate) fn init_hashmap<K, V>(max_len: Option<usize>) -> PlHashMap<K, V> {
    let capacity = match max_len {
        Some(len) if len < HASHMAP_SIZE => len,
        _ => HASHMAP_SIZE,
    };
    PlHashMap::with_capacity(capacity)
}
61
62pub fn optimize(
63 logical_plan: DslPlan,
64 mut opt_state: OptFlags,
65 lp_arena: &mut Arena<IR>,
66 expr_arena: &mut Arena<AExpr>,
67 scratch: &mut Vec<Node>,
68 expr_eval: ExprEval<'_>,
69) -> PolarsResult<Node> {
70 #[allow(dead_code)]
71 let verbose = verbose();
72
73 let opt = StackOptimizer {};
75 let mut rules: Vec<Box<dyn OptimizationRule>> = Vec::with_capacity(8);
76
77 #[allow(clippy::eq_op)]
80 #[cfg(feature = "cse")]
81 if opt_state.contains(OptFlags::EAGER) {
82 opt_state &= !(OptFlags::COMM_SUBEXPR_ELIM | OptFlags::COMM_SUBEXPR_ELIM);
83 }
84 let mut lp_top = to_alp(logical_plan, expr_arena, lp_arena, &mut opt_state)?;
85
86 #[cfg(feature = "cse")]
89 let comm_subplan_elim = opt_state.contains(OptFlags::COMM_SUBPLAN_ELIM);
90
91 #[cfg(feature = "cse")]
92 let comm_subexpr_elim = opt_state.contains(OptFlags::COMM_SUBEXPR_ELIM);
93 #[cfg(not(feature = "cse"))]
94 let comm_subexpr_elim = false;
95
96 #[allow(unused_variables)]
97 let agg_scan_projection =
98 opt_state.contains(OptFlags::FILE_CACHING) && !opt_state.streaming() && !opt_state.eager();
99
100 #[cfg(debug_assertions)]
102 let prev_schema = lp_arena.get(lp_top).schema(lp_arena).into_owned();
103
104 let mut members = MemberCollector::new();
106 if !opt_state.eager() && (comm_subexpr_elim || opt_state.projection_pushdown()) {
107 members.collect(lp_top, lp_arena, expr_arena)
108 }
109
110 if opt_state.contains(OptFlags::CHECK_ORDER_OBSERVE)
112 && members.has_group_by | members.has_sort | members.has_distinct
113 {
114 set_order_flags(lp_top, lp_arena, expr_arena, scratch);
115 }
116
117 if opt_state.simplify_expr() {
118 #[cfg(feature = "fused")]
119 rules.push(Box::new(fused::FusedArithmetic {}));
120 }
121
122 #[cfg(feature = "cse")]
123 let _cse_plan_changed = if comm_subplan_elim
124 && members.has_joins_or_unions
125 && members.has_duplicate_scans()
126 && !members.has_cache
127 {
128 if verbose {
129 eprintln!("found multiple sources; run comm_subplan_elim")
130 }
131 let (lp, changed, cid2c) = cse::elim_cmn_subplans(lp_top, lp_arena, expr_arena);
132
133 prune_unused_caches(lp_arena, cid2c);
134
135 lp_top = lp;
136 members.has_cache |= changed;
137 changed
138 } else {
139 false
140 };
141 #[cfg(not(feature = "cse"))]
142 let _cse_plan_changed = false;
143
144 if opt_state.projection_pushdown() {
146 let mut projection_pushdown_opt = ProjectionPushDown::new();
147 let alp = lp_arena.take(lp_top);
148 let alp = projection_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
149 lp_arena.replace(lp_top, alp);
150
151 if projection_pushdown_opt.is_count_star {
152 let mut count_star_opt = CountStar::new();
153 count_star_opt.optimize_plan(lp_arena, expr_arena, lp_top)?;
154 }
155 }
156
157 if opt_state.predicate_pushdown() {
158 let mut predicate_pushdown_opt = PredicatePushDown::new(expr_eval);
159 let alp = lp_arena.take(lp_top);
160 let alp = predicate_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
161 lp_arena.replace(lp_top, alp);
162 }
163
164 if opt_state.cluster_with_columns() {
165 cluster_with_columns::optimize(lp_top, lp_arena, expr_arena)
166 }
167
168 if opt_state.collapse_joins() && members.has_filter_with_join_input {
170 collapse_joins::optimize(lp_top, lp_arena, expr_arena)
171 }
172
173 if opt_state.fast_projection() {
175 rules.push(Box::new(SimpleProjectionAndCollapse::new(
176 opt_state.eager(),
177 )));
178 }
179
180 if !opt_state.eager() {
181 rules.push(Box::new(DelayRechunk::new()));
182 }
183
184 if opt_state.slice_pushdown() {
185 let mut slice_pushdown_opt =
186 SlicePushDown::new(opt_state.streaming(), opt_state.new_streaming());
187 let alp = lp_arena.take(lp_top);
188 let alp = slice_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
189
190 lp_arena.replace(lp_top, alp);
191
192 rules.push(Box::new(slice_pushdown_opt));
194 }
195 if opt_state.simplify_expr() {
198 rules.push(Box::new(SimplifyBooleanRule {}));
199 }
200
201 if !opt_state.eager() {
202 rules.push(Box::new(FlattenUnionRule {}));
203 }
204
205 lp_top = opt.optimize_loop(&mut rules, expr_arena, lp_arena, lp_top)?;
206
207 if members.has_joins_or_unions && members.has_cache && _cse_plan_changed {
208 cache_states::set_cache_states(lp_top, lp_arena, expr_arena, scratch, expr_eval, verbose)?;
210 }
211
212 #[cfg(feature = "cse")]
214 if comm_subexpr_elim && !members.has_ext_context {
215 let mut optimizer = CommonSubExprOptimizer::new();
216 let alp_node = IRNode::new(lp_top);
217
218 lp_top = try_with_ir_arena(lp_arena, expr_arena, |arena| {
219 let rewritten = alp_node.rewrite(&mut optimizer, arena)?;
220 Ok(rewritten.node())
221 })?;
222 }
223
224 #[cfg(debug_assertions)]
226 {
227 assert_eq!(
229 prev_schema.iter_names().collect::<Vec<_>>(),
230 lp_arena
231 .get(lp_top)
232 .schema(lp_arena)
233 .iter_names()
234 .collect::<Vec<_>>()
235 );
236 };
237
238 Ok(lp_top)
239}