// polars_plan/plans/options.rs

1#[cfg(feature = "json")]
2use std::num::NonZeroUsize;
3use std::path::PathBuf;
4
5use bitflags::bitflags;
6use polars_core::prelude::*;
7use polars_core::utils::SuperTypeOptions;
8#[cfg(feature = "csv")]
9use polars_io::csv::write::CsvWriterOptions;
10#[cfg(feature = "ipc")]
11use polars_io::ipc::IpcWriterOptions;
12#[cfg(feature = "json")]
13use polars_io::json::JsonWriterOptions;
14#[cfg(feature = "parquet")]
15use polars_io::parquet::write::ParquetWriteOptions;
16use polars_io::{is_cloud_url, HiveOptions, RowIndex};
17#[cfg(feature = "dynamic_group_by")]
18use polars_time::{DynamicGroupOptions, RollingGroupOptions};
19#[cfg(feature = "serde")]
20use serde::{Deserialize, Serialize};
21
22use crate::dsl::Selector;
23use crate::plans::{ExprIR, PlSmallStr};
24#[cfg(feature = "python")]
25use crate::prelude::python_udf::PythonFunction;
26
/// Counter type for the number of files covered by a scan
/// (see [`FileScanOptions::file_counter`]).
pub type FileCount = u32;
28
#[derive(Clone, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
/// Generic options for all file types.
pub struct FileScanOptions {
    /// Only materialize this (offset, length) slice of the rows.
    pub slice: Option<(i64, usize)>,
    /// Column projection; `None` means all columns.
    pub with_columns: Option<Arc<[PlSmallStr]>>,
    /// Cache the scan result — presumably for reuse by later plan nodes; TODO confirm at use sites.
    pub cache: bool,
    /// If set, add a row-index column as described by [`RowIndex`].
    pub row_index: Option<RowIndex>,
    /// Rechunk the output into contiguous memory after reading.
    pub rechunk: bool,
    /// Number of files this scan covers.
    pub file_counter: FileCount,
    /// Options controlling hive-partition handling.
    pub hive_options: HiveOptions,
    /// Whether paths are expanded as glob patterns.
    pub glob: bool,
    /// If set, include the source file path as a column with this name.
    pub include_file_paths: Option<PlSmallStr>,
    /// Allow columns that are missing in some files instead of raising.
    pub allow_missing_columns: bool,
}
44
/// IR-level options for a `Union` (vertical concatenation) node.
/// The user-facing counterpart is [`UnionArgs`].
#[derive(Clone, Debug, Copy, Default, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct UnionOptions {
    /// Take only this (offset, length) slice of the union output.
    pub slice: Option<(i64, usize)>,
    // (known row output, estimated row output)
    pub rows: (Option<usize>, usize),
    /// Evaluate the inputs in parallel.
    pub parallel: bool,
    /// True when this union comes from a scan over multiple files.
    pub from_partitioned_ds: bool,
    // NOTE(review): presumably set when an optimization pass flattened
    // nested unions — confirm against the optimizer.
    pub flattened_by_opt: bool,
    /// Rechunk the concatenated output into contiguous memory.
    pub rechunk: bool,
    /// Preserve the order of the inputs in the output.
    pub maintain_order: bool,
}
57
/// Options for a horizontal concatenation node.
#[derive(Clone, Debug, Copy, Default, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct HConcatOptions {
    /// Evaluate the inputs in parallel.
    pub parallel: bool,
}
63
/// Options attached to a group-by node. The `dynamic` and `rolling`
/// variants are mutually queried via `is_dynamic`/`is_rolling` below.
#[derive(Clone, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct GroupbyOptions {
    /// Set when this is a dynamic (windowed) group-by.
    #[cfg(feature = "dynamic_group_by")]
    pub dynamic: Option<DynamicGroupOptions>,
    /// Set when this is a rolling group-by.
    #[cfg(feature = "dynamic_group_by")]
    pub rolling: Option<RollingGroupOptions>,
    /// Take only a slice of the result
    pub slice: Option<(i64, usize)>,
}
74
75impl GroupbyOptions {
76    pub(crate) fn is_rolling(&self) -> bool {
77        #[cfg(feature = "dynamic_group_by")]
78        {
79            self.rolling.is_some()
80        }
81        #[cfg(not(feature = "dynamic_group_by"))]
82        {
83            false
84        }
85    }
86
87    pub(crate) fn is_dynamic(&self) -> bool {
88        #[cfg(feature = "dynamic_group_by")]
89        {
90            self.dynamic.is_some()
91        }
92        #[cfg(not(feature = "dynamic_group_by"))]
93        {
94            false
95        }
96    }
97}
98
/// User-facing (DSL) options for a `unique`/distinct operation.
/// Lowered to [`DistinctOptionsIR`] during IR conversion.
#[derive(Clone, Debug, Eq, PartialEq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct DistinctOptionsDSL {
    /// Subset of columns that will be taken into account.
    pub subset: Option<Vec<Selector>>,
    /// This will maintain the order of the input.
    /// Note that this is more expensive.
    /// `maintain_order` is not supported in the streaming
    /// engine.
    pub maintain_order: bool,
    /// Which rows to keep.
    pub keep_strategy: UniqueKeepStrategy,
}
112
/// IR-level options for a `unique`/distinct operation; the resolved form
/// of [`DistinctOptionsDSL`] (selectors expanded to concrete column names).
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]
pub struct DistinctOptionsIR {
    /// Subset of columns that will be taken into account.
    pub subset: Option<Arc<[PlSmallStr]>>,
    /// This will maintain the order of the input.
    /// Note that this is more expensive.
    /// `maintain_order` is not supported in the streaming
    /// engine.
    pub maintain_order: bool,
    /// Which rows to keep.
    pub keep_strategy: UniqueKeepStrategy,
    /// Take only a slice of the result
    pub slice: Option<(i64, usize)>,
}
128
/// How a function expression consumes its input in a group-by context.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum ApplyOptions {
    /// Collect groups to a list and apply the function over the groups.
    /// This can be important in aggregation context.
    /// e.g. [g1, g1, g2] -> [[g1, g1], g2]
    GroupWise,
    /// Collect groups to a list and then apply the function once.
    /// e.g. [g1, g1, g2] -> list([g1, g1, g2])
    ApplyList,
    /// Do not collect before applying; the function runs element-wise.
    /// e.g. [g1, g1, g2] -> [g1, g1, g2]
    ElementWise,
}
143
/// A boolean that can only be set to `false` through an `unsafe` entry
/// point (see `FunctionOptions::no_check_lengths`); it defaults to `true`.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct UnsafeBool(bool);

impl Default for UnsafeBool {
    /// Checks stay enabled unless explicitly (and unsafely) disabled.
    fn default() -> Self {
        Self(true)
    }
}
153
bitflags!(
        /// Property flags of a function expression, consumed by the
        /// optimizer and the physical engines.
        #[repr(transparent)]
        #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
        #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
        pub struct FunctionFlags: u8 {
            // Allows the function in a group-by context; presumably its
            // absence raises when used in group_by — confirm at check sites.
            const ALLOW_GROUP_AWARE = 1 << 0;
            // The function may change the length of its input,
            // for example a `unique` or a `slice`.
            const CHANGES_LENGTH = 1 << 1;
            // The physical expression may rename the output of this function.
            // If set to `false` the physical engine will ensure the left input
            // expression is the output name.
            const ALLOW_RENAME = 1 << 2;
            // If set, then the `Series` passed to the function in the group_by operation
            // will ensure the name is set. This is an extra heap allocation per group.
            const PASS_NAME_TO_APPLY = 1 << 3;
            /// There can be two ways of expanding wildcards:
            ///
            /// Say the schema is 'a', 'b' and there is a function `f`. In this case, `f('*')` can expand
            /// to:
            /// 1. `f('a', 'b')`
            /// 2. `f('a'), f('b')`
            ///
            /// Setting this to true, will lead to behavior 1.
            ///
            /// This also accounts for regex expansion.
            const INPUT_WILDCARD_EXPANSION = 1 << 4;
            /// Automatically explode on unit length if it ran as final aggregation.
            ///
            /// This is the case for aggregations like sum, min, covariance etc.
            /// We need to know this because we cannot see the difference between
            /// the following functions based on the output type and number of elements:
            ///
            /// x: {1, 2, 3}
            ///
            /// head_1(x) -> {1}
            /// sum(x) -> {4}
            const RETURNS_SCALAR = 1 << 5;
            /// This can happen with UDF's that use Polars within the UDF.
            /// This can lead to recursively entering the engine and sometimes deadlocks.
            /// This flag must be set to handle that.
            const OPTIONAL_RE_ENTRANT = 1 << 6;
            /// Whether this function allows no inputs.
            const ALLOW_EMPTY_INPUTS = 1 << 7;
        }
);
200
201impl Default for FunctionFlags {
202    fn default() -> Self {
203        Self::from_bits_truncate(0) | Self::ALLOW_GROUP_AWARE
204    }
205}
206
/// How the arguments of a function expression are coerced before the call.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash)]
pub enum CastingRules {
    /// Whether information may be lost during cast. E.g. a float to int is considered lossy,
    /// whereas int to int is considered lossless.
    /// Overflowing is not considered in this flag, that's handled in `strict` casting
    FirstArgLossless,
    /// Cast all arguments to their common supertype.
    Supertype(SuperTypeOptions),
}
215
216impl CastingRules {
217    pub fn cast_to_supertypes() -> CastingRules {
218        Self::Supertype(Default::default())
219    }
220}
221
/// Per-expression options controlling how a function is executed.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct FunctionOptions {
    /// Collect groups to a list and apply the function over the groups.
    /// This can be important in aggregation context.
    pub collect_groups: ApplyOptions,

    // Validate the output of a `map`.
    // This should always be true or we could go out of bounds (OOB).
    pub check_lengths: UnsafeBool,
    /// Property flags (see [`FunctionFlags`]).
    pub flags: FunctionFlags,

    // Used for formatting (only for anonymous functions).
    // Skipped by serde: `&'static str` cannot round-trip deserialization.
    #[cfg_attr(feature = "serde", serde(skip))]
    pub fmt_str: &'static str,
    /// Options used when deciding how to cast the arguments of the function.
    #[cfg_attr(feature = "serde", serde(skip))]
    pub cast_options: Option<CastingRules>,
}
241
242impl FunctionOptions {
243    #[cfg(feature = "fused")]
244    pub(crate) unsafe fn no_check_lengths(&mut self) {
245        self.check_lengths = UnsafeBool(false);
246    }
247    pub fn check_lengths(&self) -> bool {
248        self.check_lengths.0
249    }
250
251    pub fn is_elementwise(&self) -> bool {
252        matches!(
253            self.collect_groups,
254            ApplyOptions::ElementWise | ApplyOptions::ApplyList
255        ) && !self.flags.contains(FunctionFlags::CHANGES_LENGTH)
256            && !self.flags.contains(FunctionFlags::RETURNS_SCALAR)
257    }
258}
259
260impl Default for FunctionOptions {
261    fn default() -> Self {
262        FunctionOptions {
263            collect_groups: ApplyOptions::GroupWise,
264            check_lengths: UnsafeBool(true),
265            fmt_str: Default::default(),
266            cast_options: Default::default(),
267            flags: Default::default(),
268        }
269    }
270}
271
/// Options attached to an opaque plan-level UDF node.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct LogicalPlanUdfOptions {
    ///  allow predicate pushdown optimizations
    pub predicate_pd: bool,
    ///  allow projection pushdown optimizations
    pub projection_pd: bool,
    // used for formatting
    pub fmt_str: &'static str,
}
281
/// Options for a scan implemented by a Python callable.
#[derive(Clone, PartialEq, Eq, Debug, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg(feature = "python")]
pub struct PythonOptions {
    /// A function that returns a Python Generator.
    /// The generator should produce Polars DataFrame's.
    pub scan_fn: Option<PythonFunction>,
    /// Schema of the file.
    pub schema: SchemaRef,
    /// Schema the reader will produce when the file is read.
    pub output_schema: Option<SchemaRef>,
    // Projected column names.
    pub with_columns: Option<Arc<[PlSmallStr]>>,
    // Which interface is the python function.
    pub python_source: PythonScanSource,
    /// A `head` call passed to the reader.
    pub n_rows: Option<usize>,
    /// Optional predicate the reader must apply.
    pub predicate: PythonPredicate,
}
302
/// Which Python interface provides the scan function.
#[derive(Clone, PartialEq, Eq, Debug, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum PythonScanSource {
    Pyarrow,
    Cuda,
    /// Generic IO-plugin interface; the default source.
    #[default]
    IOPlugin,
}
311
/// Predicate pushed down into a Python-backed scan.
#[derive(Clone, PartialEq, Eq, Debug, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum PythonPredicate {
    // A pyarrow predicate python expression
    // that can be evaluated with python.eval.
    PyArrow(String),
    /// A native Polars expression evaluated on this side.
    Polars(ExprIR),
    /// No predicate.
    #[default]
    None,
}
322
/// Options for an anonymous (user-supplied) scan.
#[derive(Clone, PartialEq, Eq, Debug, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AnonymousScanOptions {
    /// Number of leading rows to skip.
    pub skip_rows: Option<usize>,
    // Used for formatting/display of the scan node.
    pub fmt_str: &'static str,
}
329
/// Destination of a sink node.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
    /// Collect the result in memory.
    Memory,
    /// Write the result to a file; the path may be a local path or a
    /// cloud URL (see `is_cloud_destination`).
    File {
        path: Arc<PathBuf>,
        file_type: FileType,
        cloud_options: Option<polars_io::cloud::CloudOptions>,
    },
}
340
341impl SinkType {
342    pub(crate) fn is_cloud_destination(&self) -> bool {
343        if let Self::File { path, .. } = self {
344            if is_cloud_url(path.as_ref()) {
345                return true;
346            }
347        }
348
349        false
350    }
351}
352
/// Options for a file sink: destination path plus format-specific
/// writer options.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, Debug)]
pub struct FileSinkOptions {
    /// Destination path of the sink.
    pub path: Arc<PathBuf>,
    /// Output format and its writer options.
    pub file_type: FileType,
}
359
/// Output file format of a sink, with the format-specific write options.
/// Variants are gated on the corresponding IO feature flags.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum FileType {
    #[cfg(feature = "parquet")]
    Parquet(ParquetWriteOptions),
    #[cfg(feature = "ipc")]
    Ipc(IpcWriterOptions),
    #[cfg(feature = "csv")]
    Csv(CsvWriterOptions),
    #[cfg(feature = "json")]
    Json(JsonWriterOptions),
}
372
/// Options for a projection (`select`/`with_columns`) node.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ProjectionOptions {
    /// Evaluate the projection expressions in parallel.
    pub run_parallel: bool,
    /// Check for duplicate output column names.
    pub duplicate_check: bool,
    // Should length-1 Series be broadcast to the length of the dataframe.
    // Only used by CSE optimizer
    pub should_broadcast: bool,
}
382
383impl Default for ProjectionOptions {
384    fn default() -> Self {
385        Self {
386            run_parallel: true,
387            duplicate_check: true,
388            should_broadcast: true,
389        }
390    }
391}
392
393impl ProjectionOptions {
394    /// Conservatively merge the options of two [`ProjectionOptions`]
395    pub fn merge_options(&self, other: &Self) -> Self {
396        Self {
397            run_parallel: self.run_parallel & other.run_parallel,
398            duplicate_check: self.duplicate_check & other.duplicate_check,
399            should_broadcast: self.should_broadcast | other.should_broadcast,
400        }
401    }
402}
403
// Arguments given to `concat`. Differs from `UnionOptions` as the latter is IR state.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct UnionArgs {
    /// Evaluate the inputs in parallel.
    pub parallel: bool,
    /// Rechunk the concatenated output into contiguous memory.
    pub rechunk: bool,
    /// Cast columns to their common supertype where needed.
    pub to_supertypes: bool,
    /// Diagonal concatenation (unions the schemas).
    pub diagonal: bool,
    // If it is a union from a scan over multiple files.
    pub from_partitioned_ds: bool,
    /// Preserve the order of the inputs in the output.
    pub maintain_order: bool,
}
416
417impl Default for UnionArgs {
418    fn default() -> Self {
419        Self {
420            parallel: true,
421            rechunk: false,
422            to_supertypes: false,
423            diagonal: false,
424            from_partitioned_ds: false,
425            maintain_order: true,
426        }
427    }
428}
429
430impl From<UnionArgs> for UnionOptions {
431    fn from(args: UnionArgs) -> Self {
432        UnionOptions {
433            slice: None,
434            parallel: args.parallel,
435            rows: (None, 0),
436            from_partitioned_ds: args.from_partitioned_ds,
437            flattened_by_opt: false,
438            rechunk: args.rechunk,
439            maintain_order: args.maintain_order,
440        }
441    }
442}
443
/// Options for reading newline-delimited JSON (NDJSON).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg(feature = "json")]
pub struct NDJsonReadOptions {
    /// Number of threads to use; `None` presumably means auto — TODO confirm at reader.
    pub n_threads: Option<usize>,
    /// Number of rows to use for schema inference; `None` presumably scans all rows — TODO confirm.
    pub infer_schema_length: Option<NonZeroUsize>,
    /// Size of the chunks the reader produces.
    pub chunk_size: NonZeroUsize,
    /// Reduce memory usage, possibly at the expense of speed.
    pub low_memory: bool,
    /// Skip lines that fail to parse instead of raising.
    pub ignore_errors: bool,
    /// Full schema to use, bypassing inference.
    pub schema: Option<SchemaRef>,
    /// Overwrite parts of the inferred schema.
    pub schema_overwrite: Option<SchemaRef>,
}