1use std::fmt;
2use std::fmt::Debug;
3use std::sync::{Arc, Mutex};
4
5use polars_core::prelude::*;
6use recursive::recursive;
7
8use crate::prelude::*;
9
10pub(crate) mod aexpr;
11pub(crate) mod anonymous_scan;
12pub(crate) mod ir;
13
14mod apply;
15mod builder_dsl;
16mod builder_ir;
17pub(crate) mod conversion;
18#[cfg(feature = "debugging")]
19pub(crate) mod debug;
20pub mod expr_ir;
21mod file_scan;
22mod format;
23mod functions;
24pub mod hive;
25pub(crate) mod iterator;
26mod lit;
27pub(crate) mod optimizer;
28pub(crate) mod options;
29#[cfg(feature = "python")]
30pub mod python;
31mod schema;
32pub mod visitor;
33
34pub use aexpr::*;
35pub use anonymous_scan::*;
36pub use apply::*;
37pub use builder_dsl::*;
38pub use builder_ir::*;
39pub use conversion::*;
40pub(crate) use expr_ir::*;
41pub use file_scan::*;
42pub use functions::*;
43pub use ir::*;
44pub use iterator::*;
45pub use lit::*;
46pub use optimizer::*;
47pub use schema::*;
48#[cfg(feature = "serde")]
49use serde::{Deserialize, Serialize};
50use strum_macros::IntoStaticStr;
51
/// Context in which an expression is evaluated.
///
/// `Context::default()` yields [`Context::Default`] (selected via `#[default]`).
/// NOTE(review): `Aggregation` presumably marks evaluation inside a group-by
/// aggregation — confirm against call sites.
#[derive(Debug, Default, Clone, Copy)]
pub enum Context {
    /// Aggregation context (meaning hedged in the enum docs).
    Aggregation,
    /// The context returned by `Context::default()`.
    #[default]
    Default,
}
60
/// Logical query plan in its DSL form.
///
/// A `DslPlan` is lowered into an [`IRPlan`] by [`DslPlan::to_alp`]. Single child
/// plans are held behind an `Arc<DslPlan>`; multiple children are held in a
/// `Vec<DslPlan>`.
///
/// NOTE(review): with the `serde` feature this enum is (de)serialized. If an
/// index-based serde format is used anywhere, reordering variants or fields is a
/// breaking change — confirm before editing the layout.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum DslPlan {
    /// Scan configured by [`PythonOptions`]; only compiled with the `python` feature.
    #[cfg(feature = "python")]
    PythonScan { options: PythonOptions },
    /// Filters `input` by the boolean expression `predicate`
    /// (presumably keeping rows where it is true — confirm).
    Filter {
        input: Arc<DslPlan>,
        predicate: Expr,
    },
    /// Caches `input`. NOTE(review): `id` presumably identifies the cache entry so
    /// that identical sub-plans can share it — confirm.
    Cache { input: Arc<DslPlan>, id: usize },
    /// Scan over file `sources` of kind `scan_type`.
    Scan {
        sources: ScanSources,
        /// File metadata, if already known.
        file_info: Option<FileInfo>,
        file_options: FileScanOptions,
        scan_type: FileScan,
        /// Shared slot holding an optional `IR`. Cloning a `DslPlan` clones this
        /// `Arc`, so clones share the same slot.
        /// Skipped by serde: after deserialization it is the `Default` value,
        /// i.e. a fresh slot containing `None`.
        /// NOTE(review): presumably memoizes the lowered IR of this scan — confirm.
        #[cfg_attr(feature = "serde", serde(skip))]
        cached_ir: Arc<Mutex<Option<IR>>>,
    },
    /// Scan over the in-memory DataFrame `df`. `schema` describes the frame
    /// (`DslPlan::default()` sets it to `df`'s own schema).
    DataFrameScan {
        df: Arc<DataFrame>,
        schema: SchemaRef,
    },
    /// Projection of the expressions `expr` over `input`.
    Select {
        expr: Vec<Expr>,
        input: Arc<DslPlan>,
        options: ProjectionOptions,
    },
    /// Groups `input` by `keys` and evaluates `aggs`.
    GroupBy {
        input: Arc<DslPlan>,
        keys: Vec<Expr>,
        aggs: Vec<Expr>,
        /// Presumably keeps groups in order of first appearance — confirm.
        maintain_order: bool,
        options: Arc<GroupbyOptions>,
        /// Optional user-defined function with its output schema.
        /// Skipped by serde; deserializes as `None`.
        #[cfg_attr(feature = "serde", serde(skip))]
        apply: Option<(Arc<dyn DataFrameUdf>, SchemaRef)>,
    },
    /// Joins `input_left` with `input_right` on `left_on` / `right_on`.
    Join {
        input_left: Arc<DslPlan>,
        input_right: Arc<DslPlan>,
        left_on: Vec<Expr>,
        right_on: Vec<Expr>,
        /// Additional join predicates. NOTE(review): presumably used for non-equi
        /// joins — confirm.
        predicates: Vec<Expr>,
        options: Arc<JoinOptions>,
    },
    /// Horizontally stacks the columns computed by `exprs` onto `input`
    /// (presumably the `with_columns` operation — confirm).
    HStack {
        input: Arc<DslPlan>,
        exprs: Vec<Expr>,
        options: ProjectionOptions,
    },
    /// Removes duplicate rows of `input` according to `options`.
    Distinct {
        input: Arc<DslPlan>,
        options: DistinctOptionsDSL,
    },
    /// Sorts `input` by the `by_column` expressions.
    Sort {
        input: Arc<DslPlan>,
        by_column: Vec<Expr>,
        /// Optional slice fused into the sort; presumably `(offset, len)` — confirm.
        slice: Option<(i64, usize)>,
        sort_options: SortMultipleOptions,
    },
    /// Takes `len` rows of `input` starting at `offset` (a signed offset).
    Slice {
        input: Arc<DslPlan>,
        offset: i64,
        len: IdxSize,
    },
    /// Applies the DSL-level `function` to `input`.
    MapFunction {
        input: Arc<DslPlan>,
        function: DslFunction,
    },
    /// Vertical union of `inputs`.
    Union {
        inputs: Vec<DslPlan>,
        args: UnionArgs,
    },
    /// Horizontal concatenation of `inputs`.
    HConcat {
        inputs: Vec<DslPlan>,
        options: HConcatOptions,
    },
    /// Evaluates `input` with the extra plans in `contexts` available.
    /// NOTE(review): exact semantics of the extra contexts not visible here — confirm.
    ExtContext {
        input: Arc<DslPlan>,
        contexts: Vec<DslPlan>,
    },
    /// Writes the result of `input` to the sink described by `payload`.
    Sink {
        input: Arc<DslPlan>,
        payload: SinkType,
    },
    /// A DSL plan paired with an optional, already-converted IR `node`.
    IR {
        dsl: Arc<DslPlan>,
        /// NOTE(review): presumably used to check whether `node` is still valid
        /// for the arena it refers to — confirm.
        version: u32,
        /// Skipped by serde; deserializes as `None`.
        #[cfg_attr(feature = "serde", serde(skip))]
        node: Option<Node>,
    },
}
172
173impl Clone for DslPlan {
174 #[rustfmt::skip]
177 #[allow(clippy::clone_on_copy)]
178 #[recursive]
179 fn clone(&self) -> Self {
180 match self {
181 #[cfg(feature = "python")]
182 Self::PythonScan { options } => Self::PythonScan { options: options.clone() },
183 Self::Filter { input, predicate } => Self::Filter { input: input.clone(), predicate: predicate.clone() },
184 Self::Cache { input, id } => Self::Cache { input: input.clone(), id: id.clone() },
185 Self::Scan { sources, file_info, file_options, scan_type, cached_ir } => Self::Scan { sources: sources.clone(), file_info: file_info.clone(), file_options: file_options.clone(), scan_type: scan_type.clone(), cached_ir: cached_ir.clone() },
186 Self::DataFrameScan { df, schema, } => Self::DataFrameScan { df: df.clone(), schema: schema.clone(), },
187 Self::Select { expr, input, options } => Self::Select { expr: expr.clone(), input: input.clone(), options: options.clone() },
188 Self::GroupBy { input, keys, aggs, apply, maintain_order, options } => Self::GroupBy { input: input.clone(), keys: keys.clone(), aggs: aggs.clone(), apply: apply.clone(), maintain_order: maintain_order.clone(), options: options.clone() },
189 Self::Join { input_left, input_right, left_on, right_on, predicates, options } => Self::Join { input_left: input_left.clone(), input_right: input_right.clone(), left_on: left_on.clone(), right_on: right_on.clone(), options: options.clone(), predicates: predicates.clone() },
190 Self::HStack { input, exprs, options } => Self::HStack { input: input.clone(), exprs: exprs.clone(), options: options.clone() },
191 Self::Distinct { input, options } => Self::Distinct { input: input.clone(), options: options.clone() },
192 Self::Sort {input,by_column, slice, sort_options } => Self::Sort { input: input.clone(), by_column: by_column.clone(), slice: slice.clone(), sort_options: sort_options.clone() },
193 Self::Slice { input, offset, len } => Self::Slice { input: input.clone(), offset: offset.clone(), len: len.clone() },
194 Self::MapFunction { input, function } => Self::MapFunction { input: input.clone(), function: function.clone() },
195 Self::Union { inputs, args} => Self::Union { inputs: inputs.clone(), args: args.clone() },
196 Self::HConcat { inputs, options } => Self::HConcat { inputs: inputs.clone(), options: options.clone() },
197 Self::ExtContext { input, contexts, } => Self::ExtContext { input: input.clone(), contexts: contexts.clone() },
198 Self::Sink { input, payload } => Self::Sink { input: input.clone(), payload: payload.clone() },
199 Self::IR {node, dsl, version} => Self::IR {node: *node, dsl: dsl.clone(), version: *version}
200 }
201 }
202}
203
204impl Default for DslPlan {
205 fn default() -> Self {
206 let df = DataFrame::empty();
207 let schema = df.schema().clone();
208 DslPlan::DataFrameScan {
209 df: Arc::new(df),
210 schema,
211 }
212 }
213}
214
215impl DslPlan {
216 pub fn describe(&self) -> PolarsResult<String> {
217 Ok(self.clone().to_alp()?.describe())
218 }
219
220 pub fn describe_tree_format(&self) -> PolarsResult<String> {
221 Ok(self.clone().to_alp()?.describe_tree_format())
222 }
223
224 pub fn display(&self) -> PolarsResult<impl fmt::Display> {
225 struct DslPlanDisplay(IRPlan);
226 impl fmt::Display for DslPlanDisplay {
227 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228 fmt::Display::fmt(&self.0.as_ref().display(), f)
229 }
230 }
231 Ok(DslPlanDisplay(self.clone().to_alp()?))
232 }
233
234 pub fn to_alp(self) -> PolarsResult<IRPlan> {
235 let mut lp_arena = Arena::with_capacity(16);
236 let mut expr_arena = Arena::with_capacity(16);
237
238 let node = to_alp(
239 self,
240 &mut expr_arena,
241 &mut lp_arena,
242 &mut OptFlags::default(),
243 )?;
244 let plan = IRPlan::new(node, lp_arena, expr_arena);
245
246 Ok(plan)
247 }
248}