datafusion_common/
config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Runtime configuration, via [`ConfigOptions`]
19
20use std::any::Any;
21use std::collections::{BTreeMap, HashMap};
22use std::error::Error;
23use std::fmt::{self, Display};
24use std::str::FromStr;
25
26use crate::error::_config_err;
27use crate::parsers::CompressionTypeVariant;
28use crate::utils::get_available_parallelism;
29use crate::{DataFusionError, Result};
30
/// A macro that wraps a configuration struct and automatically implements
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree.
///
/// Each field is declared as `name: Type,` followed by optional
/// `key = value,` settings and a mandatory trailing `default = <expr>`:
///
/// * `warn = <expr>`: a message logged with `log::warn!` every time the
///   field is set. If present, it must come before `transform`.
/// * `transform = <expr>`: a function applied to the raw string value to
///   normalize it before it is parsed.
/// * `default = <expr>`: the value used by the generated [`Default`] impl.
///
/// The doc comments on a field are concatenated and trimmed, and the result
/// is passed as the description to that field's `visit` call.
///
/// For example,
///
/// ```ignore
/// config_namespace! {
///    /// Amazing config
///    pub struct MyConfig {
///        /// Field 1 doc
///        field1: String, transform = str::to_lowercase, default = "".to_string()
///
///        /// Field 2 doc
///        field2: usize, default = 232
///
///        /// Field 3 doc
///        field3: Option<usize>, default = None
///    }
/// }
/// ```
///
/// Will generate
///
/// ```ignore
/// /// Amazing config
/// #[derive(Debug, Clone, PartialEq)]
/// pub struct MyConfig {
///     /// Field 1 doc
///     field1: String,
///     /// Field 2 doc
///     field2: usize,
///     /// Field 3 doc
///     field3: Option<usize>,
/// }
/// impl ConfigField for MyConfig {
///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
///         match key {
///             "field1" => {
///                 let value = str::to_lowercase(value);
///                 self.field1.set(rem, value.as_ref())
///             },
///             "field2" => self.field2.set(rem, value.as_ref()),
///             "field3" => self.field3.set(rem, value.as_ref()),
///             _ => return _config_err!(
///                 "Config value \"{}\" not found on {}",
///                 key, "MyConfig"
///             ),
///         }
///     }
///
///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
///         let key = format!("{}.field1", key_prefix);
///         let desc = "Field 1 doc";
///         self.field1.visit(v, key.as_str(), desc);
///         let key = format!("{}.field2", key_prefix);
///         let desc = "Field 2 doc";
///         self.field2.visit(v, key.as_str(), desc);
///         let key = format!("{}.field3", key_prefix);
///         let desc = "Field 3 doc";
///         self.field3.visit(v, key.as_str(), desc);
///     }
/// }
///
/// impl Default for MyConfig {
///     fn default() -> Self {
///         Self {
///             field1: "".to_string(),
///             field2: 232,
///             field3: None,
///         }
///     }
/// }
/// ```
///
/// NB: Misplaced commas may result in nonsensical errors
#[macro_export]
macro_rules! config_namespace {
    (
        $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
        $(#[allow($($struct_de:tt)*)])? // Optional struct-level allow attribute
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])* // Field-level documentation attributes
                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
                $(#[allow($($field_de:tt)*)])? // Optional field-level allow attribute
                $field_vis:vis $field_name:ident : $field_type:ty,
                $(warn = $warn:expr,)?
                $(transform = $transform:expr,)?
                default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])* // Apply struct documentation
        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
        $(#[allow($($struct_de)*)])?
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])* // Apply field documentation
                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
                $(#[allow($($field_de)*)])?
                $field_vis $field_name: $field_type,
            )*
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                // The first path segment selects the field; the remainder (possibly
                // empty) is forwarded to that field's own `set`.
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // The field may be marked `#[deprecated]`; the
                            // `#[allow(deprecated)]` below keeps this generated
                            // access from triggering a deprecation warning.
                            {
                                $(let value = $transform(value);)? // Apply transformation if specified
                                $(log::warn!($warn);)? // Log warning if specified
                                #[allow(deprecated)]
                                self.$field_name.set(rem, value.as_ref())
                            }
                        },
                    )*
                    // No field of this struct matches the first path segment.
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                // Fields are visited in declaration order, each under the key
                // `<key_prefix>.<field_name>`.
                $(
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    // The description is the field's doc comment lines joined
                    // together, with surrounding whitespace removed.
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }
        impl Default for $struct_name {
            fn default() -> Self {
                // Deprecated fields still need a default value.
                #[allow(deprecated)]
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}
183
// NOTE: the `///` comments on each field below are not just rustdoc: the
// `config_namespace!` macro joins them into the description string handed to
// `visit`, and fields are visited in the order they are declared here. Keep
// the text user-facing and avoid reordering fields casually.
config_namespace! {
    /// Options related to catalog and directory scanning
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct CatalogOptions {
        /// Whether the default catalog and schema should be created automatically.
        pub create_default_catalog_and_schema: bool, default = true

        /// The default catalog name - this impacts what SQL queries use if not specified
        pub default_catalog: String, default = "datafusion".to_string()

        /// The default schema name - this impacts what SQL queries use if not specified
        pub default_schema: String, default = "public".to_string()

        /// Should DataFusion provide access to `information_schema`
        /// virtual tables for displaying schema information
        pub information_schema: bool, default = false

        /// Location scanned to load tables for `default` schema
        pub location: Option<String>, default = None

        /// Type of `TableProvider` to use when loading `default` schema
        pub format: Option<String>, default = None

        /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        pub has_header: bool, default = true

        /// Specifies whether newlines in (quoted) CSV values are supported.
        ///
        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        pub newlines_in_values: bool, default = false
    }
}
225
config_namespace! {
    /// Options related to SQL parser
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct SqlParserOptions {
        /// When set to true, SQL parser will parse float as decimal type
        pub parse_float_as_decimal: bool, default = false

        /// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
        pub enable_ident_normalization: bool, default = true

        /// When set to true, SQL parser will normalize options value (convert value to lowercase).
        /// Note that this option is ignored and will be removed in the future. All case-insensitive values
        /// are normalized automatically.
        pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false

        /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
        /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
        pub dialect: String, default = "generic".to_string()
        // No lowercasing `transform` is needed for `dialect` because
        // `sqlparser::dialect_from_str` is case-insensitive.

        /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
        /// ignore the length. If false, error if a `VARCHAR` with a length is
        /// specified. The Arrow type system does not have a notion of maximum
        /// string length and thus DataFusion can not enforce such limits.
        pub support_varchar_with_length: bool, default = true

        /// When set to true, the source locations relative to the original SQL
        /// query (i.e. [`Span`](sqlparser::tokenizer::Span)) will be collected
        /// and recorded in the logical plan nodes.
        pub collect_spans: bool, default = false

        /// Specifies the recursion depth limit when parsing complex SQL Queries
        pub recursion_limit: usize, default = 50
    }
}
264
265config_namespace! {
266    /// Options related to query execution
267    ///
268    /// See also: [`SessionConfig`]
269    ///
270    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
271    pub struct ExecutionOptions {
272        /// Default batch size while creating new batches, it's especially useful for
273        /// buffer-in-memory batches since creating tiny batches would result in too much
274        /// metadata memory consumption
275        pub batch_size: usize, default = 8192
276
277        /// When set to true, record batches will be examined between each operator and
278        /// small batches will be coalesced into larger batches. This is helpful when there
279        /// are highly selective filters or joins that could produce tiny output batches. The
280        /// target batch size is determined by the configuration setting
281        pub coalesce_batches: bool, default = true
282
283        /// Should DataFusion collect statistics after listing files
284        pub collect_statistics: bool, default = false
285
286        /// Number of partitions for query execution. Increasing partitions can increase
287        /// concurrency.
288        ///
289        /// Defaults to the number of CPU cores on the system
290        pub target_partitions: usize, default = get_available_parallelism()
291
292        /// The default time zone
293        ///
294        /// Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime
295        /// according to this time zone, and then extract the hour
296        pub time_zone: Option<String>, default = Some("+00:00".into())
297
298        /// Parquet options
299        pub parquet: ParquetOptions, default = Default::default()
300
301        /// Fan-out during initial physical planning.
302        ///
303        /// This is mostly use to plan `UNION` children in parallel.
304        ///
305        /// Defaults to the number of CPU cores on the system
306        pub planning_concurrency: usize, default = get_available_parallelism()
307
308        /// When set to true, skips verifying that the schema produced by
309        /// planning the input of `LogicalPlan::Aggregate` exactly matches the
310        /// schema of the input plan.
311        ///
312        /// When set to false, if the schema does not match exactly
313        /// (including nullability and metadata), a planning error will be raised.
314        ///
315        /// This is used to workaround bugs in the planner that are now caught by
316        /// the new schema verification step.
317        pub skip_physical_aggregate_schema_check: bool, default = false
318
319        /// Specifies the reserved memory for each spillable sort operation to
320        /// facilitate an in-memory merge.
321        ///
322        /// When a sort operation spills to disk, the in-memory data must be
323        /// sorted and merged before being written to a file. This setting reserves
324        /// a specific amount of memory for that in-memory sort/merge process.
325        ///
326        /// Note: This setting is irrelevant if the sort operation cannot spill
327        /// (i.e., if there's no `DiskManager` configured).
328        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024
329
330        /// When sorting, below what size should data be concatenated
331        /// and sorted in a single RecordBatch rather than sorted in
332        /// batches and merged.
333        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
334
335        /// Number of files to read in parallel when inferring schema and statistics
336        pub meta_fetch_concurrency: usize, default = 32
337
338        /// Guarantees a minimum level of output files running in parallel.
339        /// RecordBatches will be distributed in round robin fashion to each
340        /// parallel writer. Each writer is closed and a new file opened once
341        /// soft_max_rows_per_output_file is reached.
342        pub minimum_parallel_output_files: usize, default = 4
343
344        /// Target number of rows in output files when writing multiple.
345        /// This is a soft max, so it can be exceeded slightly. There also
346        /// will be one file smaller than the limit if the total
347        /// number of rows written is not roughly divisible by the soft max
348        pub soft_max_rows_per_output_file: usize, default = 50000000
349
350        /// This is the maximum number of RecordBatches buffered
351        /// for each output file being worked. Higher values can potentially
352        /// give faster write performance at the cost of higher peak
353        /// memory consumption
354        pub max_buffered_batches_per_output_file: usize, default = 2
355
356        /// Should sub directories be ignored when scanning directories for data
357        /// files. Defaults to true (ignores subdirectories), consistent with
358        /// Hive. Note that this setting does not affect reading partitioned
359        /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
360        pub listing_table_ignore_subdirectory: bool, default = true
361
362        /// Should DataFusion support recursive CTEs
363        pub enable_recursive_ctes: bool, default = true
364
365        /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
366        /// statistics into the same file groups.
367        /// Currently experimental
368        pub split_file_groups_by_statistics: bool, default = false
369
370        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
371        pub keep_partition_by_columns: bool, default = false
372
373        /// Aggregation ratio (number of distinct groups / number of input rows)
374        /// threshold for skipping partial aggregation. If the value is greater
375        /// then partial aggregation will skip aggregation for further input
376        pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8
377
378        /// Number of input rows partial aggregation partition should process, before
379        /// aggregation ratio check and trying to switch to skipping aggregation mode
380        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
381
382        /// Should DataFusion use row number estimates at the input to decide
383        /// whether increasing parallelism is beneficial or not. By default,
384        /// only exact row numbers (not estimates) are used for this decision.
385        /// Setting this flag to `true` will likely produce better plans.
386        /// if the source of statistics is accurate.
387        /// We plan to make this the default in the future.
388        pub use_row_number_estimates_to_optimize_partitioning: bool, default = false
389
390        /// Should DataFusion enforce batch size in joins or not. By default,
391        /// DataFusion will not enforce batch size in joins. Enforcing batch size
392        /// in joins can reduce memory usage when joining large
393        /// tables with a highly-selective join filter, but is also slightly slower.
394        pub enforce_batch_size_in_joins: bool, default = false
395    }
396}
397
398config_namespace! {
399    /// Options for reading and writing parquet files
400    ///
401    /// See also: [`SessionConfig`]
402    ///
403    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
404    pub struct ParquetOptions {
405        // The following options affect reading parquet files
406
407        /// (reading) If true, reads the Parquet data page level metadata (the
408        /// Page Index), if present, to reduce the I/O and number of
409        /// rows decoded.
410        pub enable_page_index: bool, default = true
411
412        /// (reading) If true, the parquet reader attempts to skip entire row groups based
413        /// on the predicate in the query and the metadata (min/max values) stored in
414        /// the parquet file
415        pub pruning: bool, default = true
416
417        /// (reading) If true, the parquet reader skip the optional embedded metadata that may be in
418        /// the file Schema. This setting can help avoid schema conflicts when querying
419        /// multiple parquet files with schemas containing compatible types but different metadata
420        pub skip_metadata: bool, default = true
421
422        /// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
423        /// bytes of the parquet file optimistically. If not specified, two reads are required:
424        /// One read to fetch the 8-byte parquet footer and
425        /// another to fetch the metadata length encoded in the footer
426        pub metadata_size_hint: Option<usize>, default = None
427
428        /// (reading) If true, filter expressions are be applied during the parquet decoding operation to
429        /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
430        pub pushdown_filters: bool, default = false
431
432        /// (reading) If true, filter expressions evaluated during the parquet decoding operation
433        /// will be reordered heuristically to minimize the cost of evaluation. If false,
434        /// the filters are applied in the same order as written in the query
435        pub reorder_filters: bool, default = false
436
437        /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
438        /// and `Binary/BinaryLarge` with `BinaryView`.
439        pub schema_force_view_types: bool, default = true
440
441        /// (reading) If true, parquet reader will read columns of
442        /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
443        ///
444        /// Parquet files generated by some legacy writers do not correctly set
445        /// the UTF8 flag for strings, causing string columns to be loaded as
446        /// BLOB instead.
447        pub binary_as_string: bool, default = false
448
449        // The following options affect writing to parquet files
450        // and map to parquet::file::properties::WriterProperties
451
452        /// (writing) Sets best effort maximum size of data page in bytes
453        pub data_pagesize_limit: usize, default = 1024 * 1024
454
455        /// (writing) Sets write_batch_size in bytes
456        pub write_batch_size: usize, default = 1024
457
458        /// (writing) Sets parquet writer version
459        /// valid values are "1.0" and "2.0"
460        pub writer_version: String, default = "1.0".to_string()
461
462        /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
463        ///
464        /// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`.
465        /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
466        pub skip_arrow_metadata: bool, default = false
467
468        /// (writing) Sets default parquet compression codec.
469        /// Valid values are: uncompressed, snappy, gzip(level),
470        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
471        /// These values are not case sensitive. If NULL, uses
472        /// default parquet writer setting
473        ///
474        /// Note that this default setting is not the same as
475        /// the default parquet writer setting.
476        pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())
477
478        /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
479        /// default parquet writer setting
480        pub dictionary_enabled: Option<bool>, default = Some(true)
481
482        /// (writing) Sets best effort maximum dictionary page size, in bytes
483        pub dictionary_page_size_limit: usize, default = 1024 * 1024
484
485        /// (writing) Sets if statistics are enabled for any column
486        /// Valid values are: "none", "chunk", and "page"
487        /// These values are not case sensitive. If NULL, uses
488        /// default parquet writer setting
489        pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())
490
491        /// (writing) Sets max statistics size for any column. If NULL, uses
492        /// default parquet writer setting
493        /// max_statistics_size is deprecated, currently it is not being used
494        // TODO: remove once deprecated
495        #[deprecated(since = "45.0.0", note = "Setting does not do anything")]
496        pub max_statistics_size: Option<usize>, default = Some(4096)
497
498        /// (writing) Target maximum number of rows in each row group (defaults to 1M
499        /// rows). Writing larger row groups requires more memory to write, but
500        /// can get better compression and be faster to read.
501        pub max_row_group_size: usize, default =  1024 * 1024
502
503        /// (writing) Sets "created by" property
504        pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()
505
506        /// (writing) Sets column index truncate length
507        pub column_index_truncate_length: Option<usize>, default = Some(64)
508
509        /// (writing) Sets statictics truncate length. If NULL, uses
510        /// default parquet writer setting
511        pub statistics_truncate_length: Option<usize>, default = None
512
513        /// (writing) Sets best effort maximum number of rows in data page
514        pub data_page_row_count_limit: usize, default = 20_000
515
516        /// (writing)  Sets default encoding for any column.
517        /// Valid values are: plain, plain_dictionary, rle,
518        /// bit_packed, delta_binary_packed, delta_length_byte_array,
519        /// delta_byte_array, rle_dictionary, and byte_stream_split.
520        /// These values are not case sensitive. If NULL, uses
521        /// default parquet writer setting
522        pub encoding: Option<String>, transform = str::to_lowercase, default = None
523
524        /// (writing) Use any available bloom filters when reading parquet files
525        pub bloom_filter_on_read: bool, default = true
526
527        /// (writing) Write bloom filters for all columns when creating parquet files
528        pub bloom_filter_on_write: bool, default = false
529
530        /// (writing) Sets bloom filter false positive probability. If NULL, uses
531        /// default parquet writer setting
532        pub bloom_filter_fpp: Option<f64>, default = None
533
534        /// (writing) Sets bloom filter number of distinct values. If NULL, uses
535        /// default parquet writer setting
536        pub bloom_filter_ndv: Option<u64>, default = None
537
538        /// (writing) Controls whether DataFusion will attempt to speed up writing
539        /// parquet files by serializing them in parallel. Each column
540        /// in each row group in each output file are serialized in parallel
541        /// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
542        pub allow_single_file_parallelism: bool, default = true
543
544        /// (writing) By default parallel parquet writer is tuned for minimum
545        /// memory usage in a streaming execution plan. You may see
546        /// a performance benefit when writing large parquet files
547        /// by increasing maximum_parallel_row_group_writers and
548        /// maximum_buffered_record_batches_per_stream if your system
549        /// has idle cores and can tolerate additional memory usage.
550        /// Boosting these values is likely worthwhile when
551        /// writing out already in-memory data, such as from a cached
552        /// data frame.
553        pub maximum_parallel_row_group_writers: usize, default = 1
554
555        /// (writing) By default parallel parquet writer is tuned for minimum
556        /// memory usage in a streaming execution plan. You may see
557        /// a performance benefit when writing large parquet files
558        /// by increasing maximum_parallel_row_group_writers and
559        /// maximum_buffered_record_batches_per_stream if your system
560        /// has idle cores and can tolerate additional memory usage.
561        /// Boosting these values is likely worthwhile when
562        /// writing out already in-memory data, such as from a cached
563        /// data frame.
564        pub maximum_buffered_record_batches_per_stream: usize, default = 2
565    }
566}
567
568config_namespace! {
569    /// Options related to query optimization
570    ///
571    /// See also: [`SessionConfig`]
572    ///
573    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
574    pub struct OptimizerOptions {
575        /// When set to true, the optimizer will push a limit operation into
576        /// grouped aggregations which have no aggregate expressions, as a soft limit,
577        /// emitting groups once the limit is reached, before all rows in the group are read.
578        pub enable_distinct_aggregation_soft_limit: bool, default = true
579
580        /// When set to true, the physical plan optimizer will try to add round robin
581        /// repartitioning to increase parallelism to leverage more CPU cores
582        pub enable_round_robin_repartition: bool, default = true
583
584        /// When set to true, the optimizer will attempt to perform limit operations
585        /// during aggregations, if possible
586        pub enable_topk_aggregation: bool, default = true
587
588        /// When set to true, the optimizer will insert filters before a join between
589        /// a nullable and non-nullable column to filter out nulls on the nullable side. This
590        /// filter can add additional overhead when the file format does not fully support
591        /// predicate push down.
592        pub filter_null_join_keys: bool, default = false
593
594        /// Should DataFusion repartition data using the aggregate keys to execute aggregates
595        /// in parallel using the provided `target_partitions` level
596        pub repartition_aggregations: bool, default = true
597
598        /// Minimum total files size in bytes to perform file scan repartitioning.
599        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
600
601        /// Should DataFusion repartition data using the join keys to execute joins in parallel
602        /// using the provided `target_partitions` level
603        pub repartition_joins: bool, default = true
604
605        /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
606        /// its inputs do not have any ordering or filtering If the flag is not enabled,
607        /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
608        /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
609        /// RightAnti, and RightSemi - being produced only at the end of the execution.
610        /// This is not typical in stream processing. Additionally, without proper design for
611        /// long runner execution, all types of joins may encounter out-of-memory errors.
612        pub allow_symmetric_joins_without_pruning: bool, default = true
613
614        /// When set to `true`, file groups will be repartitioned to achieve maximum parallelism.
615        /// Currently Parquet and CSV formats are supported.
616        ///
617        /// If set to `true`, all files will be repartitioned evenly (i.e., a single large file
618        /// might be partitioned into smaller chunks) for parallel scanning.
619        /// If set to `false`, different files will be read in parallel, but repartitioning won't
620        /// happen within a single file.
621        pub repartition_file_scans: bool, default = true
622
623        /// Should DataFusion repartition data using the partitions keys to execute window
624        /// functions in parallel using the provided `target_partitions` level
625        pub repartition_windows: bool, default = true
626
627        /// Should DataFusion execute sorts in a per-partition fashion and merge
628        /// afterwards instead of coalescing first and sorting globally.
629        /// With this flag is enabled, plans in the form below
630        ///
631        /// ```text
632        ///      "SortExec: [a@0 ASC]",
633        ///      "  CoalescePartitionsExec",
634        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
635        /// ```
636        /// would turn into the plan below which performs better in multithreaded environments
637        ///
638        /// ```text
639        ///      "SortPreservingMergeExec: [a@0 ASC]",
640        ///      "  SortExec: [a@0 ASC]",
641        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
642        /// ```
643        pub repartition_sorts: bool, default = true
644
645        /// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
646        /// (i.e. setting `preserve_order` to true on `RepartitionExec`  and
647        /// using `SortPreservingMergeExec`)
648        ///
649        /// When false, DataFusion will maximize plan parallelism using
650        /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
651        pub prefer_existing_sort: bool, default = false
652
653        /// When set to true, the logical plan optimizer will produce warning
654        /// messages if any optimization rules produce errors and then proceed to the next
655        /// rule. When set to false, any rules that produce errors will cause the query to fail
656        pub skip_failed_rules: bool, default = false
657
658        /// Number of times that the optimizer will attempt to optimize the plan
659        pub max_passes: usize, default = 3
660
661        /// When set to true, the physical plan optimizer will run a top down
662        /// process to reorder the join keys
663        pub top_down_join_key_reordering: bool, default = true
664
665        /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
666        /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
667        pub prefer_hash_join: bool, default = true
668
669        /// The maximum estimated size in bytes for one input side of a HashJoin
670        /// will be collected into a single partition
671        pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
672
673        /// The maximum estimated size in rows for one input side of a HashJoin
674        /// will be collected into a single partition
675        pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
676
677        /// The default filter selectivity used by Filter Statistics
678        /// when an exact selectivity cannot be determined. Valid values are
679        /// between 0 (no selectivity) and 100 (all rows are selected).
680        pub default_filter_selectivity: u8, default = 20
681
682        /// When set to true, the optimizer will not attempt to convert Union to Interleave
683        pub prefer_existing_union: bool, default = false
684
685        /// When set to true, if the returned type is a view type
686        /// then the output will be coerced to a non-view.
687        /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
688        pub expand_views_at_output: bool, default = false
689    }
690}
691
config_namespace! {
    /// Options controlling explain output
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExplainOptions {
        // These fields are reached through the `explain` namespace of
        // `ConfigOptions`, i.e. keys of the form `datafusion.explain.<field>`.
        //
        // NOTE(review): the `///` text on each field is presumably captured by
        // `config_namespace!` as the entry description (the way
        // `extensions_options!` does), so wording changes would be
        // user-visible — confirm against the macro before editing.

        /// When set to true, the explain statement will only print logical plans
        pub logical_plan_only: bool, default = false

        /// When set to true, the explain statement will only print physical plans
        pub physical_plan_only: bool, default = false

        /// When set to true, the explain statement will print operator statistics
        /// for physical plans
        pub show_statistics: bool, default = false

        /// When set to true, the explain statement will print the partition sizes
        pub show_sizes: bool, default = true

        /// When set to true, the explain statement will print schema information
        pub show_schema: bool, default = false
    }
}
716
/// A key value pair, with a corresponding description
///
/// Produced by [`ConfigOptions::entries`] and [`ExtensionOptions::entries`].
#[derive(Debug)]
pub struct ConfigEntry {
    /// A unique string to identify this config value
    pub key: String,

    /// The value rendered as a string, or `None` when the option is unset
    pub value: Option<String>,

    /// A description of this configuration entry
    pub description: &'static str,
}
729
/// Configuration options struct, able to store both built-in configuration and custom options
///
/// Built-in options are addressed with keys under the `datafusion.` prefix
/// (for example `datafusion.execution.batch_size`); options belonging to a
/// registered extension are addressed under that extension's own prefix.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
    /// Catalog options (keys under `datafusion.catalog`)
    pub catalog: CatalogOptions,
    /// Execution options (keys under `datafusion.execution`)
    pub execution: ExecutionOptions,
    /// Optimizer options (keys under `datafusion.optimizer`)
    pub optimizer: OptimizerOptions,
    /// SQL parser options (keys under `datafusion.sql_parser`)
    pub sql_parser: SqlParserOptions,
    /// Explain options (keys under `datafusion.explain`)
    pub explain: ExplainOptions,
    /// Optional extensions registered using [`Extensions::insert`]
    pub extensions: Extensions,
}
747
748impl ConfigField for ConfigOptions {
749    fn set(&mut self, key: &str, value: &str) -> Result<()> {
750        // Extensions are handled in the public `ConfigOptions::set`
751        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
752        match key {
753            "catalog" => self.catalog.set(rem, value),
754            "execution" => self.execution.set(rem, value),
755            "optimizer" => self.optimizer.set(rem, value),
756            "explain" => self.explain.set(rem, value),
757            "sql_parser" => self.sql_parser.set(rem, value),
758            _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
759        }
760    }
761
762    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
763        self.catalog.visit(v, "datafusion.catalog", "");
764        self.execution.visit(v, "datafusion.execution", "");
765        self.optimizer.visit(v, "datafusion.optimizer", "");
766        self.explain.visit(v, "datafusion.explain", "");
767        self.sql_parser.visit(v, "datafusion.sql_parser", "");
768    }
769}
770
771impl ConfigOptions {
772    /// Creates a new [`ConfigOptions`] with default values
773    pub fn new() -> Self {
774        Self::default()
775    }
776
777    /// Set extensions to provided value
778    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
779        self.extensions = extensions;
780        self
781    }
782
783    /// Set a configuration option
784    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
785        let Some((prefix, key)) = key.split_once('.') else {
786            return _config_err!("could not find config namespace for key \"{key}\"");
787        };
788
789        if prefix == "datafusion" {
790            return ConfigField::set(self, key, value);
791        }
792
793        let Some(e) = self.extensions.0.get_mut(prefix) else {
794            return _config_err!("Could not find config namespace \"{prefix}\"");
795        };
796        e.0.set(key, value)
797    }
798
799    /// Create new ConfigOptions struct, taking values from
800    /// environment variables where possible.
801    ///
802    /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
803    /// control `datafusion.execution.batch_size`.
804    pub fn from_env() -> Result<Self> {
805        struct Visitor(Vec<String>);
806
807        impl Visit for Visitor {
808            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
809                self.0.push(key.to_string())
810            }
811
812            fn none(&mut self, key: &str, _: &'static str) {
813                self.0.push(key.to_string())
814            }
815        }
816
817        // Extract the names of all fields and then look up the corresponding
818        // environment variables. This isn't hugely efficient but avoids
819        // ambiguity between `a.b` and `a_b` which would both correspond
820        // to an environment variable of `A_B`
821
822        let mut keys = Visitor(vec![]);
823        let mut ret = Self::default();
824        ret.visit(&mut keys, "datafusion", "");
825
826        for key in keys.0 {
827            let env = key.to_uppercase().replace('.', "_");
828            if let Some(var) = std::env::var_os(env) {
829                ret.set(&key, var.to_string_lossy().as_ref())?;
830            }
831        }
832
833        Ok(ret)
834    }
835
836    /// Create new ConfigOptions struct, taking values from a string hash map.
837    ///
838    /// Only the built-in configurations will be extracted from the hash map
839    /// and other key value pairs will be ignored.
840    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
841        struct Visitor(Vec<String>);
842
843        impl Visit for Visitor {
844            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
845                self.0.push(key.to_string())
846            }
847
848            fn none(&mut self, key: &str, _: &'static str) {
849                self.0.push(key.to_string())
850            }
851        }
852
853        let mut keys = Visitor(vec![]);
854        let mut ret = Self::default();
855        ret.visit(&mut keys, "datafusion", "");
856
857        for key in keys.0 {
858            if let Some(var) = settings.get(&key) {
859                ret.set(&key, var)?;
860            }
861        }
862
863        Ok(ret)
864    }
865
866    /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
867    pub fn entries(&self) -> Vec<ConfigEntry> {
868        struct Visitor(Vec<ConfigEntry>);
869
870        impl Visit for Visitor {
871            fn some<V: Display>(
872                &mut self,
873                key: &str,
874                value: V,
875                description: &'static str,
876            ) {
877                self.0.push(ConfigEntry {
878                    key: key.to_string(),
879                    value: Some(value.to_string()),
880                    description,
881                })
882            }
883
884            fn none(&mut self, key: &str, description: &'static str) {
885                self.0.push(ConfigEntry {
886                    key: key.to_string(),
887                    value: None,
888                    description,
889                })
890            }
891        }
892
893        let mut v = Visitor(vec![]);
894        self.visit(&mut v, "datafusion", "");
895
896        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
897        v.0
898    }
899
900    /// Generate documentation that can be included in the user guide
901    pub fn generate_config_markdown() -> String {
902        use std::fmt::Write as _;
903
904        let mut s = Self::default();
905
906        // Normalize for display
907        s.execution.target_partitions = 0;
908        s.execution.planning_concurrency = 0;
909
910        let mut docs = "| key | default | description |\n".to_string();
911        docs += "|-----|---------|-------------|\n";
912        let mut entries = s.entries();
913        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
914
915        for entry in s.entries() {
916            let _ = writeln!(
917                &mut docs,
918                "| {} | {} | {} |",
919                entry.key,
920                entry.value.as_deref().unwrap_or("NULL"),
921                entry.description
922            );
923        }
924        docs
925    }
926}
927
/// [`ConfigExtension`] provides a mechanism to store third-party configuration
/// within DataFusion [`ConfigOptions`]
///
/// This mechanism can be used to pass configuration to user defined functions
/// or optimizer passes
///
/// # Example
/// ```
/// use datafusion_common::{
///     config::ConfigExtension, extensions_options,
///     config::ConfigOptions,
/// };
///  // Define a new configuration struct using the `extensions_options` macro
///  extensions_options! {
///     /// My own config options.
///     pub struct MyConfig {
///         /// Should "foo" be replaced by "bar"?
///         pub foo_to_bar: bool, default = true
///
///         /// How many "baz" should be created?
///         pub baz_count: usize, default = 1337
///     }
///  }
///
///  impl ConfigExtension for MyConfig {
///     const PREFIX: &'static str = "my_config";
///  }
///
///  // set up config struct and register extension
///  let mut config = ConfigOptions::default();
///  config.extensions.insert(MyConfig::default());
///
///  // overwrite config default
///  config.set("my_config.baz_count", "42").unwrap();
///
///  // check config state
///  let my_config = config.extensions.get::<MyConfig>().unwrap();
///  assert!(my_config.foo_to_bar,);
///  assert_eq!(my_config.baz_count, 42,);
/// ```
///
/// # Note:
/// Unfortunately associated constants are not currently object-safe, and so this
/// extends the object-safe [`ExtensionOptions`]
pub trait ConfigExtension: ExtensionOptions {
    /// Configuration namespace prefix to use
    ///
    /// All values under this will be prefixed with `$PREFIX + "."`
    ///
    /// The prefix is also the key under which the extension is stored, so
    /// registering a second extension with the same prefix replaces the first.
    /// It must not be `"datafusion"`: [`Extensions::insert`] asserts this.
    const PREFIX: &'static str;
}
978
/// An object-safe API for storing arbitrary configuration.
///
/// See [`ConfigExtension`] for user defined configuration
pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
    /// Return `self` as [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any(&self) -> &dyn Any;

    /// Return `self` as a mutable [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any_mut(&mut self) -> &mut dyn Any;

    /// Return a deep clone of this [`ExtensionOptions`]
    ///
    /// It is important this does not share mutable state to avoid consistency issues
    /// with configuration changing whilst queries are executing
    fn cloned(&self) -> Box<dyn ExtensionOptions>;

    /// Set the given `key`, `value` pair
    ///
    /// Whether `key` still carries the namespace prefix depends on the caller:
    /// `ConfigOptions::set` strips the prefix before delegating, whereas
    /// `TableOptions::set` forwards the key unchanged.
    fn set(&mut self, key: &str, value: &str) -> Result<()>;

    /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
    fn entries(&self) -> Vec<ConfigEntry>;
}
1005
/// A type-safe container for [`ConfigExtension`]
///
/// Extensions are stored keyed by their [`ConfigExtension::PREFIX`], so at
/// most one extension per prefix is held at a time.
#[derive(Debug, Default, Clone)]
pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
1009
1010impl Extensions {
1011    /// Create a new, empty [`Extensions`]
1012    pub fn new() -> Self {
1013        Self(BTreeMap::new())
1014    }
1015
1016    /// Registers a [`ConfigExtension`] with this [`ConfigOptions`]
1017    pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
1018        assert_ne!(T::PREFIX, "datafusion");
1019        let e = ExtensionBox(Box::new(extension));
1020        self.0.insert(T::PREFIX, e);
1021    }
1022
1023    /// Retrieves the extension of the given type if any
1024    pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
1025        self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
1026    }
1027
1028    /// Retrieves the extension of the given type if any
1029    pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
1030        let e = self.0.get_mut(T::PREFIX)?;
1031        e.0.as_any_mut().downcast_mut()
1032    }
1033}
1034
/// Owning wrapper around a boxed [`ExtensionOptions`] trait object.
///
/// Exists so that `Clone` can be implemented by hand for the boxed value.
#[derive(Debug)]
struct ExtensionBox(Box<dyn ExtensionOptions>);
1037
1038impl Clone for ExtensionBox {
1039    fn clone(&self) -> Self {
1040        Self(self.0.cloned())
1041    }
1042}
1043
/// A trait implemented by `config_namespace` and for field types that provides
/// the ability to walk and mutate the configuration tree
pub trait ConfigField {
    /// Reports this field to the visitor `v`.
    ///
    /// Leaf values call [`Visit::some`] (or [`Visit::none`] for an unset
    /// `Option`) with the given `key` and `description`; container
    /// implementations forward to each of their fields in turn.
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);

    /// Updates this field from the string `value`.
    ///
    /// `key` is the part of the option name that remains below this field.
    /// Container implementations use its first segment to choose a child;
    /// most leaf implementations ignore it.
    fn set(&mut self, key: &str, value: &str) -> Result<()>;
}
1051
1052impl<F: ConfigField + Default> ConfigField for Option<F> {
1053    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1054        match self {
1055            Some(s) => s.visit(v, key, description),
1056            None => v.none(key, description),
1057        }
1058    }
1059
1060    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1061        self.get_or_insert_with(Default::default).set(key, value)
1062    }
1063}
1064
1065fn default_transform<T>(input: &str) -> Result<T>
1066where
1067    T: FromStr,
1068    <T as FromStr>::Err: Sync + Send + Error + 'static,
1069{
1070    input.parse().map_err(|e| {
1071        DataFusionError::Context(
1072            format!(
1073                "Error parsing '{}' as {}",
1074                input,
1075                std::any::type_name::<T>()
1076            ),
1077            Box::new(DataFusionError::External(Box::new(e))),
1078        )
1079    })
1080}
1081
/// Implements [`ConfigField`] for a leaf value type.
///
/// Two forms are accepted:
///
/// - `config_field!(T)` parses the incoming string with `default_transform`.
/// - `config_field!(T, arg => expr)` binds the incoming string to `arg` and
///   assigns the result of `expr` to the field.
///
/// The generated `visit` reports the value through [`Visit::some`]; the
/// generated `set` ignores its key argument and overwrites the whole value.
///
/// NOTE(review): the expansion refers to `ConfigField`, `Visit`, `Result` and
/// `default_transform` by unqualified name, so those names must be in scope
/// wherever the macro is invoked.
#[macro_export]
macro_rules! config_field {
    // Shorthand: parse the raw string with `default_transform`.
    ($t:ty) => {
        config_field!($t, value => default_transform(value)?);
    };

    // General form: `$arg` names the raw string inside `$transform`.
    ($t:ty, $arg:ident => $transform:expr) => {
        impl ConfigField for $t {
            fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
                v.some(key, self, description)
            }

            fn set(&mut self, _: &str, $arg: &str) -> Result<()> {
                *self = $transform;
                Ok(())
            }
        }
    };
}
1101
// `ConfigField` implementations for the primitive leaf types.
config_field!(String);
// Booleans are lower-cased before parsing, so inputs such as `TRUE` are accepted.
config_field!(bool, value => default_transform(value.to_lowercase().as_str())?);
config_field!(usize);
config_field!(f64);
config_field!(u64);
1107
1108impl ConfigField for u8 {
1109    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1110        v.some(key, self, description)
1111    }
1112
1113    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1114        if value.is_empty() {
1115            return Err(DataFusionError::Configuration(format!(
1116                "Input string for {} key is empty",
1117                key
1118            )));
1119        }
1120        // Check if the string is a valid number
1121        if let Ok(num) = value.parse::<u8>() {
1122            // TODO: Let's decide how we treat the numerical strings.
1123            *self = num;
1124        } else {
1125            let bytes = value.as_bytes();
1126            // Check if the first character is ASCII (single byte)
1127            if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
1128                return Err(DataFusionError::Configuration(format!(
1129                    "Error parsing {} as u8. Non-ASCII string provided",
1130                    value
1131                )));
1132            }
1133            *self = bytes[0];
1134        }
1135        Ok(())
1136    }
1137}
1138
1139impl ConfigField for CompressionTypeVariant {
1140    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1141        v.some(key, self, description)
1142    }
1143
1144    fn set(&mut self, _: &str, value: &str) -> Result<()> {
1145        *self = CompressionTypeVariant::from_str(value)?;
1146        Ok(())
1147    }
1148}
1149
/// An implementation trait used to recursively walk configuration
///
/// A [`ConfigField`] calls exactly one of these methods for each leaf value
/// it holds.
pub trait Visit {
    /// Called for an option that has a value; `value` is handed over as
    /// something that can be displayed.
    fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);

    /// Called for an option that is currently unset.
    fn none(&mut self, key: &str, description: &'static str);
}
1156
/// Convenience macro to create [`ExtensionOptions`].
///
/// The created structure implements the following traits:
///
/// - [`Clone`]
/// - [`Debug`]
/// - [`Default`]
/// - [`ExtensionOptions`]
/// - `ConfigField`
///
/// # Usage
/// The syntax is:
///
/// ```text
/// extensions_options! {
///      /// Struct docs (optional).
///     [<vis>] struct <StructName> {
///         /// Field docs (optional)
///         [<vis>] <field_name>: <field_type>, default = <default_value>
///
///         ... more fields
///     }
/// }
/// ```
///
/// The placeholders are:
/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
/// - `<StructName>`: Struct name like `MyStruct`.
/// - `<field_name>`: Field name like `my_field`.
/// - `<field_type>`: Field type like `u8`.
/// - `<default_value>`: Default value matching the field type like `42`.
///
/// The field docs double as the description reported for each entry.
///
/// # Example
/// See also a full example on the [`ConfigExtension`] documentation
///
/// ```
/// use datafusion_common::extensions_options;
///
/// extensions_options! {
///     /// My own config options.
///     pub struct MyConfig {
///         /// Should "foo" be replaced by "bar"?
///         pub foo_to_bar: bool, default = true
///
///         /// How many "baz" should be created?
///         pub baz_count: usize, default = 1337
///     }
/// }
/// ```
///
///
/// [`Debug`]: std::fmt::Debug
/// [`ExtensionOptions`]: crate::config::ExtensionOptions
#[macro_export]
macro_rules! extensions_options {
    (
     $(#[doc = $struct_d:tt])*
     $vis:vis struct $struct_name:ident {
        $(
        $(#[doc = $d:tt])*
        $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
        )*$(,)*
    }
    ) => {
        $(#[doc = $struct_d])*
        #[derive(Debug, Clone)]
        #[non_exhaustive]
        $vis struct $struct_name{
            $(
            $(#[doc = $d])*
            $field_vis $field_name : $field_type,
            )*
        }

        // `Default` is written out by hand so every field starts at the
        // `default = ...` expression given in the macro input.
        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }

        impl $crate::config::ExtensionOptions for $struct_name {
            fn as_any(&self) -> &dyn ::std::any::Any {
                self
            }

            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
                self
            }

            fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
                Box::new(self.clone())
            }

            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                $crate::config::ConfigField::set(self, key, value)
            }

            fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
                // Collects every visited field as a `ConfigEntry`.
                struct Visitor(Vec<$crate::config::ConfigEntry>);

                impl $crate::config::Visit for Visitor {
                    fn some<V: std::fmt::Display>(
                        &mut self,
                        key: &str,
                        value: V,
                        description: &'static str,
                    ) {
                        self.0.push($crate::config::ConfigEntry {
                            key: key.to_string(),
                            value: Some(value.to_string()),
                            description,
                        })
                    }

                    fn none(&mut self, key: &str, description: &'static str) {
                        self.0.push($crate::config::ConfigEntry {
                            key: key.to_string(),
                            value: None,
                            description,
                        })
                    }
                }

                let mut v = Visitor(vec![]);
                // The prefix is not used for extensions.
                // The description is generated in ConfigField::visit.
                // We can just pass empty strings here.
                $crate::config::ConfigField::visit(self, &mut v, "", "");
                v.0
            }
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                // The first segment of `key` selects the field; anything after
                // the first `.` is passed on to that field.
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // The `allow` below keeps fields marked as
                            // deprecated settable without a compiler warning.
                            {
                                #[allow(deprecated)]
                                self.$field_name.set(rem, value.as_ref())
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
                // Each field is reported under its bare field name (the incoming
                // prefix is ignored) with its trimmed doc text as description.
                $(
                    let key = stringify!($field_name).to_string();
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }
    }
}
1321
/// These file types have special built in behavior for configuration.
/// Use TableOptions::Extensions for configuring other file types.
///
/// The selected variant decides which of the per-format option sets in
/// `TableOptions` a `format.*` key is applied to.
#[derive(Debug, Clone)]
pub enum ConfigFileType {
    /// CSV files; `format.*` keys address `TableOptions::csv`.
    CSV,
    /// Parquet files; `format.*` keys address `TableOptions::parquet`.
    /// Only available when the `parquet` feature is enabled.
    #[cfg(feature = "parquet")]
    PARQUET,
    /// JSON files; `format.*` keys address `TableOptions::json`.
    JSON,
}
1331
/// Represents the configuration options available for handling different table formats within a data processing application.
/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
#[derive(Debug, Clone, Default)]
pub struct TableOptions {
    /// Configuration options for CSV file handling. This includes settings like the delimiter,
    /// quote character, and whether the first row is considered as headers.
    pub csv: CsvOptions,

    /// Configuration options for Parquet file handling. This includes settings for compression,
    /// encoding, and other Parquet-specific file characteristics.
    pub parquet: TableParquetOptions,

    /// Configuration options for JSON file handling.
    pub json: JsonOptions,

    /// The current file format that the table operations should assume. This option allows
    /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
    ///
    /// While this is `None`, setting any `format.*` key fails because there is
    /// no format to apply it to.
    pub current_format: Option<ConfigFileType>,

    /// Optional extensions that can be used to extend or customize the behavior of the table
    /// options. Extensions can be registered using `Extensions::insert` and might include
    /// custom file handling logic, additional configuration parameters, or other enhancements.
    pub extensions: Extensions,
}
1357
1358impl ConfigField for TableOptions {
1359    /// Visits configuration settings for the current file format, or all formats if none is selected.
1360    ///
1361    /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
1362    /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
1363    /// it visits all available format settings.
1364    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1365        if let Some(file_type) = &self.current_format {
1366            match file_type {
1367                #[cfg(feature = "parquet")]
1368                ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
1369                ConfigFileType::CSV => self.csv.visit(v, "format", ""),
1370                ConfigFileType::JSON => self.json.visit(v, "format", ""),
1371            }
1372        } else {
1373            self.csv.visit(v, "csv", "");
1374            self.parquet.visit(v, "parquet", "");
1375            self.json.visit(v, "json", "");
1376        }
1377    }
1378
1379    /// Sets a configuration value for a specific key within `TableOptions`.
1380    ///
1381    /// This method delegates setting configuration values to the specific file format configurations,
1382    /// based on the current format selected. If no format is selected, it returns an error.
1383    ///
1384    /// # Parameters
1385    ///
1386    /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
1387    ///   for CSV format.
1388    /// * `value`: The value to set for the specified configuration key.
1389    ///
1390    /// # Returns
1391    ///
1392    /// A result indicating success or an error if the key is not recognized, if a format is not specified,
1393    /// or if setting the configuration value fails for the specific format.
1394    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1395        // Extensions are handled in the public `ConfigOptions::set`
1396        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1397        match key {
1398            "format" => {
1399                let Some(format) = &self.current_format else {
1400                    return _config_err!("Specify a format for TableOptions");
1401                };
1402                match format {
1403                    #[cfg(feature = "parquet")]
1404                    ConfigFileType::PARQUET => self.parquet.set(rem, value),
1405                    ConfigFileType::CSV => self.csv.set(rem, value),
1406                    ConfigFileType::JSON => self.json.set(rem, value),
1407                }
1408            }
1409            _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
1410        }
1411    }
1412}
1413
1414impl TableOptions {
1415    /// Constructs a new instance of `TableOptions` with default settings.
1416    ///
1417    /// # Returns
1418    ///
1419    /// A new `TableOptions` instance with default configuration values.
1420    pub fn new() -> Self {
1421        Self::default()
1422    }
1423
1424    /// Creates a new `TableOptions` instance initialized with settings from a given session config.
1425    ///
1426    /// # Parameters
1427    ///
1428    /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
1429    ///
1430    /// # Returns
1431    ///
1432    /// A new `TableOptions` instance with settings applied from the session config.
1433    pub fn default_from_session_config(config: &ConfigOptions) -> Self {
1434        let initial = TableOptions::default();
1435        initial.combine_with_session_config(config)
1436    }
1437
1438    /// Updates the current `TableOptions` with settings from a given session config.
1439    ///
1440    /// # Parameters
1441    ///
1442    /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
1443    ///
1444    /// # Returns
1445    ///
1446    /// A new `TableOptions` instance with updated settings from the session config.
1447    #[must_use = "this method returns a new instance"]
1448    pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
1449        let mut clone = self.clone();
1450        clone.parquet.global = config.execution.parquet.clone();
1451        clone
1452    }
1453
1454    /// Sets the file format for the table.
1455    ///
1456    /// # Parameters
1457    ///
1458    /// * `format`: The file format to use (e.g., CSV, Parquet).
1459    pub fn set_config_format(&mut self, format: ConfigFileType) {
1460        self.current_format = Some(format);
1461    }
1462
1463    /// Sets the extensions for this `TableOptions` instance.
1464    ///
1465    /// # Parameters
1466    ///
1467    /// * `extensions`: The `Extensions` instance to set.
1468    ///
1469    /// # Returns
1470    ///
1471    /// A new `TableOptions` instance with the specified extensions applied.
1472    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1473        self.extensions = extensions;
1474        self
1475    }
1476
1477    /// Sets a specific configuration option.
1478    ///
1479    /// # Parameters
1480    ///
1481    /// * `key`: The configuration key (e.g., "format.delimiter").
1482    /// * `value`: The value to set for the specified key.
1483    ///
1484    /// # Returns
1485    ///
1486    /// A result indicating success or failure in setting the configuration option.
1487    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1488        let Some((prefix, _)) = key.split_once('.') else {
1489            return _config_err!("could not find config namespace for key \"{key}\"");
1490        };
1491
1492        if prefix == "format" {
1493            return ConfigField::set(self, key, value);
1494        }
1495
1496        if prefix == "execution" {
1497            return Ok(());
1498        }
1499
1500        let Some(e) = self.extensions.0.get_mut(prefix) else {
1501            return _config_err!("Could not find config namespace \"{prefix}\"");
1502        };
1503        e.0.set(key, value)
1504    }
1505
1506    /// Initializes a new `TableOptions` from a hash map of string settings.
1507    ///
1508    /// # Parameters
1509    ///
1510    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1511    ///
1512    /// # Returns
1513    ///
1514    /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
1515    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1516        let mut ret = Self::default();
1517        for (k, v) in settings {
1518            ret.set(k, v)?;
1519        }
1520
1521        Ok(ret)
1522    }
1523
1524    /// Modifies the current `TableOptions` instance with settings from a hash map.
1525    ///
1526    /// # Parameters
1527    ///
1528    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1529    ///
1530    /// # Returns
1531    ///
1532    /// A result indicating success or failure in applying the settings.
1533    pub fn alter_with_string_hash_map(
1534        &mut self,
1535        settings: &HashMap<String, String>,
1536    ) -> Result<()> {
1537        for (k, v) in settings {
1538            self.set(k, v)?;
1539        }
1540        Ok(())
1541    }
1542
1543    /// Retrieves all configuration entries from this `TableOptions`.
1544    ///
1545    /// # Returns
1546    ///
1547    /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
1548    pub fn entries(&self) -> Vec<ConfigEntry> {
1549        struct Visitor(Vec<ConfigEntry>);
1550
1551        impl Visit for Visitor {
1552            fn some<V: Display>(
1553                &mut self,
1554                key: &str,
1555                value: V,
1556                description: &'static str,
1557            ) {
1558                self.0.push(ConfigEntry {
1559                    key: key.to_string(),
1560                    value: Some(value.to_string()),
1561                    description,
1562                })
1563            }
1564
1565            fn none(&mut self, key: &str, description: &'static str) {
1566                self.0.push(ConfigEntry {
1567                    key: key.to_string(),
1568                    value: None,
1569                    description,
1570                })
1571            }
1572        }
1573
1574        let mut v = Visitor(vec![]);
1575        self.visit(&mut v, "format", "");
1576
1577        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1578        v.0
1579    }
1580}
1581
/// Options that control how Parquet files are read, including global options
/// that apply to all columns and optional column-specific overrides
///
/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
/// (e.g. sorting_columns).
#[derive(Clone, Default, Debug, PartialEq)]
pub struct TableParquetOptions {
    /// Global Parquet options that propagate to all columns.
    pub global: ParquetOptions,
    /// Column-specific options, keyed by column. Default usage is parquet.XX::column.
    pub column_specific_options: HashMap<String, ParquetColumnOptions>,
    /// Additional file-level metadata to include. Inserted into the key_value_metadata
    /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
    ///
    /// Entries are keyed by the part following `metadata::` in the option name.
    /// Multiple entries are permitted; setting the same key again overwrites
    /// the earlier value.
    /// ```sql
    /// OPTIONS (
    ///    'format.metadata::key1' '',
    ///    'format.metadata::key2' 'value',
    ///    'format.metadata::key3' 'value has spaces',
    ///    'format.metadata::key4' 'value has special chars :: :',
    ///    'format.metadata::key_dupe' 'original will be overwritten',
    ///    'format.metadata::key_dupe' 'final'
    /// )
    /// ```
    pub key_value_metadata: HashMap<String, Option<String>>,
}
1610
1611impl TableParquetOptions {
1612    /// Return new default TableParquetOptions
1613    pub fn new() -> Self {
1614        Self::default()
1615    }
1616
1617    /// Set whether the encoding of the arrow metadata should occur
1618    /// during the writing of parquet.
1619    ///
1620    /// Default is to encode the arrow schema in the file kv_metadata.
1621    pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
1622        Self {
1623            global: ParquetOptions {
1624                skip_arrow_metadata: skip,
1625                ..self.global
1626            },
1627            ..self
1628        }
1629    }
1630}
1631
1632impl ConfigField for TableParquetOptions {
1633    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
1634        self.global.visit(v, key_prefix, description);
1635        self.column_specific_options
1636            .visit(v, key_prefix, description)
1637    }
1638
1639    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1640        // Determine if the key is a global, metadata, or column-specific setting
1641        if key.starts_with("metadata::") {
1642            let k = match key.split("::").collect::<Vec<_>>()[..] {
1643                [_meta] | [_meta, ""] => {
1644                    return _config_err!(
1645                        "Invalid metadata key provided, missing key in metadata::<key>"
1646                    )
1647                }
1648                [_meta, k] => k.into(),
1649                _ => {
1650                    return _config_err!(
1651                        "Invalid metadata key provided, found too many '::' in \"{key}\""
1652                    )
1653                }
1654            };
1655            self.key_value_metadata.insert(k, Some(value.into()));
1656            Ok(())
1657        } else if key.contains("::") {
1658            self.column_specific_options.set(key, value)
1659        } else {
1660            self.global.set(key, value)
1661        }
1662    }
1663}
1664
/// Declares a configuration struct together with its [`Default`] and
/// [`ConfigField`] implementations, plus a [`ConfigField`] implementation for
/// `HashMap<String, TheStruct>`.
///
/// Each field is written as
/// `name: Type, [transform = expr,] default = expr`. For the generated map
/// implementation, keys take the form `<field>::<map key>`.
macro_rules! config_namespace_with_hashmap {
    (
     $(#[doc = $struct_d:tt])*
     $(#[deprecated($($struct_depr:tt)*)])?  // Optional struct-level deprecated attribute
     $vis:vis struct $struct_name:ident {
        $(
        $(#[doc = $d:tt])*
        $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
        $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
        )*$(,)*
    }
    ) => {

        // The struct itself, with the captured docs and deprecations re-applied.
        $(#[doc = $struct_d])*
        $(#[deprecated($($struct_depr)*)])?  // Apply struct deprecation
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name{
            $(
            $(#[doc = $d])*
            $(#[deprecated($($field_depr)*)])? // Apply field deprecation
            $field_vis $field_name : $field_type,
            )*
        }

        impl ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> Result<()> {
                // `key` is `<field>` or `<field>.<rest>`; `rest` is handed to
                // the field's own `set`.
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                       stringify!($field_name) => {
                           // Handle deprecated fields
                           #[allow(deprecated)] // Allow deprecated fields
                           // When a `transform` was declared, normalize the
                           // value before it reaches the field.
                           $(let value = $transform(value);)?
                           self.$field_name.set(rem, value.as_ref())
                       },
                    )*
                    _ => _config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                // Each field is visited as `<key_prefix>.<field>`; its doc
                // comment lines are concatenated and trimmed to form the
                // description passed along.
                let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                let desc = concat!($($d),*).trim();
                // Handle deprecated fields
                #[allow(deprecated)]
                self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                // Deprecated fields still need a default value.
                #[allow(deprecated)]
                Self {
                    $($field_name: $default),*
                }
            }
        }

        impl ConfigField for HashMap<String,$struct_name> {
            fn set(&mut self, key: &str, value: &str) -> Result<()> {
                // Split at the first `::` only: the left part names the
                // field, everything after it is the map key.
                let parts: Vec<&str> = key.splitn(2, "::").collect();
                match parts.as_slice() {
                    [inner_key, hashmap_key] => {
                        // Get or create the struct for the specified key
                        let inner_value = self
                            .entry((*hashmap_key).to_owned())
                            .or_insert_with($struct_name::default);

                        inner_value.set(inner_key, value)
                    }
                    // No `::` in the key at all.
                    _ => _config_err!("Unrecognized key '{key}'."),
                }
            }

            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                for (column_name, col_options) in self {
                    $(
                    // Visited as `<key_prefix>.<field>::<map key>`.
                    let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    col_options.$field_name.visit(v, key.as_str(), desc);
                    )*
                }
            }
        }
    }
}
1756
// Expands to the `ParquetColumnOptions` struct, its `Default` and
// `ConfigField` impls, and a `ConfigField` impl for
// `HashMap<String, ParquetColumnOptions>` whose keys have the form
// `<option>::<map key>`.
//
// The `///` comments on the fields below are captured by the macro and become
// the description string passed to visitors, so editing them changes runtime
// output as well as the rendered docs.
config_namespace_with_hashmap! {
    /// Options controlling parquet format for individual columns.
    ///
    /// See [`ParquetOptions`] for more details
    pub struct ParquetColumnOptions {
        /// Sets if bloom filter is enabled for the column path.
        pub bloom_filter_enabled: Option<bool>, default = None

        // NOTE(review): unlike `compression` below, no `transform` is declared
        // here, so the value is stored with the caller's casing — confirm the
        // consumer of this option is case-insensitive as the doc states.
        /// Sets encoding for the column path.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case-sensitive. If NULL, uses
        /// default parquet options
        pub encoding: Option<String>, default = None

        /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
        /// default parquet options
        pub dictionary_enabled: Option<bool>, default = None

        // The value is lower-cased (`str::to_lowercase`) before it is stored.
        /// Sets default parquet compression codec for the column path.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case-sensitive. If NULL, uses
        /// default parquet options
        pub compression: Option<String>, transform = str::to_lowercase, default = None

        // NOTE(review): no `transform` is declared here either; see the note
        // on `encoding`.
        /// Sets if statistics are enabled for the column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet options
        pub statistics_enabled: Option<String>, default = None

        /// Sets bloom filter false positive probability for the column path. If NULL, uses
        /// default parquet options
        pub bloom_filter_fpp: Option<f64>, default = None

        /// Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet options
        pub bloom_filter_ndv: Option<u64>, default = None

        /// Sets max statistics size for the column path. If NULL, uses
        /// default parquet options
        /// max_statistics_size is deprecated, currently it is not being used
        // TODO: remove once deprecated
        #[deprecated(since = "45.0.0", note = "Setting does not do anything")]
        pub max_statistics_size: Option<usize>, default = None
    }
}
1806
1807config_namespace! {
1808    /// Options controlling CSV format
1809    pub struct CsvOptions {
1810        /// Specifies whether there is a CSV header (i.e. the first line
1811        /// consists of is column names). The value `None` indicates that
1812        /// the configuration should be consulted.
1813        pub has_header: Option<bool>, default = None
1814        pub delimiter: u8, default = b','
1815        pub quote: u8, default = b'"'
1816        pub terminator: Option<u8>, default = None
1817        pub escape: Option<u8>, default = None
1818        pub double_quote: Option<bool>, default = None
1819        /// Specifies whether newlines in (quoted) values are supported.
1820        ///
1821        /// Parsing newlines in quoted values may be affected by execution behaviour such as
1822        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
1823        /// parsed successfully, which may reduce performance.
1824        ///
1825        /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
1826        pub newlines_in_values: Option<bool>, default = None
1827        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
1828        pub schema_infer_max_rec: Option<usize>, default = None
1829        pub date_format: Option<String>, default = None
1830        pub datetime_format: Option<String>, default = None
1831        pub timestamp_format: Option<String>, default = None
1832        pub timestamp_tz_format: Option<String>, default = None
1833        pub time_format: Option<String>, default = None
1834        // The output format for Nulls in the CSV writer.
1835        pub null_value: Option<String>, default = None
1836        // The input regex for Nulls when loading CSVs.
1837        pub null_regex: Option<String>, default = None
1838        pub comment: Option<u8>, default = None
1839    }
1840}
1841
1842impl CsvOptions {
1843    /// Set a limit in terms of records to scan to infer the schema
1844    /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
1845    pub fn with_compression(
1846        mut self,
1847        compression_type_variant: CompressionTypeVariant,
1848    ) -> Self {
1849        self.compression = compression_type_variant;
1850        self
1851    }
1852
1853    /// Set a limit in terms of records to scan to infer the schema
1854    /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
1855    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
1856        self.schema_infer_max_rec = Some(max_rec);
1857        self
1858    }
1859
1860    /// Set true to indicate that the first line is a header.
1861    /// - default to true
1862    pub fn with_has_header(mut self, has_header: bool) -> Self {
1863        self.has_header = Some(has_header);
1864        self
1865    }
1866
1867    /// Returns true if the first line is a header. If format options does not
1868    /// specify whether there is a header, returns `None` (indicating that the
1869    /// configuration should be consulted).
1870    pub fn has_header(&self) -> Option<bool> {
1871        self.has_header
1872    }
1873
1874    /// The character separating values within a row.
1875    /// - default to ','
1876    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
1877        self.delimiter = delimiter;
1878        self
1879    }
1880
1881    /// The quote character in a row.
1882    /// - default to '"'
1883    pub fn with_quote(mut self, quote: u8) -> Self {
1884        self.quote = quote;
1885        self
1886    }
1887
1888    /// The character that terminates a row.
1889    /// - default to None (CRLF)
1890    pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
1891        self.terminator = terminator;
1892        self
1893    }
1894
1895    /// The escape character in a row.
1896    /// - default is None
1897    pub fn with_escape(mut self, escape: Option<u8>) -> Self {
1898        self.escape = escape;
1899        self
1900    }
1901
1902    /// Set true to indicate that the CSV quotes should be doubled.
1903    /// - default to true
1904    pub fn with_double_quote(mut self, double_quote: bool) -> Self {
1905        self.double_quote = Some(double_quote);
1906        self
1907    }
1908
1909    /// Specifies whether newlines in (quoted) values are supported.
1910    ///
1911    /// Parsing newlines in quoted values may be affected by execution behaviour such as
1912    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
1913    /// parsed successfully, which may reduce performance.
1914    ///
1915    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
1916    pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
1917        self.newlines_in_values = Some(newlines_in_values);
1918        self
1919    }
1920
1921    /// Set a `CompressionTypeVariant` of CSV
1922    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
1923    pub fn with_file_compression_type(
1924        mut self,
1925        compression: CompressionTypeVariant,
1926    ) -> Self {
1927        self.compression = compression;
1928        self
1929    }
1930
1931    /// The delimiter character.
1932    pub fn delimiter(&self) -> u8 {
1933        self.delimiter
1934    }
1935
1936    /// The quote character.
1937    pub fn quote(&self) -> u8 {
1938        self.quote
1939    }
1940
1941    /// The terminator character.
1942    pub fn terminator(&self) -> Option<u8> {
1943        self.terminator
1944    }
1945
1946    /// The escape character.
1947    pub fn escape(&self) -> Option<u8> {
1948        self.escape
1949    }
1950}
1951
// Declares `JsonOptions` through `config_namespace!`. Only `///` comments
// are visible to the macro; the `//` notes on the fields below are not.
config_namespace! {
    /// Options controlling JSON format
    pub struct JsonOptions {
        // Compression variant of the JSON data; uncompressed unless set.
        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
        // NOTE(review): presumably the maximum number of records scanned for
        // schema inference, mirroring the CSV option of the same name — confirm.
        pub schema_infer_max_rec: Option<usize>, default = None
    }
}
1959
/// Marker trait for format option types that implement [`Display`].
///
/// It declares no methods of its own.
pub trait FormatOptionsExt: Display {}
1961
/// Options for one file format.
///
/// The CSV, JSON and (with the `parquet` feature) Parquet variants carry
/// their option structs; `AVRO` and `ARROW` carry no payload.
#[derive(Debug, Clone, PartialEq)]
// NOTE(review): presumably silenced because the payload-carrying variants are
// much larger than the unit variants — confirm boxing was ruled out on purpose.
#[allow(clippy::large_enum_variant)]
pub enum FormatOptions {
    /// CSV, with its options.
    CSV(CsvOptions),
    /// JSON, with its options.
    JSON(JsonOptions),
    /// Parquet, with its table-level options.
    #[cfg(feature = "parquet")]
    PARQUET(TableParquetOptions),
    /// Avro; carries no options.
    AVRO,
    /// Arrow; carries no options.
    ARROW,
}
1972
1973impl Display for FormatOptions {
1974    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1975        let out = match self {
1976            FormatOptions::CSV(_) => "csv",
1977            FormatOptions::JSON(_) => "json",
1978            #[cfg(feature = "parquet")]
1979            FormatOptions::PARQUET(_) => "parquet",
1980            FormatOptions::AVRO => "avro",
1981            FormatOptions::ARROW => "arrow",
1982        };
1983        write!(f, "{}", out)
1984    }
1985}
1986
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::collections::HashMap;

    use crate::config::{
        ConfigEntry, ConfigExtension, ConfigFileType, ExtensionOptions, Extensions,
        TableOptions,
    };

    /// Minimal extension used by these tests: it records every key/value pair
    /// it is asked to set under the `test` prefix.
    #[derive(Default, Debug, Clone)]
    pub struct TestExtensionConfig {
        /// Values received through `set`, keyed by the part of the key that
        /// follows the `test.` prefix.
        pub properties: HashMap<String, String>,
    }

    impl ExtensionOptions for TestExtensionConfig {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }

        fn cloned(&self) -> Box<dyn ExtensionOptions> {
            Box::new(self.clone())
        }

        fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
            let (namespace, property) = match key.split_once('.') {
                Some(parts) => parts,
                None => (key, ""),
            };
            assert_eq!(namespace, "test");
            self.properties
                .insert(property.to_string(), value.to_string());
            Ok(())
        }

        fn entries(&self) -> Vec<ConfigEntry> {
            let mut entries = Vec::with_capacity(self.properties.len());
            for (name, setting) in &self.properties {
                entries.push(ConfigEntry {
                    key: name.clone(),
                    value: Some(setting.clone()),
                    description: "",
                });
            }
            entries
        }
    }

    impl ConfigExtension for TestExtensionConfig {
        const PREFIX: &'static str = "test";
    }

    /// Default `TableOptions` with a default `TestExtensionConfig` registered.
    fn options_with_test_extension() -> TableOptions {
        let mut extensions = Extensions::new();
        extensions.insert(TestExtensionConfig::default());
        TableOptions::new().with_extensions(extensions)
    }

    /// Default `TableOptions` whose current format is `format`.
    fn options_for(format: ConfigFileType) -> TableOptions {
        let mut options = TableOptions::new();
        options.set_config_format(format);
        options
    }

    #[test]
    fn create_table_config() {
        let table_config = options_with_test_extension();
        let registered = table_config.extensions.get::<TestExtensionConfig>();
        assert!(registered.is_some())
    }

    #[test]
    fn alter_test_extension_config() {
        let mut table_config = options_with_test_extension();
        table_config.set_config_format(ConfigFileType::CSV);

        table_config.set("format.delimiter", ";").unwrap();
        assert_eq!(table_config.csv.delimiter, b';');

        table_config.set("test.bootstrap.servers", "asd").unwrap();
        let test_config = table_config
            .extensions
            .get::<TestExtensionConfig>()
            .unwrap();
        let servers = test_config.properties.get("bootstrap.servers").unwrap();
        assert_eq!(servers, "asd");
    }

    #[test]
    fn csv_u8_table_options() {
        let mut table_config = options_for(ConfigFileType::CSV);

        table_config.set("format.delimiter", ";").unwrap();
        assert_eq!(table_config.csv.delimiter, b';');

        // Both quote characters must be accepted as the escape byte.
        for (raw, expected) in [("\"", b'"'), ("'", b'\'')] {
            table_config.set("format.escape", raw).unwrap();
            assert_eq!(table_config.csv.escape, Some(expected));
        }
    }

    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options() {
        let mut table_config = options_for(ConfigFileType::PARQUET);
        table_config
            .set("format.bloom_filter_enabled::col1", "true")
            .unwrap();

        let col1 = &table_config.parquet.column_specific_options["col1"];
        assert_eq!(col1.bloom_filter_enabled, Some(true));
    }

    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options_config_entry() {
        let mut table_config = options_for(ConfigFileType::PARQUET);
        table_config
            .set("format.bloom_filter_enabled::col1", "true")
            .unwrap();

        let listed = table_config
            .entries()
            .into_iter()
            .any(|entry| entry.key == "format.bloom_filter_enabled::col1");
        assert!(listed)
    }

    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options_config_metadata_entry() {
        let mut table_config = options_for(ConfigFileType::PARQUET);

        let expected = [
            ("key1", ""),
            ("key2", "value2"),
            ("key3", "value with spaces "),
            ("key4", "value with special chars :: :"),
        ];
        for (name, value) in expected {
            table_config
                .set(&format!("format.metadata::{name}"), value)
                .unwrap();
        }

        let metadata = &table_config.parquet.key_value_metadata;
        assert_eq!(metadata.get("should not exist1"), None);
        for (name, value) in expected {
            assert_eq!(metadata.get(name), Some(&Some(value.to_string())));
        }

        // duplicate keys are overwritten
        for value in ["A", "B"] {
            table_config
                .set("format.metadata::key_dupe", value)
                .unwrap();
        }
        assert_eq!(
            table_config.parquet.key_value_metadata.get("key_dupe"),
            Some(&Some("B".to_string()))
        );
    }
}