datafusion_common/config.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Runtime configuration, via [`ConfigOptions`]

use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::str::FromStr;

use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};

/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree.
///
/// `transform` is used to normalize values before parsing.
///
/// For example,
///
/// ```ignore
/// config_namespace! {
///     /// Amazing config
///     pub struct MyConfig {
///         /// Field 1 doc
///         field1: String, transform = str::to_lowercase, default = "".to_string()
///
///         /// Field 2 doc
///         field2: usize, default = 232
///
///         /// Field 3 doc
///         field3: Option<usize>, default = None
///     }
/// }
/// ```
///
/// Will generate
///
/// ```ignore
/// /// Amazing config
/// #[derive(Debug, Clone)]
/// #[non_exhaustive]
/// pub struct MyConfig {
///     /// Field 1 doc
///     field1: String,
///     /// Field 2 doc
///     field2: usize,
///     /// Field 3 doc
///     field3: Option<usize>,
/// }
/// impl ConfigField for MyConfig {
///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
///         match key {
///             "field1" => {
///                 let value = str::to_lowercase(value);
///                 self.field1.set(rem, value.as_ref())
///             },
///             "field2" => self.field2.set(rem, value.as_ref()),
///             "field3" => self.field3.set(rem, value.as_ref()),
///             _ => _config_err!(
///                 "Config value \"{}\" not found on MyConfig",
///                 key
///             ),
///         }
///     }
///
///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
///         let key = format!("{}.field1", key_prefix);
///         let desc = "Field 1 doc";
///         self.field1.visit(v, key.as_str(), desc);
///         let key = format!("{}.field2", key_prefix);
///         let desc = "Field 2 doc";
///         self.field2.visit(v, key.as_str(), desc);
///         let key = format!("{}.field3", key_prefix);
///         let desc = "Field 3 doc";
///         self.field3.visit(v, key.as_str(), desc);
///     }
/// }
///
/// impl Default for MyConfig {
///     fn default() -> Self {
///         Self {
///             field1: "".to_string(),
///             field2: 232,
///             field3: None,
///         }
///     }
/// }
/// ```
///
/// NB: Misplaced commas may result in nonsensical errors
#[macro_export]
macro_rules! config_namespace {
    (
        $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
        $(#[allow($($struct_de:tt)*)])?
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])* // Field-level documentation attributes
                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
                $(#[allow($($field_de:tt)*)])?
                $field_vis:vis $field_name:ident : $field_type:ty,
                $(warn = $warn:expr,)?
                $(transform = $transform:expr,)?
                default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])* // Apply struct documentation
        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
        $(#[allow($($struct_de)*)])?
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])* // Apply field documentation
                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
                $(#[allow($($field_de)*)])?
                $field_vis $field_name: $field_type,
            )*
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // Safely apply deprecated attribute if present
                            // $(#[allow(deprecated)])?
                            {
                                $(let value = $transform(value);)? // Apply transformation if specified
                                $(log::warn!($warn);)? // Log warning if specified
                                #[allow(deprecated)]
                                self.$field_name.set(rem, value.as_ref())
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                #[allow(deprecated)]
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}

config_namespace! {
    /// Options related to catalog and directory scanning
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct CatalogOptions {
        /// Whether the default catalog and schema should be created automatically.
        pub create_default_catalog_and_schema: bool, default = true

        /// The default catalog name - this impacts what SQL queries use if not specified
        pub default_catalog: String, default = "datafusion".to_string()

        /// The default schema name - this impacts what SQL queries use if not specified
        pub default_schema: String, default = "public".to_string()

        /// Should DataFusion provide access to `information_schema`
        /// virtual tables for displaying schema information
        pub information_schema: bool, default = false

        /// Location scanned to load tables for `default` schema
        pub location: Option<String>, default = None

        /// Type of `TableProvider` to use when loading `default` schema
        pub format: Option<String>, default = None

        /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        pub has_header: bool, default = true

        /// Specifies whether newlines in (quoted) CSV values are supported.
        ///
        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        pub newlines_in_values: bool, default = false
    }
}

config_namespace! {
    /// Options related to SQL parser
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct SqlParserOptions {
        /// When set to true, SQL parser will parse float as decimal type
        pub parse_float_as_decimal: bool, default = false

        /// When set to true, SQL parser will normalize identifiers (convert
        /// them to lowercase when they are not quoted)
        pub enable_ident_normalization: bool, default = true

        /// When set to true, SQL parser will normalize options values (convert them to lowercase).
        /// Note that this option is ignored and will be removed in the future. All case-insensitive values
        /// are normalized automatically.
        pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false

        /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
        /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
        pub dialect: String, default = "generic".to_string()
        // no need to lowercase because `sqlparser::dialect_from_str` is case-insensitive

        /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
        /// ignore the length. If false, error if a `VARCHAR` with a length is
        /// specified. The Arrow type system does not have a notion of maximum
        /// string length and thus DataFusion can not enforce such limits.
        pub support_varchar_with_length: bool, default = true

        /// When set to true, the source locations relative to the original SQL
        /// query (i.e. [`Span`](sqlparser::tokenizer::Span)) will be collected
        /// and recorded in the logical plan nodes.
        pub collect_spans: bool, default = false

        /// Specifies the recursion depth limit when parsing complex SQL queries
        pub recursion_limit: usize, default = 50
    }
}

config_namespace! {
    /// Options related to query execution
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExecutionOptions {
        /// Default batch size while creating new batches; this is especially useful for
        /// buffer-in-memory batches since creating tiny batches would result in too much
        /// metadata memory consumption
        pub batch_size: usize, default = 8192

        /// When set to true, record batches will be examined between each operator and
        /// small batches will be coalesced into larger batches. This is helpful when there
        /// are highly selective filters or joins that could produce tiny output batches. The
        /// target batch size is determined by the configuration setting
        /// `datafusion.execution.batch_size`
        pub coalesce_batches: bool, default = true

        /// Should DataFusion collect statistics after listing files
        pub collect_statistics: bool, default = false

        /// Number of partitions for query execution. Increasing partitions can increase
        /// concurrency.
        ///
        /// Defaults to the number of CPU cores on the system
        pub target_partitions: usize, default = get_available_parallelism()

        /// The default time zone
        ///
        /// Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime
        /// according to this time zone, and then extract the hour
        pub time_zone: Option<String>, default = Some("+00:00".into())

        /// Parquet options
        pub parquet: ParquetOptions, default = Default::default()

        /// Fan-out during initial physical planning.
        ///
        /// This is mostly used to plan `UNION` children in parallel.
        ///
        /// Defaults to the number of CPU cores on the system
        pub planning_concurrency: usize, default = get_available_parallelism()

        /// When set to true, skips verifying that the schema produced by
        /// planning the input of `LogicalPlan::Aggregate` exactly matches the
        /// schema of the input plan.
        ///
        /// When set to false, if the schema does not match exactly
        /// (including nullability and metadata), a planning error will be raised.
        ///
        /// This is used to workaround bugs in the planner that are now caught by
        /// the new schema verification step.
        pub skip_physical_aggregate_schema_check: bool, default = false

        /// Specifies the reserved memory for each spillable sort operation to
        /// facilitate an in-memory merge.
        ///
        /// When a sort operation spills to disk, the in-memory data must be
        /// sorted and merged before being written to a file. This setting reserves
        /// a specific amount of memory for that in-memory sort/merge process.
        ///
        /// Note: This setting is irrelevant if the sort operation cannot spill
        /// (i.e., if there's no `DiskManager` configured).
        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024

        /// When sorting, the size in bytes below which data is concatenated
        /// and sorted in a single RecordBatch rather than sorted in
        /// batches and merged.
        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

        /// Number of files to read in parallel when inferring schema and statistics
        pub meta_fetch_concurrency: usize, default = 32

        /// Guarantees a minimum level of output files running in parallel.
        /// RecordBatches will be distributed in round robin fashion to each
        /// parallel writer. Each writer is closed and a new file opened once
        /// soft_max_rows_per_output_file is reached.
        pub minimum_parallel_output_files: usize, default = 4

        /// Target number of rows in output files when writing multiple.
        /// This is a soft max, so it can be exceeded slightly. There also
        /// will be one file smaller than the limit if the total
        /// number of rows written is not roughly divisible by the soft max
        pub soft_max_rows_per_output_file: usize, default = 50000000

        /// This is the maximum number of RecordBatches buffered
        /// for each output file being worked on. Higher values can potentially
        /// give faster write performance at the cost of higher peak
        /// memory consumption
        pub max_buffered_batches_per_output_file: usize, default = 2

        /// Should subdirectories be ignored when scanning directories for data
        /// files. Defaults to true (ignores subdirectories), consistent with
        /// Hive. Note that this setting does not affect reading partitioned
        /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
        pub listing_table_ignore_subdirectory: bool, default = true

        /// Should DataFusion support recursive CTEs
        pub enable_recursive_ctes: bool, default = true

        /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
        /// statistics into the same file groups.
        /// Currently experimental
        pub split_file_groups_by_statistics: bool, default = false

        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
        pub keep_partition_by_columns: bool, default = false

        /// Aggregation ratio (number of distinct groups / number of input rows)
        /// threshold for skipping partial aggregation. If the observed ratio is
        /// greater than this value, partial aggregation will skip aggregation
        /// for further input
        pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8

        /// Number of input rows the partial aggregation partition should process
        /// before checking the aggregation ratio and trying to switch to
        /// skipping-aggregation mode
        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

        /// Should DataFusion use row number estimates at the input to decide
        /// whether increasing parallelism is beneficial or not. By default,
        /// only exact row numbers (not estimates) are used for this decision.
        /// Setting this flag to `true` will likely produce better plans
        /// if the source of statistics is accurate.
        /// We plan to make this the default in the future.
        pub use_row_number_estimates_to_optimize_partitioning: bool, default = false

        /// Should DataFusion enforce batch size in joins or not. By default,
        /// DataFusion will not enforce batch size in joins. Enforcing batch size
        /// in joins can reduce memory usage when joining large
        /// tables with a highly-selective join filter, but is also slightly slower.
        pub enforce_batch_size_in_joins: bool, default = false
    }
}

config_namespace! {
    /// Options for reading and writing parquet files
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ParquetOptions {
        // The following options affect reading parquet files

        /// (reading) If true, reads the Parquet data page level metadata (the
        /// Page Index), if present, to reduce the I/O and number of
        /// rows decoded.
        pub enable_page_index: bool, default = true

        /// (reading) If true, the parquet reader attempts to skip entire row groups based
        /// on the predicate in the query and the metadata (min/max values) stored in
        /// the parquet file
        pub pruning: bool, default = true

        /// (reading) If true, the parquet reader skips the optional embedded metadata that may be in
        /// the file Schema. This setting can help avoid schema conflicts when querying
        /// multiple parquet files with schemas containing compatible types but different metadata
        pub skip_metadata: bool, default = true

        /// (reading) If specified, the parquet reader will try to fetch the last `size_hint`
        /// bytes of the parquet file optimistically. If not specified, two reads are required:
        /// One read to fetch the 8-byte parquet footer and
        /// another to fetch the metadata length encoded in the footer
        pub metadata_size_hint: Option<usize>, default = None

        /// (reading) If true, filter expressions are applied during the parquet decoding operation to
        /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
        pub pushdown_filters: bool, default = false

        /// (reading) If true, filter expressions evaluated during the parquet decoding operation
        /// will be reordered heuristically to minimize the cost of evaluation. If false,
        /// the filters are applied in the same order as written in the query
        pub reorder_filters: bool, default = false

        /// (reading) If true, parquet reader will read columns of `Utf8/LargeUtf8` as `Utf8View`,
        /// and `Binary/LargeBinary` as `BinaryView`.
        pub schema_force_view_types: bool, default = true

        /// (reading) If true, parquet reader will read columns of
        /// `Binary/LargeBinary` as `Utf8`, and `BinaryView` as `Utf8View`.
        ///
        /// Parquet files generated by some legacy writers do not correctly set
        /// the UTF8 flag for strings, causing string columns to be loaded as
        /// BLOB instead.
        pub binary_as_string: bool, default = false

        // The following options affect writing to parquet files
        // and map to parquet::file::properties::WriterProperties

        /// (writing) Sets best effort maximum size of data page in bytes
        pub data_pagesize_limit: usize, default = 1024 * 1024

        /// (writing) Sets write_batch_size in bytes
        pub write_batch_size: usize, default = 1024

        /// (writing) Sets parquet writer version
        /// valid values are "1.0" and "2.0"
        pub writer_version: String, default = "1.0".to_string()

        /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
        ///
        /// This is analogous to `ArrowWriterOptions::with_skip_arrow_metadata`.
        /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
        pub skip_arrow_metadata: bool, default = false

        /// (writing) Sets default parquet compression codec.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        ///
        /// Note that this default setting is not the same as
        /// the default parquet writer setting.
        pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())

        /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
        /// default parquet writer setting
        pub dictionary_enabled: Option<bool>, default = Some(true)

        /// (writing) Sets best effort maximum dictionary page size, in bytes
        pub dictionary_page_size_limit: usize, default = 1024 * 1024

        /// (writing) Sets if statistics are enabled for any column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())

        /// (writing) Sets max statistics size for any column. If NULL, uses
        /// default parquet writer setting
        /// max_statistics_size is deprecated, currently it is not being used
        // TODO: remove once deprecated
        #[deprecated(since = "45.0.0", note = "Setting does not do anything")]
        pub max_statistics_size: Option<usize>, default = Some(4096)

        /// (writing) Target maximum number of rows in each row group (defaults to 1M
        /// rows). Writing larger row groups requires more memory to write, but
        /// can get better compression and be faster to read.
        pub max_row_group_size: usize, default = 1024 * 1024

        /// (writing) Sets "created by" property
        pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

        /// (writing) Sets column index truncate length
        pub column_index_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets statistics truncate length. If NULL, uses
        /// default parquet writer setting
        pub statistics_truncate_length: Option<usize>, default = None

        /// (writing) Sets best effort maximum number of rows in data page
        pub data_page_row_count_limit: usize, default = 20_000

        /// (writing) Sets default encoding for any column.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub encoding: Option<String>, transform = str::to_lowercase, default = None

        /// (reading) Use any available bloom filters when reading parquet files
        pub bloom_filter_on_read: bool, default = true

        /// (writing) Write bloom filters for all columns when creating parquet files
        pub bloom_filter_on_write: bool, default = false

        /// (writing) Sets bloom filter false positive probability. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_fpp: Option<f64>, default = None

        /// (writing) Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_ndv: Option<u64>, default = None

        /// (writing) Controls whether DataFusion will attempt to speed up writing
        /// parquet files by serializing them in parallel. Each column
        /// in each row group in each output file is serialized in parallel
        /// leveraging a maximum possible core count of n_files * n_row_groups * n_columns.
        pub allow_single_file_parallelism: bool, default = true

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_parallel_row_group_writers: usize, default = 1

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_buffered_record_batches_per_stream: usize, default = 2
    }
}

config_namespace! {
    /// Options related to query optimization
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct OptimizerOptions {
        /// When set to true, the optimizer will push a limit operation into
        /// grouped aggregations which have no aggregate expressions, as a soft limit,
        /// emitting groups once the limit is reached, before all rows in the group are read.
        pub enable_distinct_aggregation_soft_limit: bool, default = true

        /// When set to true, the physical plan optimizer will try to add round robin
        /// repartitioning to increase parallelism to leverage more CPU cores
        pub enable_round_robin_repartition: bool, default = true

        /// When set to true, the optimizer will attempt to perform limit operations
        /// during aggregations, if possible
        pub enable_topk_aggregation: bool, default = true

        /// When set to true, the optimizer will insert filters before a join between
        /// a nullable and non-nullable column to filter out nulls on the nullable side. This
        /// filter can add additional overhead when the file format does not fully support
        /// predicate push down.
        pub filter_null_join_keys: bool, default = false

        /// Should DataFusion repartition data using the aggregate keys to execute aggregates
        /// in parallel using the provided `target_partitions` level
        pub repartition_aggregations: bool, default = true

        /// Minimum total files size in bytes to perform file scan repartitioning.
        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024

        /// Should DataFusion repartition data using the join keys to execute joins in parallel
        /// using the provided `target_partitions` level
        pub repartition_joins: bool, default = true

        /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
        /// its inputs do not have any ordering or filtering. If the flag is not enabled,
        /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
        /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
        /// RightAnti, and RightSemi - being produced only at the end of the execution.
        /// This is not typical in stream processing. Additionally, without proper design for
        /// long runner execution, all types of joins may encounter out-of-memory errors.
        pub allow_symmetric_joins_without_pruning: bool, default = true

        /// When set to `true`, file groups will be repartitioned to achieve maximum parallelism.
        /// Currently Parquet and CSV formats are supported.
        ///
        /// If set to `true`, all files will be repartitioned evenly (i.e., a single large file
        /// might be partitioned into smaller chunks) for parallel scanning.
        /// If set to `false`, different files will be read in parallel, but repartitioning won't
        /// happen within a single file.
        pub repartition_file_scans: bool, default = true

        /// Should DataFusion repartition data using the partition keys to execute window
        /// functions in parallel using the provided `target_partitions` level
        pub repartition_windows: bool, default = true

        /// Should DataFusion execute sorts in a per-partition fashion and merge
        /// afterwards instead of coalescing first and sorting globally.
        /// When this flag is enabled, plans in the form below
        ///
        /// ```text
        /// "SortExec: [a@0 ASC]",
        /// "  CoalescePartitionsExec",
        /// "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        ///
        /// would turn into the plan below, which performs better in multithreaded environments
        ///
        /// ```text
        /// "SortPreservingMergeExec: [a@0 ASC]",
        /// "  SortExec: [a@0 ASC]",
        /// "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        pub repartition_sorts: bool, default = true

        /// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
        /// (i.e. setting `preserve_order` to true on `RepartitionExec` and
        /// using `SortPreservingMergeExec`)
        ///
        /// When false, DataFusion will maximize plan parallelism using
        /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
        pub prefer_existing_sort: bool, default = false

        /// When set to true, the logical plan optimizer will produce warning
        /// messages if any optimization rules produce errors and then proceed to the next
        /// rule. When set to false, any rules that produce errors will cause the query to fail
        pub skip_failed_rules: bool, default = false

        /// Number of times that the optimizer will attempt to optimize the plan
        pub max_passes: usize, default = 3

        /// When set to true, the physical plan optimizer will run a top down
        /// process to reorder the join keys
        pub top_down_join_key_reordering: bool, default = true

        /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
        /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
        pub prefer_hash_join: bool, default = true

        /// The maximum estimated size in bytes for one input side of a HashJoin
        /// to be collected into a single partition
        pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

        /// The maximum estimated size in rows for one input side of a HashJoin
        /// to be collected into a single partition
        pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128

        /// The default filter selectivity used by Filter Statistics
        /// when an exact selectivity cannot be determined. Valid values are
        /// between 0 (no rows selected) and 100 (all rows selected).
        pub default_filter_selectivity: u8, default = 20

        /// When set to true, the optimizer will not attempt to convert Union to Interleave
        pub prefer_existing_union: bool, default = false

        /// When set to true, if the returned type is a view type
        /// then the output will be coerced to a non-view.
        /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
        pub expand_views_at_output: bool, default = false
    }
}

config_namespace! {
    /// Options controlling explain output
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExplainOptions {
        /// When set to true, the explain statement will only print logical plans
        pub logical_plan_only: bool, default = false

        /// When set to true, the explain statement will only print physical plans
        pub physical_plan_only: bool, default = false

        /// When set to true, the explain statement will print operator statistics
        /// for physical plans
        pub show_statistics: bool, default = false

        /// When set to true, the explain statement will print the partition sizes
        pub show_sizes: bool, default = true

        /// When set to true, the explain statement will print schema information
        pub show_schema: bool, default = false
    }
}

/// A key value pair, with a corresponding description
#[derive(Debug)]
pub struct ConfigEntry {
    /// A unique string to identify this config value
    pub key: String,

    /// The value if any
    pub value: Option<String>,

    /// A description of this configuration entry
    pub description: &'static str,
}

/// Configuration options struct, able to store both built-in configuration and custom options
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
    /// Catalog options
    pub catalog: CatalogOptions,
    /// Execution options
    pub execution: ExecutionOptions,
    /// Optimizer options
    pub optimizer: OptimizerOptions,
    /// SQL parser options
    pub sql_parser: SqlParserOptions,
    /// Explain options
    pub explain: ExplainOptions,
    /// Optional extensions registered using [`Extensions::insert`]
    pub extensions: Extensions,
}

impl ConfigField for ConfigOptions {
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        // Extensions are handled in the public `ConfigOptions::set`
        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
        match key {
            "catalog" => self.catalog.set(rem, value),
            "execution" => self.execution.set(rem, value),
            "optimizer" => self.optimizer.set(rem, value),
            "explain" => self.explain.set(rem, value),
            "sql_parser" => self.sql_parser.set(rem, value),
            _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
        }
    }

    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
        self.catalog.visit(v, "datafusion.catalog", "");
        self.execution.visit(v, "datafusion.execution", "");
        self.optimizer.visit(v, "datafusion.optimizer", "");
        self.explain.visit(v, "datafusion.explain", "");
        self.sql_parser.visit(v, "datafusion.sql_parser", "");
    }
}

impl ConfigOptions {
    /// Creates a new [`ConfigOptions`] with default values
    pub fn new() -> Self {
        Self::default()
    }

    /// Set extensions to provided value
    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
        self.extensions = extensions;
        self
    }

    /// Set a configuration option
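    ///
    /// # Example
    /// A minimal sketch of setting a built-in option; values are passed as
    /// strings and parsed into the target field's type:
    /// ```
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let mut config = ConfigOptions::new();
    /// config.set("datafusion.execution.batch_size", "1024").unwrap();
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```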
    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let Some((prefix, key)) = key.split_once('.') else {
            return _config_err!("could not find config namespace for key \"{key}\"");
        };

        if prefix == "datafusion" {
            return ConfigField::set(self, key, value);
        }

        let Some(e) = self.extensions.0.get_mut(prefix) else {
            return _config_err!("Could not find config namespace \"{prefix}\"");
        };
        e.0.set(key, value)
    }

    /// Create new ConfigOptions struct, taking values from
    /// environment variables where possible.
    ///
    /// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
    /// control `datafusion.execution.batch_size`.
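    ///
    /// A minimal sketch (not run as a doctest, since it depends on the
    /// process environment):
    /// ```no_run
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// // With DATAFUSION_EXECUTION_BATCH_SIZE=1024 exported in the
    /// // environment, the corresponding option is picked up here:
    /// let config = ConfigOptions::from_env().unwrap();
    /// ```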
    pub fn from_env() -> Result<Self> {
        struct Visitor(Vec<String>);

        impl Visit for Visitor {
            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
                self.0.push(key.to_string())
            }

            fn none(&mut self, key: &str, _: &'static str) {
                self.0.push(key.to_string())
            }
        }

        // Extract the names of all fields and then look up the corresponding
        // environment variables. This isn't hugely efficient but avoids
        // ambiguity between `a.b` and `a_b` which would both correspond
        // to an environment variable of `A_B`

        let mut keys = Visitor(vec![]);
        let mut ret = Self::default();
        ret.visit(&mut keys, "datafusion", "");

        for key in keys.0 {
            let env = key.to_uppercase().replace('.', "_");
            if let Some(var) = std::env::var_os(env) {
                ret.set(&key, var.to_string_lossy().as_ref())?;
            }
        }

        Ok(ret)
    }

    /// Create new ConfigOptions struct, taking values from a string hash map.
    ///
    /// Only the built-in configurations will be extracted from the hash map
    /// and other key value pairs will be ignored.
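    ///
    /// # Example
    /// A minimal sketch:
    /// ```
    /// use std::collections::HashMap;
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let settings = HashMap::from([(
    ///     "datafusion.execution.batch_size".to_string(),
    ///     "1024".to_string(),
    /// )]);
    /// let config = ConfigOptions::from_string_hash_map(&settings).unwrap();
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```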
    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
        struct Visitor(Vec<String>);

        impl Visit for Visitor {
            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
                self.0.push(key.to_string())
            }

            fn none(&mut self, key: &str, _: &'static str) {
                self.0.push(key.to_string())
            }
        }

        let mut keys = Visitor(vec![]);
        let mut ret = Self::default();
        ret.visit(&mut keys, "datafusion", "");

        for key in keys.0 {
            if let Some(var) = settings.get(&key) {
                ret.set(&key, var)?;
            }
        }

        Ok(ret)
    }

    /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
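    ///
    /// # Example
    /// A minimal sketch:
    /// ```
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let config = ConfigOptions::new();
    /// // every built-in option is reported as a `ConfigEntry`
    /// let entries = config.entries();
    /// assert!(entries.iter().any(|e| e.key == "datafusion.execution.batch_size"));
    /// ```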
    pub fn entries(&self) -> Vec<ConfigEntry> {
        struct Visitor(Vec<ConfigEntry>);

        impl Visit for Visitor {
            fn some<V: Display>(
                &mut self,
                key: &str,
                value: V,
                description: &'static str,
            ) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: Some(value.to_string()),
                    description,
                })
            }

            fn none(&mut self, key: &str, description: &'static str) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: None,
                    description,
                })
            }
        }

        let mut v = Visitor(vec![]);
        self.visit(&mut v, "datafusion", "");

        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
        v.0
    }

    /// Generate documentation that can be included in the user guide
    pub fn generate_config_markdown() -> String {
        use std::fmt::Write as _;

        let mut s = Self::default();

        // Normalize for display
        s.execution.target_partitions = 0;
        s.execution.planning_concurrency = 0;

        let mut docs = "| key | default | description |\n".to_string();
        docs += "|-----|---------|-------------|\n";
        let mut entries = s.entries();
        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));

        // Iterate the sorted entries (calling `s.entries()` again here
        // would discard the sort above)
        for entry in entries {
            let _ = writeln!(
                &mut docs,
                "| {} | {} | {} |",
                entry.key,
                entry.value.as_deref().unwrap_or("NULL"),
                entry.description
            );
        }
        docs
    }
}

/// [`ConfigExtension`] provides a mechanism to store third-party configuration
/// within DataFusion [`ConfigOptions`]
///
/// This mechanism can be used to pass configuration to user defined functions
/// or optimizer passes
///
/// # Example
/// ```
/// use datafusion_common::{
///     config::ConfigExtension, extensions_options,
///     config::ConfigOptions,
/// };
/// // Define a new configuration struct using the `extensions_options` macro
/// extensions_options! {
///     /// My own config options.
///     pub struct MyConfig {
///         /// Should "foo" be replaced by "bar"?
///         pub foo_to_bar: bool, default = true
///
///         /// How many "baz" should be created?
///         pub baz_count: usize, default = 1337
///     }
/// }
///
/// impl ConfigExtension for MyConfig {
///     const PREFIX: &'static str = "my_config";
/// }
///
/// // set up config struct and register extension
/// let mut config = ConfigOptions::default();
/// config.extensions.insert(MyConfig::default());
///
/// // overwrite config default
/// config.set("my_config.baz_count", "42").unwrap();
///
/// // check config state
/// let my_config = config.extensions.get::<MyConfig>().unwrap();
/// assert!(my_config.foo_to_bar);
/// assert_eq!(my_config.baz_count, 42);
/// ```
///
/// # Note:
/// Unfortunately associated constants are not currently object-safe, and so this
/// extends the object-safe [`ExtensionOptions`]
pub trait ConfigExtension: ExtensionOptions {
    /// Configuration namespace prefix to use
    ///
    /// All values under this will be prefixed with `$PREFIX + "."`
    const PREFIX: &'static str;
}

/// An object-safe API for storing arbitrary configuration.
///
/// See [`ConfigExtension`] for user defined configuration
pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
    /// Return `self` as [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any(&self) -> &dyn Any;

    /// Return `self` as mutable [`Any`]
    ///
    /// This is needed until trait upcasting is stabilized
    fn as_any_mut(&mut self) -> &mut dyn Any;

    /// Return a deep clone of this [`ExtensionOptions`]
    ///
    /// It is important this does not share mutable state to avoid consistency issues
    /// with configuration changing whilst queries are executing
    fn cloned(&self) -> Box<dyn ExtensionOptions>;

    /// Set the given `key`, `value` pair
    fn set(&mut self, key: &str, value: &str) -> Result<()>;

    /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
    fn entries(&self) -> Vec<ConfigEntry>;
}

/// A type-safe container for [`ConfigExtension`]
#[derive(Debug, Default, Clone)]
pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);

impl Extensions {
    /// Create a new, empty [`Extensions`]
    pub fn new() -> Self {
        Self(BTreeMap::new())
    }

    /// Registers a [`ConfigExtension`] with this [`Extensions`] collection
    pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
        assert_ne!(T::PREFIX, "datafusion");
        let e = ExtensionBox(Box::new(extension));
        self.0.insert(T::PREFIX, e);
    }

    /// Retrieves the extension of the given type if any
    pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
        self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
    }

    /// Retrieves a mutable reference to the extension of the given type if any
    pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
        let e = self.0.get_mut(T::PREFIX)?;
        e.0.as_any_mut().downcast_mut()
    }
}

#[derive(Debug)]
struct ExtensionBox(Box<dyn ExtensionOptions>);

impl Clone for ExtensionBox {
    fn clone(&self) -> Self {
        Self(self.0.cloned())
    }
}

/// A trait implemented by `config_namespace`-generated structs and by field
/// types, providing the ability to walk and mutate the configuration tree
pub trait ConfigField {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);

    fn set(&mut self, key: &str, value: &str) -> Result<()>;
}

impl<F: ConfigField + Default> ConfigField for Option<F> {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        match self {
            Some(s) => s.visit(v, key, description),
            None => v.none(key, description),
        }
    }

    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        self.get_or_insert_with(Default::default).set(key, value)
    }
}

fn default_transform<T>(input: &str) -> Result<T>
where
    T: FromStr,
    <T as FromStr>::Err: Sync + Send + Error + 'static,
{
    input.parse().map_err(|e| {
        DataFusionError::Context(
            format!(
                "Error parsing '{}' as {}",
                input,
                std::any::type_name::<T>()
            ),
            Box::new(DataFusionError::External(Box::new(e))),
        )
    })
}

#[macro_export]
macro_rules! config_field {
    ($t:ty) => {
        config_field!($t, value => default_transform(value)?);
    };

    ($t:ty, $arg:ident => $transform:expr) => {
        impl ConfigField for $t {
            fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
                v.some(key, self, description)
            }

            fn set(&mut self, _: &str, $arg: &str) -> Result<()> {
                *self = $transform;
                Ok(())
            }
        }
    };
}

config_field!(String);
config_field!(bool, value => default_transform(value.to_lowercase().as_str())?);
config_field!(usize);
config_field!(f64);
config_field!(u64);

impl ConfigField for u8 {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        if value.is_empty() {
            return Err(DataFusionError::Configuration(format!(
                "Input string for {} key is empty",
                key
            )));
        }
        // Check if the string is a valid number
        if let Ok(num) = value.parse::<u8>() {
            // TODO: Let's decide how we treat the numerical strings.
            *self = num;
        } else {
            let bytes = value.as_bytes();
            // Check if the value is a single ASCII character (single byte)
            if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
                return Err(DataFusionError::Configuration(format!(
                    "Error parsing {} as u8. Non-ASCII string provided",
                    value
                )));
            }
            *self = bytes[0];
        }
        Ok(())
    }
}
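
// Illustrative note (not part of the API): `u8::set` accepts either a numeric
// string or a single ASCII character, so "59" and ";" both store the byte
// value 59. This is what allows options such as a CSV delimiter to be
// configured as ";".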

impl ConfigField for CompressionTypeVariant {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = CompressionTypeVariant::from_str(value)?;
        Ok(())
    }
}

/// An implementation trait used to recursively walk configuration
pub trait Visit {
    fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);

    fn none(&mut self, key: &str, description: &'static str);
}

/// Convenience macro to create [`ExtensionOptions`].
///
/// The created structure implements the following traits:
///
/// - [`Clone`]
/// - [`Debug`]
/// - [`Default`]
/// - [`ExtensionOptions`]
///
/// # Usage
/// The syntax is:
///
/// ```text
/// extensions_options! {
///     /// Struct docs (optional).
///     [<vis>] struct <StructName> {
///         /// Field docs (optional)
///         [<vis>] <field_name>: <field_type>, default = <default_value>
///
///         ... more fields
///     }
/// }
/// ```
///
/// The placeholders are:
/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
/// - `<StructName>`: Struct name like `MyStruct`.
/// - `<field_name>`: Field name like `my_field`.
/// - `<field_type>`: Field type like `u8`.
/// - `<default_value>`: Default value matching the field type like `42`.
///
/// # Example
/// See also a full example on the [`ConfigExtension`] documentation
///
/// ```
/// use datafusion_common::extensions_options;
///
/// extensions_options! {
///     /// My own config options.
///     pub struct MyConfig {
///         /// Should "foo" be replaced by "bar"?
///         pub foo_to_bar: bool, default = true
///
///         /// How many "baz" should be created?
///         pub baz_count: usize, default = 1337
///     }
/// }
/// ```
///
/// [`Debug`]: std::fmt::Debug
/// [`ExtensionOptions`]: crate::config::ExtensionOptions
#[macro_export]
macro_rules! extensions_options {
    (
        $(#[doc = $struct_d:tt])*
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])*
                $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])*
        #[derive(Debug, Clone)]
        #[non_exhaustive]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])*
                $field_vis $field_name : $field_type,
            )*
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }

        impl $crate::config::ExtensionOptions for $struct_name {
            fn as_any(&self) -> &dyn ::std::any::Any {
                self
            }

            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
                self
            }

            fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
                Box::new(self.clone())
            }

            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                $crate::config::ConfigField::set(self, key, value)
            }

            fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
                struct Visitor(Vec<$crate::config::ConfigEntry>);

                impl $crate::config::Visit for Visitor {
                    fn some<V: std::fmt::Display>(
                        &mut self,
                        key: &str,
                        value: V,
                        description: &'static str,
                    ) {
                        self.0.push($crate::config::ConfigEntry {
                            key: key.to_string(),
                            value: Some(value.to_string()),
                            description,
                        })
                    }

                    fn none(&mut self, key: &str, description: &'static str) {
                        self.0.push($crate::config::ConfigEntry {
                            key: key.to_string(),
                            value: None,
                            description,
                        })
                    }
                }

                let mut v = Visitor(vec![]);
                // The prefix is not used for extensions.
                // The description is generated in ConfigField::visit.
                // We can just pass empty strings here.
                $crate::config::ConfigField::visit(self, &mut v, "", "");
                v.0
            }
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // Safely apply deprecated attribute if present
                            // $(#[allow(deprecated)])?
                            {
                                #[allow(deprecated)]
                                self.$field_name.set(rem, value.as_ref())
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
                $(
                    let key = stringify!($field_name).to_string();
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }
    }
}

/// These file types have special built-in behavior for configuration.
/// Use TableOptions::Extensions for configuring other file types.
#[derive(Debug, Clone)]
pub enum ConfigFileType {
    CSV,
    #[cfg(feature = "parquet")]
    PARQUET,
    JSON,
}

/// Represents the configuration options available for handling different table formats within a data processing application.
/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
#[derive(Debug, Clone, Default)]
pub struct TableOptions {
    /// Configuration options for CSV file handling. This includes settings like the delimiter,
    /// quote character, and whether the first row is considered as headers.
    pub csv: CsvOptions,

    /// Configuration options for Parquet file handling. This includes settings for compression,
    /// encoding, and other Parquet-specific file characteristics.
    pub parquet: TableParquetOptions,

    /// Configuration options for JSON file handling.
    pub json: JsonOptions,

    /// The current file format that the table operations should assume. This option allows
    /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
    pub current_format: Option<ConfigFileType>,

    /// Optional extensions that can be used to extend or customize the behavior of the table
    /// options. Extensions can be registered using `Extensions::insert` and might include
    /// custom file handling logic, additional configuration parameters, or other enhancements.
    pub extensions: Extensions,
}

impl ConfigField for TableOptions {
    /// Visits configuration settings for the current file format, or all formats if none is selected.
    ///
    /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
    /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
    /// it visits all available format settings.
    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
        if let Some(file_type) = &self.current_format {
            match file_type {
                #[cfg(feature = "parquet")]
                ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
                ConfigFileType::CSV => self.csv.visit(v, "format", ""),
                ConfigFileType::JSON => self.json.visit(v, "format", ""),
            }
        } else {
            self.csv.visit(v, "csv", "");
            self.parquet.visit(v, "parquet", "");
            self.json.visit(v, "json", "");
        }
    }

    /// Sets a configuration value for a specific key within `TableOptions`.
    ///
    /// This method delegates setting configuration values to the specific file format configurations,
    /// based on the current format selected. If no format is selected, it returns an error.
    ///
    /// # Parameters
    ///
    /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format
    ///   (e.g., `format.delimiter` for CSV format).
    /// * `value`: The value to set for the specified configuration key.
    ///
    /// # Returns
    ///
    /// A result indicating success or an error if the key is not recognized, if a format is not specified,
    /// or if setting the configuration value fails for the specific format.
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        // Extensions are handled in the public `ConfigOptions::set`
        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
        match key {
            "format" => {
                let Some(format) = &self.current_format else {
                    return _config_err!("Specify a format for TableOptions");
                };
                match format {
                    #[cfg(feature = "parquet")]
                    ConfigFileType::PARQUET => self.parquet.set(rem, value),
                    ConfigFileType::CSV => self.csv.set(rem, value),
                    ConfigFileType::JSON => self.json.set(rem, value),
                }
            }
            _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
        }
    }
}

impl TableOptions {
    /// Constructs a new instance of `TableOptions` with default settings.
    ///
    /// # Returns
    ///
    /// A new `TableOptions` instance with default configuration values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a new `TableOptions` instance initialized with settings from a given session config.
    ///
    /// # Parameters
    ///
    /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
    ///
    /// # Returns
    ///
    /// A new `TableOptions` instance with settings applied from the session config.
    pub fn default_from_session_config(config: &ConfigOptions) -> Self {
        let initial = TableOptions::default();
        initial.combine_with_session_config(config)
    }

    /// Updates the current `TableOptions` with settings from a given session config.
    ///
    /// # Parameters
    ///
    /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
    ///
    /// # Returns
    ///
    /// A new `TableOptions` instance with updated settings from the session config.
    #[must_use = "this method returns a new instance"]
    pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
        let mut clone = self.clone();
        clone.parquet.global = config.execution.parquet.clone();
        clone
    }

    /// Sets the file format for the table.
    ///
    /// # Parameters
    ///
    /// * `format`: The file format to use (e.g., CSV, Parquet).
    pub fn set_config_format(&mut self, format: ConfigFileType) {
        self.current_format = Some(format);
    }

    /// Sets the extensions for this `TableOptions` instance.
    ///
    /// # Parameters
    ///
    /// * `extensions`: The `Extensions` instance to set.
    ///
    /// # Returns
    ///
    /// A new `TableOptions` instance with the specified extensions applied.
    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
        self.extensions = extensions;
        self
    }

    /// Sets a specific configuration option.
    ///
    /// # Parameters
    ///
    /// * `key`: The configuration key (e.g., "format.delimiter").
    /// * `value`: The value to set for the specified key.
    ///
    /// # Returns
    ///
    /// A result indicating success or failure in setting the configuration option.
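    ///
    /// # Example
    /// A minimal sketch; a format must be selected before `format.*` keys can be set:
    /// ```
    /// use datafusion_common::config::{ConfigFileType, TableOptions};
    ///
    /// let mut table_options = TableOptions::new();
    /// table_options.set_config_format(ConfigFileType::CSV);
    /// table_options.set("format.delimiter", ";").unwrap();
    /// assert_eq!(table_options.csv.delimiter, b';');
    /// ```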
1487 pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1488 let Some((prefix, _)) = key.split_once('.') else {
1489 return _config_err!("could not find config namespace for key \"{key}\"");
1490 };
1491
1492 if prefix == "format" {
1493 return ConfigField::set(self, key, value);
1494 }
1495
1496 if prefix == "execution" {
1497 return Ok(());
1498 }
1499
1500 let Some(e) = self.extensions.0.get_mut(prefix) else {
1501 return _config_err!("Could not find config namespace \"{prefix}\"");
1502 };
1503 e.0.set(key, value)
1504 }
1505
1506 /// Initializes a new `TableOptions` from a hash map of string settings.
1507 ///
1508 /// # Parameters
1509 ///
1510 /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1511 ///
1512 /// # Returns
1513 ///
1514 /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
1515 pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1516 let mut ret = Self::default();
1517 for (k, v) in settings {
1518 ret.set(k, v)?;
1519 }
1520
1521 Ok(ret)
1522 }
1523
1524 /// Modifies the current `TableOptions` instance with settings from a hash map.
1525 ///
1526 /// # Parameters
1527 ///
1528 /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1529 ///
1530 /// # Returns
1531 ///
1532 /// A result indicating success or failure in applying the settings.
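    ///
    /// A minimal sketch (a format must already be selected before `format.`
    /// keys can be applied):
    ///
    /// ```ignore
    /// let mut opts = TableOptions::new();
    /// opts.set_config_format(ConfigFileType::CSV);
    /// let mut settings = HashMap::new();
    /// settings.insert("format.delimiter".to_owned(), "|".to_owned());
    /// opts.alter_with_string_hash_map(&settings)?;
    /// assert_eq!(opts.csv.delimiter, b'|');
    /// ```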
1533 pub fn alter_with_string_hash_map(
1534 &mut self,
1535 settings: &HashMap<String, String>,
1536 ) -> Result<()> {
1537 for (k, v) in settings {
1538 self.set(k, v)?;
1539 }
1540 Ok(())
1541 }
1542
1543 /// Retrieves all configuration entries from this `TableOptions`.
1544 ///
1545 /// # Returns
1546 ///
1547 /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
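    ///
    /// A minimal sketch that lists every option for the current format:
    ///
    /// ```ignore
    /// let mut opts = TableOptions::new();
    /// opts.set_config_format(ConfigFileType::CSV);
    /// for entry in opts.entries() {
    ///     println!("{}: {:?}", entry.key, entry.value);
    /// }
    /// ```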
1548 pub fn entries(&self) -> Vec<ConfigEntry> {
1549 struct Visitor(Vec<ConfigEntry>);
1550
1551 impl Visit for Visitor {
1552 fn some<V: Display>(
1553 &mut self,
1554 key: &str,
1555 value: V,
1556 description: &'static str,
1557 ) {
1558 self.0.push(ConfigEntry {
1559 key: key.to_string(),
1560 value: Some(value.to_string()),
1561 description,
1562 })
1563 }
1564
1565 fn none(&mut self, key: &str, description: &'static str) {
1566 self.0.push(ConfigEntry {
1567 key: key.to_string(),
1568 value: None,
1569 description,
1570 })
1571 }
1572 }
1573
1574 let mut v = Visitor(vec![]);
1575 self.visit(&mut v, "format", "");
1576
1577 v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1578 v.0
1579 }
1580}
1581
1582/// Options that control how Parquet files are read, including global options
1583/// that apply to all columns and optional column-specific overrides
1584///
1585/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
/// Properties not included in [`TableParquetOptions`] may not be configurable
/// via the external API (e.g. sorting_columns).
1588#[derive(Clone, Default, Debug, PartialEq)]
1589pub struct TableParquetOptions {
    /// Global Parquet options that propagate to all columns.
1591 pub global: ParquetOptions,
    /// Column-specific options, keyed by column name. Options are set with
    /// keys of the form `<option>::<column>` (e.g. `format.bloom_filter_enabled::col1`).
1593 pub column_specific_options: HashMap<String, ParquetColumnOptions>,
1594 /// Additional file-level metadata to include. Inserted into the key_value_metadata
1595 /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
1596 ///
1597 /// Multiple entries are permitted
1598 /// ```sql
1599 /// OPTIONS (
1600 /// 'format.metadata::key1' '',
1601 /// 'format.metadata::key2' 'value',
1602 /// 'format.metadata::key3' 'value has spaces',
1603 /// 'format.metadata::key4' 'value has special chars :: :',
1604 /// 'format.metadata::key_dupe' 'original will be overwritten',
1605 /// 'format.metadata::key_dupe' 'final'
1606 /// )
1607 /// ```
1608 pub key_value_metadata: HashMap<String, Option<String>>,
1609}
1610
1611impl TableParquetOptions {
1612 /// Return new default TableParquetOptions
1613 pub fn new() -> Self {
1614 Self::default()
1615 }
1616
    /// Set whether encoding the Arrow schema into the Parquet key-value
    /// metadata should be skipped when writing.
    ///
    /// The default is to encode the Arrow schema in the file's kv_metadata.
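    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// let opts = TableParquetOptions::new().with_skip_arrow_metadata(true);
    /// assert!(opts.global.skip_arrow_metadata);
    /// ```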
1621 pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
1622 Self {
1623 global: ParquetOptions {
1624 skip_arrow_metadata: skip,
1625 ..self.global
1626 },
1627 ..self
1628 }
1629 }
1630}
1631
1632impl ConfigField for TableParquetOptions {
1633 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
1634 self.global.visit(v, key_prefix, description);
1635 self.column_specific_options
1636 .visit(v, key_prefix, description)
1637 }
1638
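    /// Sets an option, dispatching on the shape of the key: `metadata::<key>`
    /// entries update `key_value_metadata`, other keys containing `::` update
    /// `column_specific_options`, and everything else updates the global
    /// options. A minimal sketch (the keys and values are illustrative):
    ///
    /// ```ignore
    /// let mut opts = TableParquetOptions::new();
    /// opts.set("metadata::owner", "alice")?;           // file-level metadata
    /// opts.set("bloom_filter_enabled::col1", "true")?; // column-specific
    /// opts.set("dictionary_enabled", "false")?;        // global option
    /// ```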
1639 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1640 // Determine if the key is a global, metadata, or column-specific setting
1641 if key.starts_with("metadata::") {
1642 let k = match key.split("::").collect::<Vec<_>>()[..] {
1643 [_meta] | [_meta, ""] => {
1644 return _config_err!(
1645 "Invalid metadata key provided, missing key in metadata::<key>"
1646 )
1647 }
1648 [_meta, k] => k.into(),
1649 _ => {
1650 return _config_err!(
1651 "Invalid metadata key provided, found too many '::' in \"{key}\""
1652 )
1653 }
1654 };
1655 self.key_value_metadata.insert(k, Some(value.into()));
1656 Ok(())
1657 } else if key.contains("::") {
1658 self.column_specific_options.set(key, value)
1659 } else {
1660 self.global.set(key, value)
1661 }
1662 }
1663}
1664
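/// Like [`config_namespace`], but additionally implements [`ConfigField`] for
/// `HashMap<String, StructName>`, so per-entry overrides can be addressed with
/// `<field>::<map key>` syntax.
///
/// A minimal sketch of the generated map behaviour:
///
/// ```ignore
/// let mut map: HashMap<String, ParquetColumnOptions> = HashMap::new();
/// // Creates the "col1" entry on demand, then sets its `bloom_filter_enabled`
/// map.set("bloom_filter_enabled::col1", "true")?;
/// ```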
1665macro_rules! config_namespace_with_hashmap {
1666 (
1667 $(#[doc = $struct_d:tt])*
1668 $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
1669 $vis:vis struct $struct_name:ident {
1670 $(
1671 $(#[doc = $d:tt])*
1672 $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
1673 $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
1674 )*$(,)*
1675 }
1676 ) => {
1677
1678 $(#[doc = $struct_d])*
1679 $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
1680 #[derive(Debug, Clone, PartialEq)]
1681 $vis struct $struct_name{
1682 $(
1683 $(#[doc = $d])*
1684 $(#[deprecated($($field_depr)*)])? // Apply field deprecation
1685 $field_vis $field_name : $field_type,
1686 )*
1687 }
1688
1689 impl ConfigField for $struct_name {
1690 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1691 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1692 match key {
1693 $(
1694 stringify!($field_name) => {
1695 // Handle deprecated fields
1696 #[allow(deprecated)] // Allow deprecated fields
1697 $(let value = $transform(value);)?
1698 self.$field_name.set(rem, value.as_ref())
1699 },
1700 )*
1701 _ => _config_err!(
1702 "Config value \"{}\" not found on {}", key, stringify!($struct_name)
1703 )
1704 }
1705 }
1706
1707 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
1708 $(
1709 let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
1710 let desc = concat!($($d),*).trim();
1711 // Handle deprecated fields
1712 #[allow(deprecated)]
1713 self.$field_name.visit(v, key.as_str(), desc);
1714 )*
1715 }
1716 }
1717
1718 impl Default for $struct_name {
1719 fn default() -> Self {
1720 #[allow(deprecated)]
1721 Self {
1722 $($field_name: $default),*
1723 }
1724 }
1725 }
1726
1727 impl ConfigField for HashMap<String,$struct_name> {
1728 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1729 let parts: Vec<&str> = key.splitn(2, "::").collect();
1730 match parts.as_slice() {
1731 [inner_key, hashmap_key] => {
1732 // Get or create the struct for the specified key
1733 let inner_value = self
1734 .entry((*hashmap_key).to_owned())
1735 .or_insert_with($struct_name::default);
1736
1737 inner_value.set(inner_key, value)
1738 }
1739 _ => _config_err!("Unrecognized key '{key}'."),
1740 }
1741 }
1742
1743 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
1744 for (column_name, col_options) in self {
1745 $(
1746 let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
1747 let desc = concat!($($d),*).trim();
1748 #[allow(deprecated)]
1749 col_options.$field_name.visit(v, key.as_str(), desc);
1750 )*
1751 }
1752 }
1753 }
1754 }
1755}
1756
1757config_namespace_with_hashmap! {
1758 /// Options controlling parquet format for individual columns.
1759 ///
1760 /// See [`ParquetOptions`] for more details
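    ///
    /// Options are set with keys of the form `<option>::<column>`, e.g.
    /// `format.bloom_filter_enabled::col1` when used via SQL `OPTIONS`.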
1761 pub struct ParquetColumnOptions {
1762 /// Sets if bloom filter is enabled for the column path.
1763 pub bloom_filter_enabled: Option<bool>, default = None
1764
1765 /// Sets encoding for the column path.
1766 /// Valid values are: plain, plain_dictionary, rle,
1767 /// bit_packed, delta_binary_packed, delta_length_byte_array,
1768 /// delta_byte_array, rle_dictionary, and byte_stream_split.
1769 /// These values are not case-sensitive. If NULL, uses
1770 /// default parquet options
1771 pub encoding: Option<String>, default = None
1772
1773 /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
1774 /// default parquet options
1775 pub dictionary_enabled: Option<bool>, default = None
1776
1777 /// Sets default parquet compression codec for the column path.
1778 /// Valid values are: uncompressed, snappy, gzip(level),
1779 /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
1780 /// These values are not case-sensitive. If NULL, uses
1781 /// default parquet options
1782 pub compression: Option<String>, transform = str::to_lowercase, default = None
1783
1784 /// Sets if statistics are enabled for the column
1785 /// Valid values are: "none", "chunk", and "page"
1786 /// These values are not case sensitive. If NULL, uses
1787 /// default parquet options
1788 pub statistics_enabled: Option<String>, default = None
1789
1790 /// Sets bloom filter false positive probability for the column path. If NULL, uses
1791 /// default parquet options
1792 pub bloom_filter_fpp: Option<f64>, default = None
1793
1794 /// Sets bloom filter number of distinct values. If NULL, uses
1795 /// default parquet options
1796 pub bloom_filter_ndv: Option<u64>, default = None
1797
        /// Sets max statistics size for the column path. If NULL, uses
        /// default parquet options.
        /// `max_statistics_size` is deprecated and currently has no effect.
1801 // TODO: remove once deprecated
1802 #[deprecated(since = "45.0.0", note = "Setting does not do anything")]
1803 pub max_statistics_size: Option<usize>, default = None
1804 }
1805}
1806
1807config_namespace! {
1808 /// Options controlling CSV format
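    ///
    /// Options can also be set with the builder-style `with_*` methods on
    /// [`CsvOptions`]; a minimal sketch:
    ///
    /// ```ignore
    /// let options = CsvOptions::default()
    ///     .with_delimiter(b'|')
    ///     .with_has_header(true)
    ///     .with_terminator(Some(b'\n'));
    /// ```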
1809 pub struct CsvOptions {
        /// Specifies whether there is a CSV header (i.e. the first line
        /// consists of column names). The value `None` indicates that
        /// the configuration should be consulted.
1813 pub has_header: Option<bool>, default = None
1814 pub delimiter: u8, default = b','
1815 pub quote: u8, default = b'"'
1816 pub terminator: Option<u8>, default = None
1817 pub escape: Option<u8>, default = None
1818 pub double_quote: Option<bool>, default = None
1819 /// Specifies whether newlines in (quoted) values are supported.
1820 ///
1821 /// Parsing newlines in quoted values may be affected by execution behaviour such as
1822 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
1823 /// parsed successfully, which may reduce performance.
1824 ///
1825 /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
1826 pub newlines_in_values: Option<bool>, default = None
1827 pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
1828 pub schema_infer_max_rec: Option<usize>, default = None
1829 pub date_format: Option<String>, default = None
1830 pub datetime_format: Option<String>, default = None
1831 pub timestamp_format: Option<String>, default = None
1832 pub timestamp_tz_format: Option<String>, default = None
1833 pub time_format: Option<String>, default = None
        /// The output format for Nulls in the CSV writer.
        pub null_value: Option<String>, default = None
        /// The input regex for Nulls when loading CSVs.
        pub null_regex: Option<String>, default = None
1838 pub comment: Option<u8>, default = None
1839 }
1840}
1841
1842impl CsvOptions {
    /// Set the compression type for the CSV file
    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
1845 pub fn with_compression(
1846 mut self,
1847 compression_type_variant: CompressionTypeVariant,
1848 ) -> Self {
1849 self.compression = compression_type_variant;
1850 self
1851 }
1852
    /// Set a limit in terms of records to scan to infer the schema
    /// - defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
1855 pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
1856 self.schema_infer_max_rec = Some(max_rec);
1857 self
1858 }
1859
    /// Set to `true` to indicate that the first line is a header.
    /// - defaults to `true`
1862 pub fn with_has_header(mut self, has_header: bool) -> Self {
1863 self.has_header = Some(has_header);
1864 self
1865 }
1866
    /// Returns whether the first line is a header. If the format options do
    /// not specify whether there is a header, returns `None` (indicating that
    /// the configuration should be consulted).
1870 pub fn has_header(&self) -> Option<bool> {
1871 self.has_header
1872 }
1873
    /// The character separating values within a row.
    /// - defaults to `','`
1876 pub fn with_delimiter(mut self, delimiter: u8) -> Self {
1877 self.delimiter = delimiter;
1878 self
1879 }
1880
    /// The quote character in a row.
    /// - defaults to `'"'`
1883 pub fn with_quote(mut self, quote: u8) -> Self {
1884 self.quote = quote;
1885 self
1886 }
1887
    /// The character that terminates a row.
    /// - defaults to `None` (CRLF)
1890 pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
1891 self.terminator = terminator;
1892 self
1893 }
1894
    /// The escape character in a row.
    /// - defaults to `None`
1897 pub fn with_escape(mut self, escape: Option<u8>) -> Self {
1898 self.escape = escape;
1899 self
1900 }
1901
    /// Set to `true` to indicate that quotes within quoted fields are
    /// escaped by doubling them.
    /// - defaults to `true`
1904 pub fn with_double_quote(mut self, double_quote: bool) -> Self {
1905 self.double_quote = Some(double_quote);
1906 self
1907 }
1908
1909 /// Specifies whether newlines in (quoted) values are supported.
1910 ///
1911 /// Parsing newlines in quoted values may be affected by execution behaviour such as
1912 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
1913 /// parsed successfully, which may reduce performance.
1914 ///
1915 /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
1916 pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
1917 self.newlines_in_values = Some(newlines_in_values);
1918 self
1919 }
1920
    /// Set the `CompressionTypeVariant` for the CSV file
    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
1923 pub fn with_file_compression_type(
1924 mut self,
1925 compression: CompressionTypeVariant,
1926 ) -> Self {
1927 self.compression = compression;
1928 self
1929 }
1930
1931 /// The delimiter character.
1932 pub fn delimiter(&self) -> u8 {
1933 self.delimiter
1934 }
1935
1936 /// The quote character.
1937 pub fn quote(&self) -> u8 {
1938 self.quote
1939 }
1940
1941 /// The terminator character.
1942 pub fn terminator(&self) -> Option<u8> {
1943 self.terminator
1944 }
1945
1946 /// The escape character.
1947 pub fn escape(&self) -> Option<u8> {
1948 self.escape
1949 }
1950}
1951
1952config_namespace! {
1953 /// Options controlling JSON format
1954 pub struct JsonOptions {
1955 pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
1956 pub schema_infer_max_rec: Option<usize>, default = None
1957 }
1958}
1959
1960pub trait FormatOptionsExt: Display {}
1961
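/// A file format together with its format-specific options.
///
/// Its `Display` implementation renders the lowercase format name:
///
/// ```ignore
/// assert_eq!(FormatOptions::AVRO.to_string(), "avro");
/// ```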
1962#[derive(Debug, Clone, PartialEq)]
1963#[allow(clippy::large_enum_variant)]
1964pub enum FormatOptions {
1965 CSV(CsvOptions),
1966 JSON(JsonOptions),
1967 #[cfg(feature = "parquet")]
1968 PARQUET(TableParquetOptions),
1969 AVRO,
1970 ARROW,
1971}
1972
1973impl Display for FormatOptions {
1974 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1975 let out = match self {
1976 FormatOptions::CSV(_) => "csv",
1977 FormatOptions::JSON(_) => "json",
1978 #[cfg(feature = "parquet")]
1979 FormatOptions::PARQUET(_) => "parquet",
1980 FormatOptions::AVRO => "avro",
1981 FormatOptions::ARROW => "arrow",
1982 };
1983 write!(f, "{}", out)
1984 }
1985}
1986
1987#[cfg(test)]
1988mod tests {
1989 use std::any::Any;
1990 use std::collections::HashMap;
1991
1992 use crate::config::{
1993 ConfigEntry, ConfigExtension, ConfigFileType, ExtensionOptions, Extensions,
1994 TableOptions,
1995 };
1996
1997 #[derive(Default, Debug, Clone)]
1998 pub struct TestExtensionConfig {
        /// Key-value pairs recorded by this test extension
2000 pub properties: HashMap<String, String>,
2001 }
2002
2003 impl ExtensionOptions for TestExtensionConfig {
2004 fn as_any(&self) -> &dyn Any {
2005 self
2006 }
2007
2008 fn as_any_mut(&mut self) -> &mut dyn Any {
2009 self
2010 }
2011
2012 fn cloned(&self) -> Box<dyn ExtensionOptions> {
2013 Box::new(self.clone())
2014 }
2015
2016 fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
2017 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2018 assert_eq!(key, "test");
2019 self.properties.insert(rem.to_owned(), value.to_owned());
2020 Ok(())
2021 }
2022
2023 fn entries(&self) -> Vec<ConfigEntry> {
2024 self.properties
2025 .iter()
2026 .map(|(k, v)| ConfigEntry {
2027 key: k.into(),
2028 value: Some(v.into()),
2029 description: "",
2030 })
2031 .collect()
2032 }
2033 }
2034
2035 impl ConfigExtension for TestExtensionConfig {
2036 const PREFIX: &'static str = "test";
2037 }
2038
2039 #[test]
2040 fn create_table_config() {
2041 let mut extension = Extensions::new();
2042 extension.insert(TestExtensionConfig::default());
2043 let table_config = TableOptions::new().with_extensions(extension);
        let test_config = table_config.extensions.get::<TestExtensionConfig>();
        assert!(test_config.is_some())
2046 }
2047
2048 #[test]
2049 fn alter_test_extension_config() {
2050 let mut extension = Extensions::new();
2051 extension.insert(TestExtensionConfig::default());
2052 let mut table_config = TableOptions::new().with_extensions(extension);
2053 table_config.set_config_format(ConfigFileType::CSV);
2054 table_config.set("format.delimiter", ";").unwrap();
2055 assert_eq!(table_config.csv.delimiter, b';');
2056 table_config.set("test.bootstrap.servers", "asd").unwrap();
2057 let kafka_config = table_config
2058 .extensions
2059 .get::<TestExtensionConfig>()
2060 .unwrap();
2061 assert_eq!(
2062 kafka_config.properties.get("bootstrap.servers").unwrap(),
2063 "asd"
2064 );
2065 }
2066
2067 #[test]
2068 fn csv_u8_table_options() {
2069 let mut table_config = TableOptions::new();
2070 table_config.set_config_format(ConfigFileType::CSV);
2071 table_config.set("format.delimiter", ";").unwrap();
2072 assert_eq!(table_config.csv.delimiter as char, ';');
2073 table_config.set("format.escape", "\"").unwrap();
2074 assert_eq!(table_config.csv.escape.unwrap() as char, '"');
2075 table_config.set("format.escape", "\'").unwrap();
2076 assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
2077 }
2078
2079 #[cfg(feature = "parquet")]
2080 #[test]
2081 fn parquet_table_options() {
2082 let mut table_config = TableOptions::new();
2083 table_config.set_config_format(ConfigFileType::PARQUET);
2084 table_config
2085 .set("format.bloom_filter_enabled::col1", "true")
2086 .unwrap();
2087 assert_eq!(
2088 table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
2089 Some(true)
2090 );
2091 }
2092
2093 #[cfg(feature = "parquet")]
2094 #[test]
2095 fn parquet_table_options_config_entry() {
2096 let mut table_config = TableOptions::new();
2097 table_config.set_config_format(ConfigFileType::PARQUET);
2098 table_config
2099 .set("format.bloom_filter_enabled::col1", "true")
2100 .unwrap();
2101 let entries = table_config.entries();
2102 assert!(entries
2103 .iter()
2104 .any(|item| item.key == "format.bloom_filter_enabled::col1"))
2105 }
2106
2107 #[cfg(feature = "parquet")]
2108 #[test]
2109 fn parquet_table_options_config_metadata_entry() {
2110 let mut table_config = TableOptions::new();
2111 table_config.set_config_format(ConfigFileType::PARQUET);
2112 table_config.set("format.metadata::key1", "").unwrap();
2113 table_config.set("format.metadata::key2", "value2").unwrap();
2114 table_config
2115 .set("format.metadata::key3", "value with spaces ")
2116 .unwrap();
2117 table_config
2118 .set("format.metadata::key4", "value with special chars :: :")
2119 .unwrap();
2120
2121 let parsed_metadata = table_config.parquet.key_value_metadata.clone();
2122 assert_eq!(parsed_metadata.get("should not exist1"), None);
2123 assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
2124 assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
2125 assert_eq!(
2126 parsed_metadata.get("key3"),
2127 Some(&Some("value with spaces ".into()))
2128 );
2129 assert_eq!(
2130 parsed_metadata.get("key4"),
2131 Some(&Some("value with special chars :: :".into()))
2132 );
2133
2134 // duplicate keys are overwritten
2135 table_config.set("format.metadata::key_dupe", "A").unwrap();
2136 table_config.set("format.metadata::key_dupe", "B").unwrap();
2137 let parsed_metadata = table_config.parquet.key_value_metadata;
2138 assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
2139 }
2140}