polars_plan/plans/
mod.rs

1use std::fmt;
2use std::fmt::Debug;
3use std::sync::{Arc, Mutex};
4
5use polars_core::prelude::*;
6use recursive::recursive;
7
8use crate::prelude::*;
9
10pub(crate) mod aexpr;
11pub(crate) mod anonymous_scan;
12pub(crate) mod ir;
13
14mod apply;
15mod builder_dsl;
16mod builder_ir;
17pub(crate) mod conversion;
18#[cfg(feature = "debugging")]
19pub(crate) mod debug;
20pub mod expr_ir;
21mod file_scan;
22mod format;
23mod functions;
24pub mod hive;
25pub(crate) mod iterator;
26mod lit;
27pub(crate) mod optimizer;
28pub(crate) mod options;
29#[cfg(feature = "python")]
30pub mod python;
31mod schema;
32pub mod visitor;
33
34pub use aexpr::*;
35pub use anonymous_scan::*;
36pub use apply::*;
37pub use builder_dsl::*;
38pub use builder_ir::*;
39pub use conversion::*;
40pub(crate) use expr_ir::*;
41pub use file_scan::*;
42pub use functions::*;
43pub use ir::*;
44pub use iterator::*;
45pub use lit::*;
46pub use optimizer::*;
47pub use schema::*;
48#[cfg(feature = "serde")]
49use serde::{Deserialize, Serialize};
50use strum_macros::IntoStaticStr;
51
/// Distinguishes operations performed on groups from operations performed
/// while projecting / selecting data.
#[derive(Debug, Clone, Copy, Default)]
pub enum Context {
    /// Any operation that is carried out on groups.
    Aggregation,
    /// Any operation that is carried out during projection / selection of data.
    ///
    /// This is the variant produced by `Context::default()`.
    #[default]
    Default,
}
60
61#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
62pub enum DslPlan {
63    #[cfg(feature = "python")]
64    PythonScan { options: PythonOptions },
65    /// Filter on a boolean mask
66    Filter {
67        input: Arc<DslPlan>,
68        predicate: Expr,
69    },
70    /// Cache the input at this point in the LP
71    Cache { input: Arc<DslPlan>, id: usize },
72    Scan {
73        sources: ScanSources,
74        /// Materialized at IR except for AnonymousScan.
75        file_info: Option<FileInfo>,
76        file_options: FileScanOptions,
77        scan_type: FileScan,
78        /// Local use cases often repeatedly collect the same `LazyFrame` (e.g. in interactive notebook use-cases),
79        /// so we cache the IR conversion here, as the path expansion can be quite slow (especially for cloud paths).
80        #[cfg_attr(feature = "serde", serde(skip))]
81        cached_ir: Arc<Mutex<Option<IR>>>,
82    },
83    // we keep track of the projection and selection as it is cheaper to first project and then filter
84    /// In memory DataFrame
85    DataFrameScan {
86        df: Arc<DataFrame>,
87        schema: SchemaRef,
88    },
89    /// Polars' `select` operation, this can mean projection, but also full data access.
90    Select {
91        expr: Vec<Expr>,
92        input: Arc<DslPlan>,
93        options: ProjectionOptions,
94    },
95    /// Groupby aggregation
96    GroupBy {
97        input: Arc<DslPlan>,
98        keys: Vec<Expr>,
99        aggs: Vec<Expr>,
100        maintain_order: bool,
101        options: Arc<GroupbyOptions>,
102        #[cfg_attr(feature = "serde", serde(skip))]
103        apply: Option<(Arc<dyn DataFrameUdf>, SchemaRef)>,
104    },
105    /// Join operation
106    Join {
107        input_left: Arc<DslPlan>,
108        input_right: Arc<DslPlan>,
109        // Invariant: left_on and right_on are equal length.
110        left_on: Vec<Expr>,
111        right_on: Vec<Expr>,
112        // Invariant: Either left_on/right_on or predicates is set (non-empty).
113        predicates: Vec<Expr>,
114        options: Arc<JoinOptions>,
115    },
116    /// Adding columns to the table without a Join
117    HStack {
118        input: Arc<DslPlan>,
119        exprs: Vec<Expr>,
120        options: ProjectionOptions,
121    },
122    /// Remove duplicates from the table
123    Distinct {
124        input: Arc<DslPlan>,
125        options: DistinctOptionsDSL,
126    },
127    /// Sort the table
128    Sort {
129        input: Arc<DslPlan>,
130        by_column: Vec<Expr>,
131        slice: Option<(i64, usize)>,
132        sort_options: SortMultipleOptions,
133    },
134    /// Slice the table
135    Slice {
136        input: Arc<DslPlan>,
137        offset: i64,
138        len: IdxSize,
139    },
140    /// A (User Defined) Function
141    MapFunction {
142        input: Arc<DslPlan>,
143        function: DslFunction,
144    },
145    /// Vertical concatenation
146    Union {
147        inputs: Vec<DslPlan>,
148        args: UnionArgs,
149    },
150    /// Horizontal concatenation of multiple plans
151    HConcat {
152        inputs: Vec<DslPlan>,
153        options: HConcatOptions,
154    },
155    /// This allows expressions to access other tables
156    ExtContext {
157        input: Arc<DslPlan>,
158        contexts: Vec<DslPlan>,
159    },
160    Sink {
161        input: Arc<DslPlan>,
162        payload: SinkType,
163    },
164    IR {
165        // Keep the original Dsl around as we need that for serialization.
166        dsl: Arc<DslPlan>,
167        version: u32,
168        #[cfg_attr(feature = "serde", serde(skip))]
169        node: Option<Node>,
170    },
171}
172
173impl Clone for DslPlan {
174    // Autogenerated by rust-analyzer, don't care about it looking nice, it just
175    // calls clone on every member of every enum variant.
176    #[rustfmt::skip]
177    #[allow(clippy::clone_on_copy)]
178    #[recursive]
179    fn clone(&self) -> Self {
180        match self {
181            #[cfg(feature = "python")]
182            Self::PythonScan { options } => Self::PythonScan { options: options.clone() },
183            Self::Filter { input, predicate } => Self::Filter { input: input.clone(), predicate: predicate.clone() },
184            Self::Cache { input, id } => Self::Cache { input: input.clone(), id: id.clone() },
185            Self::Scan { sources, file_info, file_options, scan_type, cached_ir } => Self::Scan { sources: sources.clone(), file_info: file_info.clone(), file_options: file_options.clone(), scan_type: scan_type.clone(), cached_ir: cached_ir.clone() },
186            Self::DataFrameScan { df, schema, } => Self::DataFrameScan { df: df.clone(), schema: schema.clone(),  },
187            Self::Select { expr, input, options } => Self::Select { expr: expr.clone(), input: input.clone(), options: options.clone() },
188            Self::GroupBy { input, keys, aggs,  apply, maintain_order, options } => Self::GroupBy { input: input.clone(), keys: keys.clone(), aggs: aggs.clone(), apply: apply.clone(), maintain_order: maintain_order.clone(), options: options.clone() },
189            Self::Join { input_left, input_right, left_on, right_on, predicates, options } => Self::Join { input_left: input_left.clone(), input_right: input_right.clone(), left_on: left_on.clone(), right_on: right_on.clone(), options: options.clone(), predicates: predicates.clone() },
190            Self::HStack { input, exprs, options } => Self::HStack { input: input.clone(), exprs: exprs.clone(),  options: options.clone() },
191            Self::Distinct { input, options } => Self::Distinct { input: input.clone(), options: options.clone() },
192            Self::Sort {input,by_column, slice, sort_options } => Self::Sort { input: input.clone(), by_column: by_column.clone(), slice: slice.clone(), sort_options: sort_options.clone() },
193            Self::Slice { input, offset, len } => Self::Slice { input: input.clone(), offset: offset.clone(), len: len.clone() },
194            Self::MapFunction { input, function } => Self::MapFunction { input: input.clone(), function: function.clone() },
195            Self::Union { inputs, args} => Self::Union { inputs: inputs.clone(), args: args.clone() },
196            Self::HConcat { inputs, options } => Self::HConcat { inputs: inputs.clone(), options: options.clone() },
197            Self::ExtContext { input, contexts, } => Self::ExtContext { input: input.clone(), contexts: contexts.clone() },
198            Self::Sink { input, payload } => Self::Sink { input: input.clone(), payload: payload.clone() },
199            Self::IR {node, dsl, version} => Self::IR {node: *node, dsl: dsl.clone(), version: *version}
200        }
201    }
202}
203
204impl Default for DslPlan {
205    fn default() -> Self {
206        let df = DataFrame::empty();
207        let schema = df.schema().clone();
208        DslPlan::DataFrameScan {
209            df: Arc::new(df),
210            schema,
211        }
212    }
213}
214
215impl DslPlan {
216    pub fn describe(&self) -> PolarsResult<String> {
217        Ok(self.clone().to_alp()?.describe())
218    }
219
220    pub fn describe_tree_format(&self) -> PolarsResult<String> {
221        Ok(self.clone().to_alp()?.describe_tree_format())
222    }
223
224    pub fn display(&self) -> PolarsResult<impl fmt::Display> {
225        struct DslPlanDisplay(IRPlan);
226        impl fmt::Display for DslPlanDisplay {
227            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228                fmt::Display::fmt(&self.0.as_ref().display(), f)
229            }
230        }
231        Ok(DslPlanDisplay(self.clone().to_alp()?))
232    }
233
234    pub fn to_alp(self) -> PolarsResult<IRPlan> {
235        let mut lp_arena = Arena::with_capacity(16);
236        let mut expr_arena = Arena::with_capacity(16);
237
238        let node = to_alp(
239            self,
240            &mut expr_arena,
241            &mut lp_arena,
242            &mut OptFlags::default(),
243        )?;
244        let plan = IRPlan::new(node, lp_arena, expr_arena);
245
246        Ok(plan)
247    }
248}