1mod dot;
2mod format;
3mod inputs;
4mod scan_sources;
5mod schema;
6pub(crate) mod tree_format;
7
8use std::borrow::Cow;
9use std::fmt;
10
11pub use dot::{EscapeLabel, IRDotDisplay, PathsDisplay, ScanSourcesDisplay};
12pub use format::{ExprIRDisplay, IRDisplay};
13use hive::HivePartitions;
14use polars_core::prelude::*;
15use polars_utils::idx_vec::UnitVec;
16use polars_utils::unitvec;
17pub use scan_sources::{ScanSourceIter, ScanSourceRef, ScanSources};
18#[cfg(feature = "ir_serde")]
19use serde::{Deserialize, Serialize};
20use strum_macros::IntoStaticStr;
21
22use crate::prelude::*;
23
24#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]
25pub struct IRPlan {
26 pub lp_top: Node,
27 pub lp_arena: Arena<IR>,
28 pub expr_arena: Arena<AExpr>,
29}
30
31#[derive(Clone, Copy)]
32pub struct IRPlanRef<'a> {
33 pub lp_top: Node,
34 pub lp_arena: &'a Arena<IR>,
35 pub expr_arena: &'a Arena<AExpr>,
36}
37
38#[derive(Clone, Debug, Default, IntoStaticStr)]
41#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]
42#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
43pub enum IR {
44 #[cfg(feature = "python")]
45 PythonScan {
46 options: PythonOptions,
47 },
48 Slice {
49 input: Node,
50 offset: i64,
51 len: IdxSize,
52 },
53 Filter {
54 input: Node,
55 predicate: ExprIR,
56 },
57 Scan {
58 sources: ScanSources,
59 file_info: FileInfo,
60 hive_parts: Option<Arc<Vec<HivePartitions>>>,
61 predicate: Option<ExprIR>,
62 output_schema: Option<SchemaRef>,
64 scan_type: FileScan,
65 file_options: FileScanOptions,
67 },
68 DataFrameScan {
69 df: Arc<DataFrame>,
70 schema: SchemaRef,
71 output_schema: Option<SchemaRef>,
74 },
75 SimpleProjection {
78 input: Node,
79 columns: SchemaRef,
80 },
81 Select {
83 input: Node,
84 expr: Vec<ExprIR>,
85 schema: SchemaRef,
86 options: ProjectionOptions,
87 },
88 Sort {
89 input: Node,
90 by_column: Vec<ExprIR>,
91 slice: Option<(i64, usize)>,
92 sort_options: SortMultipleOptions,
93 },
94 Cache {
95 input: Node,
96 id: usize,
98 cache_hits: u32,
100 },
101 GroupBy {
102 input: Node,
103 keys: Vec<ExprIR>,
104 aggs: Vec<ExprIR>,
105 schema: SchemaRef,
106 maintain_order: bool,
107 options: Arc<GroupbyOptions>,
108 #[cfg_attr(feature = "ir_serde", serde(skip))]
109 apply: Option<Arc<dyn DataFrameUdf>>,
110 },
111 Join {
112 input_left: Node,
113 input_right: Node,
114 schema: SchemaRef,
115 left_on: Vec<ExprIR>,
116 right_on: Vec<ExprIR>,
117 options: Arc<JoinOptions>,
118 },
119 HStack {
120 input: Node,
121 exprs: Vec<ExprIR>,
122 schema: SchemaRef,
123 options: ProjectionOptions,
124 },
125 Distinct {
126 input: Node,
127 options: DistinctOptionsIR,
128 },
129 MapFunction {
130 input: Node,
131 function: FunctionIR,
132 },
133 Union {
134 inputs: Vec<Node>,
135 options: UnionOptions,
136 },
137 HConcat {
140 inputs: Vec<Node>,
141 schema: SchemaRef,
142 options: HConcatOptions,
143 },
144 ExtContext {
145 input: Node,
146 contexts: Vec<Node>,
147 schema: SchemaRef,
148 },
149 Sink {
150 input: Node,
151 payload: SinkType,
152 },
153 #[default]
154 Invalid,
155}
156
157impl IRPlan {
158 pub fn new(top: Node, ir_arena: Arena<IR>, expr_arena: Arena<AExpr>) -> Self {
159 Self {
160 lp_top: top,
161 lp_arena: ir_arena,
162 expr_arena,
163 }
164 }
165
166 pub fn root(&self) -> &IR {
167 self.lp_arena.get(self.lp_top)
168 }
169
170 pub fn as_ref(&self) -> IRPlanRef {
171 IRPlanRef {
172 lp_top: self.lp_top,
173 lp_arena: &self.lp_arena,
174 expr_arena: &self.expr_arena,
175 }
176 }
177
178 pub fn extract_streaming_plan(&self) -> Option<IRPlanRef> {
180 self.as_ref().extract_streaming_plan()
181 }
182
183 pub fn describe(&self) -> String {
184 self.as_ref().describe()
185 }
186
187 pub fn describe_tree_format(&self) -> String {
188 self.as_ref().describe_tree_format()
189 }
190
191 pub fn display(&self) -> format::IRDisplay {
192 self.as_ref().display()
193 }
194
195 pub fn display_dot(&self) -> dot::IRDotDisplay {
196 self.as_ref().display_dot()
197 }
198}
199
200impl<'a> IRPlanRef<'a> {
201 pub fn root(self) -> &'a IR {
202 self.lp_arena.get(self.lp_top)
203 }
204
205 pub fn with_root(self, root: Node) -> Self {
206 Self {
207 lp_top: root,
208 lp_arena: self.lp_arena,
209 expr_arena: self.expr_arena,
210 }
211 }
212
213 pub fn extract_streaming_plan(self) -> Option<IRPlanRef<'a>> {
215 let IR::MapFunction { input: _, function } = self.root() else {
219 return None;
220 };
221
222 let FunctionIR::Pipeline { original, .. } = function else {
223 return None;
224 };
225
226 Some(original.as_ref()?.as_ref().as_ref())
227 }
228
229 pub fn display(self) -> format::IRDisplay<'a> {
230 format::IRDisplay::new(self)
231 }
232
233 pub fn display_dot(self) -> dot::IRDotDisplay<'a> {
234 dot::IRDotDisplay::new(self)
235 }
236
237 pub fn describe(self) -> String {
238 self.display().to_string()
239 }
240
241 pub fn describe_tree_format(self) -> String {
242 let mut visitor = tree_format::TreeFmtVisitor::default();
243 tree_format::TreeFmtNode::root_logical_plan(self).traverse(&mut visitor);
244 format!("{visitor:#?}")
245 }
246}
247
248impl fmt::Debug for IRPlan {
249 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250 <format::IRDisplay as fmt::Display>::fmt(&self.display(), f)
251 }
252}
253
254impl fmt::Debug for IRPlanRef<'_> {
255 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256 <format::IRDisplay as fmt::Display>::fmt(&self.display(), f)
257 }
258}
259
#[cfg(test)]
mod test {
    use super::*;

    // Size guard for the `IR` enum: fails if `IR` exceeds 152 bytes.
    // Marked `#[ignore]`, so it only runs when ignored tests are requested explicitly.
    #[test]
    #[ignore]
    fn test_alp_size() {
        assert!(size_of::<IR>() <= 152);
    }
}