polars_plan/plans/ir/
mod.rs

1mod dot;
2mod format;
3mod inputs;
4mod scan_sources;
5mod schema;
6pub(crate) mod tree_format;
7
8use std::borrow::Cow;
9use std::fmt;
10
11pub use dot::{EscapeLabel, IRDotDisplay, PathsDisplay, ScanSourcesDisplay};
12pub use format::{ExprIRDisplay, IRDisplay};
13use hive::HivePartitions;
14use polars_core::prelude::*;
15use polars_utils::idx_vec::UnitVec;
16use polars_utils::unitvec;
17pub use scan_sources::{ScanSourceIter, ScanSourceRef, ScanSources};
18#[cfg(feature = "ir_serde")]
19use serde::{Deserialize, Serialize};
20use strum_macros::IntoStaticStr;
21
22use crate::prelude::*;
23
24#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]
25pub struct IRPlan {
26    pub lp_top: Node,
27    pub lp_arena: Arena<IR>,
28    pub expr_arena: Arena<AExpr>,
29}
30
31#[derive(Clone, Copy)]
32pub struct IRPlanRef<'a> {
33    pub lp_top: Node,
34    pub lp_arena: &'a Arena<IR>,
35    pub expr_arena: &'a Arena<AExpr>,
36}
37
38/// [`IR`] is a representation of [`DslPlan`] with [`Node`]s which are allocated in an [`Arena`]
39/// In this IR the logical plan has access to the full dataset.
40#[derive(Clone, Debug, Default, IntoStaticStr)]
41#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]
42#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
43pub enum IR {
44    #[cfg(feature = "python")]
45    PythonScan {
46        options: PythonOptions,
47    },
48    Slice {
49        input: Node,
50        offset: i64,
51        len: IdxSize,
52    },
53    Filter {
54        input: Node,
55        predicate: ExprIR,
56    },
57    Scan {
58        sources: ScanSources,
59        file_info: FileInfo,
60        hive_parts: Option<Arc<Vec<HivePartitions>>>,
61        predicate: Option<ExprIR>,
62        /// schema of the projected file
63        output_schema: Option<SchemaRef>,
64        scan_type: FileScan,
65        /// generic options that can be used for all file types.
66        file_options: FileScanOptions,
67    },
68    DataFrameScan {
69        df: Arc<DataFrame>,
70        schema: SchemaRef,
71        // Schema of the projected file
72        // If `None`, no projection is applied
73        output_schema: Option<SchemaRef>,
74    },
75    // Only selects columns (semantically only has row access).
76    // This is a more restricted operation than `Select`.
77    SimpleProjection {
78        input: Node,
79        columns: SchemaRef,
80    },
81    // Polars' `select` operation. This may access full materialized data.
82    Select {
83        input: Node,
84        expr: Vec<ExprIR>,
85        schema: SchemaRef,
86        options: ProjectionOptions,
87    },
88    Sort {
89        input: Node,
90        by_column: Vec<ExprIR>,
91        slice: Option<(i64, usize)>,
92        sort_options: SortMultipleOptions,
93    },
94    Cache {
95        input: Node,
96        // Unique ID.
97        id: usize,
98        /// How many hits the cache must be saved in memory.
99        cache_hits: u32,
100    },
101    GroupBy {
102        input: Node,
103        keys: Vec<ExprIR>,
104        aggs: Vec<ExprIR>,
105        schema: SchemaRef,
106        maintain_order: bool,
107        options: Arc<GroupbyOptions>,
108        #[cfg_attr(feature = "ir_serde", serde(skip))]
109        apply: Option<Arc<dyn DataFrameUdf>>,
110    },
111    Join {
112        input_left: Node,
113        input_right: Node,
114        schema: SchemaRef,
115        left_on: Vec<ExprIR>,
116        right_on: Vec<ExprIR>,
117        options: Arc<JoinOptions>,
118    },
119    HStack {
120        input: Node,
121        exprs: Vec<ExprIR>,
122        schema: SchemaRef,
123        options: ProjectionOptions,
124    },
125    Distinct {
126        input: Node,
127        options: DistinctOptionsIR,
128    },
129    MapFunction {
130        input: Node,
131        function: FunctionIR,
132    },
133    Union {
134        inputs: Vec<Node>,
135        options: UnionOptions,
136    },
137    /// Horizontal concatenation
138    /// - Invariant: the names will be unique
139    HConcat {
140        inputs: Vec<Node>,
141        schema: SchemaRef,
142        options: HConcatOptions,
143    },
144    ExtContext {
145        input: Node,
146        contexts: Vec<Node>,
147        schema: SchemaRef,
148    },
149    Sink {
150        input: Node,
151        payload: SinkType,
152    },
153    #[default]
154    Invalid,
155}
156
157impl IRPlan {
158    pub fn new(top: Node, ir_arena: Arena<IR>, expr_arena: Arena<AExpr>) -> Self {
159        Self {
160            lp_top: top,
161            lp_arena: ir_arena,
162            expr_arena,
163        }
164    }
165
166    pub fn root(&self) -> &IR {
167        self.lp_arena.get(self.lp_top)
168    }
169
170    pub fn as_ref(&self) -> IRPlanRef {
171        IRPlanRef {
172            lp_top: self.lp_top,
173            lp_arena: &self.lp_arena,
174            expr_arena: &self.expr_arena,
175        }
176    }
177
178    /// Extract the original logical plan if the plan is for the Streaming Engine
179    pub fn extract_streaming_plan(&self) -> Option<IRPlanRef> {
180        self.as_ref().extract_streaming_plan()
181    }
182
183    pub fn describe(&self) -> String {
184        self.as_ref().describe()
185    }
186
187    pub fn describe_tree_format(&self) -> String {
188        self.as_ref().describe_tree_format()
189    }
190
191    pub fn display(&self) -> format::IRDisplay {
192        self.as_ref().display()
193    }
194
195    pub fn display_dot(&self) -> dot::IRDotDisplay {
196        self.as_ref().display_dot()
197    }
198}
199
200impl<'a> IRPlanRef<'a> {
201    pub fn root(self) -> &'a IR {
202        self.lp_arena.get(self.lp_top)
203    }
204
205    pub fn with_root(self, root: Node) -> Self {
206        Self {
207            lp_top: root,
208            lp_arena: self.lp_arena,
209            expr_arena: self.expr_arena,
210        }
211    }
212
213    /// Extract the original logical plan if the plan is for the Streaming Engine
214    pub fn extract_streaming_plan(self) -> Option<IRPlanRef<'a>> {
215        // @NOTE: the streaming engine replaces the whole tree with a MapFunction { Pipeline, .. }
216        // and puts the original plan somewhere in there. This is how we extract it. Disgusting, I
217        // know.
218        let IR::MapFunction { input: _, function } = self.root() else {
219            return None;
220        };
221
222        let FunctionIR::Pipeline { original, .. } = function else {
223            return None;
224        };
225
226        Some(original.as_ref()?.as_ref().as_ref())
227    }
228
229    pub fn display(self) -> format::IRDisplay<'a> {
230        format::IRDisplay::new(self)
231    }
232
233    pub fn display_dot(self) -> dot::IRDotDisplay<'a> {
234        dot::IRDotDisplay::new(self)
235    }
236
237    pub fn describe(self) -> String {
238        self.display().to_string()
239    }
240
241    pub fn describe_tree_format(self) -> String {
242        let mut visitor = tree_format::TreeFmtVisitor::default();
243        tree_format::TreeFmtNode::root_logical_plan(self).traverse(&mut visitor);
244        format!("{visitor:#?}")
245    }
246}
247
248impl fmt::Debug for IRPlan {
249    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250        <format::IRDisplay as fmt::Display>::fmt(&self.display(), f)
251    }
252}
253
254impl fmt::Debug for IRPlanRef<'_> {
255    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256        <format::IRDisplay as fmt::Display>::fmt(&self.display(), f)
257    }
258}
259
#[cfg(test)]
mod test {
    use super::*;

    /// Guards against the [`IR`] enum growing beyond 152 bytes.
    // skipped for now
    #[ignore]
    #[test]
    fn test_alp_size() {
        assert!(size_of::<IR>() <= 152);
    }
}
270}