polars_plan/plans/builder_dsl.rs

1use std::sync::Arc;
2
3use polars_core::prelude::*;
4#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
5use polars_io::cloud::CloudOptions;
6#[cfg(feature = "csv")]
7use polars_io::csv::read::CsvReadOptions;
8#[cfg(feature = "ipc")]
9use polars_io::ipc::IpcScanOptions;
10#[cfg(feature = "parquet")]
11use polars_io::parquet::read::ParquetOptions;
12use polars_io::HiveOptions;
13#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc"))]
14use polars_io::RowIndex;
15
16#[cfg(feature = "python")]
17use crate::prelude::python_udf::PythonFunction;
18use crate::prelude::*;
19
/// Thin builder wrapper around a [`DslPlan`], offering a fluent API for
/// composing logical plans. Each method consumes `self` and returns a new
/// builder whose inner plan has the operation appended.
pub struct DslBuilder(pub DslPlan);
21
22impl From<DslPlan> for DslBuilder {
23    fn from(lp: DslPlan) -> Self {
24        DslBuilder(lp)
25    }
26}
27
28impl DslBuilder {
29    pub fn anonymous_scan(
30        function: Arc<dyn AnonymousScan>,
31        schema: Option<SchemaRef>,
32        infer_schema_length: Option<usize>,
33        skip_rows: Option<usize>,
34        n_rows: Option<usize>,
35        name: &'static str,
36    ) -> PolarsResult<Self> {
37        let schema = match schema {
38            Some(s) => s,
39            None => function.schema(infer_schema_length)?,
40        };
41
42        let file_info = FileInfo::new(schema.clone(), None, (n_rows, n_rows.unwrap_or(usize::MAX)));
43        let file_options = FileScanOptions {
44            slice: n_rows.map(|x| (0, x)),
45            with_columns: None,
46            cache: false,
47            row_index: None,
48            rechunk: false,
49            file_counter: Default::default(),
50            // TODO: Support Hive partitioning.
51            hive_options: HiveOptions {
52                enabled: Some(false),
53                ..Default::default()
54            },
55            glob: false,
56            include_file_paths: None,
57            allow_missing_columns: false,
58        };
59
60        Ok(DslPlan::Scan {
61            sources: ScanSources::Buffers(Arc::default()),
62            file_info: Some(file_info),
63            file_options,
64            scan_type: FileScan::Anonymous {
65                function,
66                options: Arc::new(AnonymousScanOptions {
67                    fmt_str: name,
68                    skip_rows,
69                }),
70            },
71            cached_ir: Default::default(),
72        }
73        .into())
74    }
75
76    #[cfg(feature = "parquet")]
77    #[allow(clippy::too_many_arguments)]
78    pub fn scan_parquet(
79        sources: ScanSources,
80        n_rows: Option<usize>,
81        cache: bool,
82        parallel: polars_io::parquet::read::ParallelStrategy,
83        row_index: Option<RowIndex>,
84        rechunk: bool,
85        low_memory: bool,
86        cloud_options: Option<CloudOptions>,
87        use_statistics: bool,
88        schema: Option<SchemaRef>,
89        hive_options: HiveOptions,
90        glob: bool,
91        include_file_paths: Option<PlSmallStr>,
92        allow_missing_columns: bool,
93    ) -> PolarsResult<Self> {
94        let options = FileScanOptions {
95            with_columns: None,
96            cache,
97            slice: n_rows.map(|x| (0, x)),
98            rechunk,
99            row_index,
100            file_counter: Default::default(),
101            hive_options,
102            glob,
103            include_file_paths,
104            allow_missing_columns,
105        };
106        Ok(DslPlan::Scan {
107            sources,
108            file_info: None,
109            file_options: options,
110            scan_type: FileScan::Parquet {
111                options: ParquetOptions {
112                    schema,
113                    parallel,
114                    low_memory,
115                    use_statistics,
116                },
117                cloud_options,
118                metadata: None,
119            },
120            cached_ir: Default::default(),
121        }
122        .into())
123    }
124
125    #[cfg(feature = "ipc")]
126    #[allow(clippy::too_many_arguments)]
127    pub fn scan_ipc(
128        sources: ScanSources,
129        options: IpcScanOptions,
130        n_rows: Option<usize>,
131        cache: bool,
132        row_index: Option<RowIndex>,
133        rechunk: bool,
134        cloud_options: Option<CloudOptions>,
135        hive_options: HiveOptions,
136        include_file_paths: Option<PlSmallStr>,
137    ) -> PolarsResult<Self> {
138        Ok(DslPlan::Scan {
139            sources,
140            file_info: None,
141            file_options: FileScanOptions {
142                with_columns: None,
143                cache,
144                slice: n_rows.map(|x| (0, x)),
145                rechunk,
146                row_index,
147                file_counter: Default::default(),
148                hive_options,
149                glob: true,
150                include_file_paths,
151                allow_missing_columns: false,
152            },
153            scan_type: FileScan::Ipc {
154                options,
155                cloud_options,
156                metadata: None,
157            },
158            cached_ir: Default::default(),
159        }
160        .into())
161    }
162
163    #[allow(clippy::too_many_arguments)]
164    #[cfg(feature = "csv")]
165    pub fn scan_csv(
166        sources: ScanSources,
167        read_options: CsvReadOptions,
168        cache: bool,
169        cloud_options: Option<CloudOptions>,
170        glob: bool,
171        include_file_paths: Option<PlSmallStr>,
172    ) -> PolarsResult<Self> {
173        // This gets partially moved by FileScanOptions
174        let read_options_clone = read_options.clone();
175
176        let options = FileScanOptions {
177            with_columns: None,
178            cache,
179            slice: read_options_clone.n_rows.map(|x| (0, x)),
180            rechunk: read_options_clone.rechunk,
181            row_index: read_options_clone.row_index,
182            file_counter: Default::default(),
183            // TODO: Support Hive partitioning.
184            hive_options: HiveOptions {
185                enabled: Some(false),
186                ..Default::default()
187            },
188            glob,
189            include_file_paths,
190            allow_missing_columns: false,
191        };
192        Ok(DslPlan::Scan {
193            sources,
194            file_info: None,
195            file_options: options,
196            scan_type: FileScan::Csv {
197                options: read_options,
198                cloud_options,
199            },
200            cached_ir: Default::default(),
201        }
202        .into())
203    }
204
205    pub fn cache(self) -> Self {
206        let input = Arc::new(self.0);
207        let id = input.as_ref() as *const DslPlan as usize;
208        DslPlan::Cache { input, id }.into()
209    }
210
211    pub fn drop(self, to_drop: Vec<Selector>, strict: bool) -> Self {
212        self.map_private(DslFunction::Drop(DropFunction { to_drop, strict }))
213    }
214
215    pub fn project(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
216        DslPlan::Select {
217            expr: exprs,
218            input: Arc::new(self.0),
219            options,
220        }
221        .into()
222    }
223
224    pub fn fill_null(self, fill_value: Expr) -> Self {
225        self.project(
226            vec![all().fill_null(fill_value)],
227            ProjectionOptions {
228                duplicate_check: false,
229                ..Default::default()
230            },
231        )
232    }
233
234    pub fn drop_nans(self, subset: Option<Vec<Expr>>) -> Self {
235        if let Some(subset) = subset {
236            self.filter(
237                all_horizontal(
238                    subset
239                        .into_iter()
240                        .map(|v| v.is_not_nan())
241                        .collect::<Vec<_>>(),
242                )
243                .unwrap(),
244            )
245        } else {
246            self.filter(
247                // TODO: when Decimal supports NaN, include here
248                all_horizontal([dtype_cols([DataType::Float32, DataType::Float64]).is_not_nan()])
249                    .unwrap(),
250            )
251        }
252    }
253
254    pub fn drop_nulls(self, subset: Option<Vec<Expr>>) -> Self {
255        if let Some(subset) = subset {
256            self.filter(
257                all_horizontal(
258                    subset
259                        .into_iter()
260                        .map(|v| v.is_not_null())
261                        .collect::<Vec<_>>(),
262                )
263                .unwrap(),
264            )
265        } else {
266            self.filter(all_horizontal([all().is_not_null()]).unwrap())
267        }
268    }
269
270    pub fn fill_nan(self, fill_value: Expr) -> Self {
271        self.map_private(DslFunction::FillNan(fill_value))
272    }
273
274    pub fn with_columns(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
275        if exprs.is_empty() {
276            return self;
277        }
278
279        DslPlan::HStack {
280            input: Arc::new(self.0),
281            exprs,
282            options,
283        }
284        .into()
285    }
286
287    pub fn with_context(self, contexts: Vec<DslPlan>) -> Self {
288        DslPlan::ExtContext {
289            input: Arc::new(self.0),
290            contexts,
291        }
292        .into()
293    }
294
295    /// Apply a filter
296    pub fn filter(self, predicate: Expr) -> Self {
297        DslPlan::Filter {
298            predicate,
299            input: Arc::new(self.0),
300        }
301        .into()
302    }
303
304    pub fn group_by<E: AsRef<[Expr]>>(
305        self,
306        keys: Vec<Expr>,
307        aggs: E,
308        apply: Option<(Arc<dyn DataFrameUdf>, SchemaRef)>,
309        maintain_order: bool,
310        #[cfg(feature = "dynamic_group_by")] dynamic_options: Option<DynamicGroupOptions>,
311        #[cfg(feature = "dynamic_group_by")] rolling_options: Option<RollingGroupOptions>,
312    ) -> Self {
313        let aggs = aggs.as_ref().to_vec();
314        let options = GroupbyOptions {
315            #[cfg(feature = "dynamic_group_by")]
316            dynamic: dynamic_options,
317            #[cfg(feature = "dynamic_group_by")]
318            rolling: rolling_options,
319            slice: None,
320        };
321
322        DslPlan::GroupBy {
323            input: Arc::new(self.0),
324            keys,
325            aggs,
326            apply,
327            maintain_order,
328            options: Arc::new(options),
329        }
330        .into()
331    }
332
333    pub fn build(self) -> DslPlan {
334        self.0
335    }
336
337    pub fn from_existing_df(df: DataFrame) -> Self {
338        let schema = df.schema().clone();
339        DslPlan::DataFrameScan {
340            df: Arc::new(df),
341            schema,
342        }
343        .into()
344    }
345
346    pub fn sort(self, by_column: Vec<Expr>, sort_options: SortMultipleOptions) -> Self {
347        DslPlan::Sort {
348            input: Arc::new(self.0),
349            by_column,
350            slice: None,
351            sort_options,
352        }
353        .into()
354    }
355
356    pub fn explode(self, columns: Vec<Selector>, allow_empty: bool) -> Self {
357        DslPlan::MapFunction {
358            input: Arc::new(self.0),
359            function: DslFunction::Explode {
360                columns,
361                allow_empty,
362            },
363        }
364        .into()
365    }
366
367    #[cfg(feature = "pivot")]
368    pub fn unpivot(self, args: UnpivotArgsDSL) -> Self {
369        DslPlan::MapFunction {
370            input: Arc::new(self.0),
371            function: DslFunction::Unpivot { args },
372        }
373        .into()
374    }
375
376    pub fn row_index(self, name: PlSmallStr, offset: Option<IdxSize>) -> Self {
377        DslPlan::MapFunction {
378            input: Arc::new(self.0),
379            function: DslFunction::RowIndex { name, offset },
380        }
381        .into()
382    }
383
384    pub fn distinct(self, options: DistinctOptionsDSL) -> Self {
385        DslPlan::Distinct {
386            input: Arc::new(self.0),
387            options,
388        }
389        .into()
390    }
391
392    pub fn slice(self, offset: i64, len: IdxSize) -> Self {
393        DslPlan::Slice {
394            input: Arc::new(self.0),
395            offset,
396            len,
397        }
398        .into()
399    }
400
401    pub fn join(
402        self,
403        other: DslPlan,
404        left_on: Vec<Expr>,
405        right_on: Vec<Expr>,
406        options: Arc<JoinOptions>,
407    ) -> Self {
408        DslPlan::Join {
409            input_left: Arc::new(self.0),
410            input_right: Arc::new(other),
411            left_on,
412            right_on,
413            predicates: Default::default(),
414            options,
415        }
416        .into()
417    }
418    pub fn map_private(self, function: DslFunction) -> Self {
419        DslPlan::MapFunction {
420            input: Arc::new(self.0),
421            function,
422        }
423        .into()
424    }
425
426    #[cfg(feature = "python")]
427    pub fn map_python(
428        self,
429        function: PythonFunction,
430        optimizations: AllowedOptimizations,
431        schema: Option<SchemaRef>,
432        validate_output: bool,
433    ) -> Self {
434        DslPlan::MapFunction {
435            input: Arc::new(self.0),
436            function: DslFunction::OpaquePython(OpaquePythonUdf {
437                function,
438                schema,
439                predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),
440                projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),
441                streamable: optimizations.contains(OptFlags::STREAMING),
442                validate_output,
443            }),
444        }
445        .into()
446    }
447
448    pub fn map<F>(
449        self,
450        function: F,
451        optimizations: AllowedOptimizations,
452        schema: Option<Arc<dyn UdfSchema>>,
453        name: PlSmallStr,
454    ) -> Self
455    where
456        F: DataFrameUdf + 'static,
457    {
458        let function = Arc::new(function);
459
460        DslPlan::MapFunction {
461            input: Arc::new(self.0),
462            function: DslFunction::FunctionIR(FunctionIR::Opaque {
463                function,
464                schema,
465                predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),
466                projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),
467                streamable: optimizations.contains(OptFlags::STREAMING),
468                fmt_str: name,
469            }),
470        }
471        .into()
472    }
473}