1use std::sync::Arc;
2
3use polars_core::prelude::*;
4#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
5use polars_io::cloud::CloudOptions;
6#[cfg(feature = "csv")]
7use polars_io::csv::read::CsvReadOptions;
8#[cfg(feature = "ipc")]
9use polars_io::ipc::IpcScanOptions;
10#[cfg(feature = "parquet")]
11use polars_io::parquet::read::ParquetOptions;
12use polars_io::HiveOptions;
13#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc"))]
14use polars_io::RowIndex;
15
16#[cfg(feature = "python")]
17use crate::prelude::python_udf::PythonFunction;
18use crate::prelude::*;
19
/// Builder that wraps a [`DslPlan`] and exposes chainable methods for
/// composing logical-plan nodes on top of it.
pub struct DslBuilder(pub DslPlan);
21
22impl From<DslPlan> for DslBuilder {
23 fn from(lp: DslPlan) -> Self {
24 DslBuilder(lp)
25 }
26}
27
28impl DslBuilder {
29 pub fn anonymous_scan(
30 function: Arc<dyn AnonymousScan>,
31 schema: Option<SchemaRef>,
32 infer_schema_length: Option<usize>,
33 skip_rows: Option<usize>,
34 n_rows: Option<usize>,
35 name: &'static str,
36 ) -> PolarsResult<Self> {
37 let schema = match schema {
38 Some(s) => s,
39 None => function.schema(infer_schema_length)?,
40 };
41
42 let file_info = FileInfo::new(schema.clone(), None, (n_rows, n_rows.unwrap_or(usize::MAX)));
43 let file_options = FileScanOptions {
44 slice: n_rows.map(|x| (0, x)),
45 with_columns: None,
46 cache: false,
47 row_index: None,
48 rechunk: false,
49 file_counter: Default::default(),
50 hive_options: HiveOptions {
52 enabled: Some(false),
53 ..Default::default()
54 },
55 glob: false,
56 include_file_paths: None,
57 allow_missing_columns: false,
58 };
59
60 Ok(DslPlan::Scan {
61 sources: ScanSources::Buffers(Arc::default()),
62 file_info: Some(file_info),
63 file_options,
64 scan_type: FileScan::Anonymous {
65 function,
66 options: Arc::new(AnonymousScanOptions {
67 fmt_str: name,
68 skip_rows,
69 }),
70 },
71 cached_ir: Default::default(),
72 }
73 .into())
74 }
75
76 #[cfg(feature = "parquet")]
77 #[allow(clippy::too_many_arguments)]
78 pub fn scan_parquet(
79 sources: ScanSources,
80 n_rows: Option<usize>,
81 cache: bool,
82 parallel: polars_io::parquet::read::ParallelStrategy,
83 row_index: Option<RowIndex>,
84 rechunk: bool,
85 low_memory: bool,
86 cloud_options: Option<CloudOptions>,
87 use_statistics: bool,
88 schema: Option<SchemaRef>,
89 hive_options: HiveOptions,
90 glob: bool,
91 include_file_paths: Option<PlSmallStr>,
92 allow_missing_columns: bool,
93 ) -> PolarsResult<Self> {
94 let options = FileScanOptions {
95 with_columns: None,
96 cache,
97 slice: n_rows.map(|x| (0, x)),
98 rechunk,
99 row_index,
100 file_counter: Default::default(),
101 hive_options,
102 glob,
103 include_file_paths,
104 allow_missing_columns,
105 };
106 Ok(DslPlan::Scan {
107 sources,
108 file_info: None,
109 file_options: options,
110 scan_type: FileScan::Parquet {
111 options: ParquetOptions {
112 schema,
113 parallel,
114 low_memory,
115 use_statistics,
116 },
117 cloud_options,
118 metadata: None,
119 },
120 cached_ir: Default::default(),
121 }
122 .into())
123 }
124
125 #[cfg(feature = "ipc")]
126 #[allow(clippy::too_many_arguments)]
127 pub fn scan_ipc(
128 sources: ScanSources,
129 options: IpcScanOptions,
130 n_rows: Option<usize>,
131 cache: bool,
132 row_index: Option<RowIndex>,
133 rechunk: bool,
134 cloud_options: Option<CloudOptions>,
135 hive_options: HiveOptions,
136 include_file_paths: Option<PlSmallStr>,
137 ) -> PolarsResult<Self> {
138 Ok(DslPlan::Scan {
139 sources,
140 file_info: None,
141 file_options: FileScanOptions {
142 with_columns: None,
143 cache,
144 slice: n_rows.map(|x| (0, x)),
145 rechunk,
146 row_index,
147 file_counter: Default::default(),
148 hive_options,
149 glob: true,
150 include_file_paths,
151 allow_missing_columns: false,
152 },
153 scan_type: FileScan::Ipc {
154 options,
155 cloud_options,
156 metadata: None,
157 },
158 cached_ir: Default::default(),
159 }
160 .into())
161 }
162
163 #[allow(clippy::too_many_arguments)]
164 #[cfg(feature = "csv")]
165 pub fn scan_csv(
166 sources: ScanSources,
167 read_options: CsvReadOptions,
168 cache: bool,
169 cloud_options: Option<CloudOptions>,
170 glob: bool,
171 include_file_paths: Option<PlSmallStr>,
172 ) -> PolarsResult<Self> {
173 let read_options_clone = read_options.clone();
175
176 let options = FileScanOptions {
177 with_columns: None,
178 cache,
179 slice: read_options_clone.n_rows.map(|x| (0, x)),
180 rechunk: read_options_clone.rechunk,
181 row_index: read_options_clone.row_index,
182 file_counter: Default::default(),
183 hive_options: HiveOptions {
185 enabled: Some(false),
186 ..Default::default()
187 },
188 glob,
189 include_file_paths,
190 allow_missing_columns: false,
191 };
192 Ok(DslPlan::Scan {
193 sources,
194 file_info: None,
195 file_options: options,
196 scan_type: FileScan::Csv {
197 options: read_options,
198 cloud_options,
199 },
200 cached_ir: Default::default(),
201 }
202 .into())
203 }
204
    /// Add a cache node on top of the current plan.
    ///
    /// The cache id is derived from the heap address of the plan inside the
    /// `Arc`, so it uniquely identifies this allocation for the lifetime of
    /// the plan. The `Arc` must be created *before* the id is taken; the
    /// statement order here is load-bearing.
    pub fn cache(self) -> Self {
        let input = Arc::new(self.0);
        // Pointer address of the boxed plan used as a process-unique id.
        let id = input.as_ref() as *const DslPlan as usize;
        DslPlan::Cache { input, id }.into()
    }
210
211 pub fn drop(self, to_drop: Vec<Selector>, strict: bool) -> Self {
212 self.map_private(DslFunction::Drop(DropFunction { to_drop, strict }))
213 }
214
215 pub fn project(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
216 DslPlan::Select {
217 expr: exprs,
218 input: Arc::new(self.0),
219 options,
220 }
221 .into()
222 }
223
224 pub fn fill_null(self, fill_value: Expr) -> Self {
225 self.project(
226 vec![all().fill_null(fill_value)],
227 ProjectionOptions {
228 duplicate_check: false,
229 ..Default::default()
230 },
231 )
232 }
233
234 pub fn drop_nans(self, subset: Option<Vec<Expr>>) -> Self {
235 if let Some(subset) = subset {
236 self.filter(
237 all_horizontal(
238 subset
239 .into_iter()
240 .map(|v| v.is_not_nan())
241 .collect::<Vec<_>>(),
242 )
243 .unwrap(),
244 )
245 } else {
246 self.filter(
247 all_horizontal([dtype_cols([DataType::Float32, DataType::Float64]).is_not_nan()])
249 .unwrap(),
250 )
251 }
252 }
253
254 pub fn drop_nulls(self, subset: Option<Vec<Expr>>) -> Self {
255 if let Some(subset) = subset {
256 self.filter(
257 all_horizontal(
258 subset
259 .into_iter()
260 .map(|v| v.is_not_null())
261 .collect::<Vec<_>>(),
262 )
263 .unwrap(),
264 )
265 } else {
266 self.filter(all_horizontal([all().is_not_null()]).unwrap())
267 }
268 }
269
270 pub fn fill_nan(self, fill_value: Expr) -> Self {
271 self.map_private(DslFunction::FillNan(fill_value))
272 }
273
274 pub fn with_columns(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
275 if exprs.is_empty() {
276 return self;
277 }
278
279 DslPlan::HStack {
280 input: Arc::new(self.0),
281 exprs,
282 options,
283 }
284 .into()
285 }
286
287 pub fn with_context(self, contexts: Vec<DslPlan>) -> Self {
288 DslPlan::ExtContext {
289 input: Arc::new(self.0),
290 contexts,
291 }
292 .into()
293 }
294
295 pub fn filter(self, predicate: Expr) -> Self {
297 DslPlan::Filter {
298 predicate,
299 input: Arc::new(self.0),
300 }
301 .into()
302 }
303
304 pub fn group_by<E: AsRef<[Expr]>>(
305 self,
306 keys: Vec<Expr>,
307 aggs: E,
308 apply: Option<(Arc<dyn DataFrameUdf>, SchemaRef)>,
309 maintain_order: bool,
310 #[cfg(feature = "dynamic_group_by")] dynamic_options: Option<DynamicGroupOptions>,
311 #[cfg(feature = "dynamic_group_by")] rolling_options: Option<RollingGroupOptions>,
312 ) -> Self {
313 let aggs = aggs.as_ref().to_vec();
314 let options = GroupbyOptions {
315 #[cfg(feature = "dynamic_group_by")]
316 dynamic: dynamic_options,
317 #[cfg(feature = "dynamic_group_by")]
318 rolling: rolling_options,
319 slice: None,
320 };
321
322 DslPlan::GroupBy {
323 input: Arc::new(self.0),
324 keys,
325 aggs,
326 apply,
327 maintain_order,
328 options: Arc::new(options),
329 }
330 .into()
331 }
332
333 pub fn build(self) -> DslPlan {
334 self.0
335 }
336
337 pub fn from_existing_df(df: DataFrame) -> Self {
338 let schema = df.schema().clone();
339 DslPlan::DataFrameScan {
340 df: Arc::new(df),
341 schema,
342 }
343 .into()
344 }
345
346 pub fn sort(self, by_column: Vec<Expr>, sort_options: SortMultipleOptions) -> Self {
347 DslPlan::Sort {
348 input: Arc::new(self.0),
349 by_column,
350 slice: None,
351 sort_options,
352 }
353 .into()
354 }
355
356 pub fn explode(self, columns: Vec<Selector>, allow_empty: bool) -> Self {
357 DslPlan::MapFunction {
358 input: Arc::new(self.0),
359 function: DslFunction::Explode {
360 columns,
361 allow_empty,
362 },
363 }
364 .into()
365 }
366
367 #[cfg(feature = "pivot")]
368 pub fn unpivot(self, args: UnpivotArgsDSL) -> Self {
369 DslPlan::MapFunction {
370 input: Arc::new(self.0),
371 function: DslFunction::Unpivot { args },
372 }
373 .into()
374 }
375
376 pub fn row_index(self, name: PlSmallStr, offset: Option<IdxSize>) -> Self {
377 DslPlan::MapFunction {
378 input: Arc::new(self.0),
379 function: DslFunction::RowIndex { name, offset },
380 }
381 .into()
382 }
383
384 pub fn distinct(self, options: DistinctOptionsDSL) -> Self {
385 DslPlan::Distinct {
386 input: Arc::new(self.0),
387 options,
388 }
389 .into()
390 }
391
392 pub fn slice(self, offset: i64, len: IdxSize) -> Self {
393 DslPlan::Slice {
394 input: Arc::new(self.0),
395 offset,
396 len,
397 }
398 .into()
399 }
400
401 pub fn join(
402 self,
403 other: DslPlan,
404 left_on: Vec<Expr>,
405 right_on: Vec<Expr>,
406 options: Arc<JoinOptions>,
407 ) -> Self {
408 DslPlan::Join {
409 input_left: Arc::new(self.0),
410 input_right: Arc::new(other),
411 left_on,
412 right_on,
413 predicates: Default::default(),
414 options,
415 }
416 .into()
417 }
418 pub fn map_private(self, function: DslFunction) -> Self {
419 DslPlan::MapFunction {
420 input: Arc::new(self.0),
421 function,
422 }
423 .into()
424 }
425
426 #[cfg(feature = "python")]
427 pub fn map_python(
428 self,
429 function: PythonFunction,
430 optimizations: AllowedOptimizations,
431 schema: Option<SchemaRef>,
432 validate_output: bool,
433 ) -> Self {
434 DslPlan::MapFunction {
435 input: Arc::new(self.0),
436 function: DslFunction::OpaquePython(OpaquePythonUdf {
437 function,
438 schema,
439 predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),
440 projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),
441 streamable: optimizations.contains(OptFlags::STREAMING),
442 validate_output,
443 }),
444 }
445 .into()
446 }
447
448 pub fn map<F>(
449 self,
450 function: F,
451 optimizations: AllowedOptimizations,
452 schema: Option<Arc<dyn UdfSchema>>,
453 name: PlSmallStr,
454 ) -> Self
455 where
456 F: DataFrameUdf + 'static,
457 {
458 let function = Arc::new(function);
459
460 DslPlan::MapFunction {
461 input: Arc::new(self.0),
462 function: DslFunction::FunctionIR(FunctionIR::Opaque {
463 function,
464 schema,
465 predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),
466 projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),
467 streamable: optimizations.contains(OptFlags::STREAMING),
468 fmt_str: name,
469 }),
470 }
471 .into()
472 }
473}