datafusion_common/file_options/parquet_writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Options related to how parquet files should be written

use base64::Engine;
use std::sync::Arc;

use crate::{
    config::{ParquetOptions, TableParquetOptions},
    DataFusionError, Result, _internal_datafusion_err,
};

use arrow::datatypes::Schema;
// TODO: handle once deprecated
#[allow(deprecated)]
use parquet::{
    arrow::ARROW_SCHEMA_META_KEY,
    basic::{BrotliLevel, GzipLevel, ZstdLevel},
    file::{
        metadata::KeyValue,
        properties::{
            EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
            DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
        },
    },
    schema::types::ColumnPath,
};

/// Options for writing parquet files
#[derive(Clone, Debug)]
pub struct ParquetWriterOptions {
    /// parquet-rs writer properties
    pub writer_options: WriterProperties,
}

impl ParquetWriterOptions {
    pub fn new(writer_options: WriterProperties) -> Self {
        Self { writer_options }
    }
}

impl ParquetWriterOptions {
    pub fn writer_options(&self) -> &WriterProperties {
        &self.writer_options
    }
}

impl TableParquetOptions {
    /// Add the arrow schema to the parquet kv_metadata.
    /// If the key already exists, the entry is overwritten.
    pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
        self.key_value_metadata.insert(
            ARROW_SCHEMA_META_KEY.into(),
            Some(encode_arrow_schema(schema)),
        );
    }
}
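
// Illustrative usage sketch (not part of the original file): attach the arrow
// schema so the required ARROW_SCHEMA_META_KEY entry is present before writing.
//
//     let mut options = TableParquetOptions::default();
//     options.arrow_schema(&Arc::new(Schema::empty()));
//     assert!(options.key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY));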

impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
    type Error = DataFusionError;

    fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
        // ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
        Ok(ParquetWriterOptions {
            writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
                .build(),
        })
    }
}
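
// Minimal conversion sketch (illustrative; assumes the arrow schema was already
// attached via `arrow_schema`, otherwise the conversion errors by default):
//
//     let mut options = TableParquetOptions::default();
//     options.arrow_schema(&Arc::new(Schema::empty()));
//     let writer_options = ParquetWriterOptions::try_from(&options)?;
//     let props: &WriterProperties = writer_options.writer_options();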

impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
    type Error = DataFusionError;

    /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
    ///
    /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
    fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
        // Table options include kv_metadata and col-specific options
        let TableParquetOptions {
            global,
            column_specific_options,
            key_value_metadata,
        } = table_parquet_options;

        let mut builder = global.into_writer_properties_builder()?;

        // check that the arrow schema is present in the kv_metadata, if configured to do so
        if !global.skip_arrow_metadata
            && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
        {
            return Err(_internal_datafusion_err!("arrow schema was not added to the kv_metadata, even though it is required by configuration settings"));
        }

        // add kv_meta, if any
        if !key_value_metadata.is_empty() {
            builder = builder.set_key_value_metadata(Some(
                key_value_metadata
                    .to_owned()
                    .drain()
                    .map(|(key, value)| KeyValue { key, value })
                    .collect(),
            ));
        }

        // Apply column-specific options:
        for (column, options) in column_specific_options {
            let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());

            if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
                builder = builder
                    .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
            }

            if let Some(encoding) = &options.encoding {
                let parsed_encoding = parse_encoding_string(encoding)?;
                builder = builder.set_column_encoding(path.clone(), parsed_encoding);
            }

            if let Some(dictionary_enabled) = options.dictionary_enabled {
                builder = builder
                    .set_column_dictionary_enabled(path.clone(), dictionary_enabled);
            }

            if let Some(compression) = &options.compression {
                let parsed_compression = parse_compression_string(compression)?;
                builder =
                    builder.set_column_compression(path.clone(), parsed_compression);
            }

            if let Some(statistics_enabled) = &options.statistics_enabled {
                let parsed_value = parse_statistics_string(statistics_enabled)?;
                builder =
                    builder.set_column_statistics_enabled(path.clone(), parsed_value);
            }

            if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
                builder =
                    builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
            }

            if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
                builder =
                    builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
            }

            // max_statistics_size is deprecated and currently unused
            // TODO: remove once the deprecated option is removed upstream
            #[allow(deprecated)]
            if let Some(max_statistics_size) = options.max_statistics_size {
                builder = {
                    #[allow(deprecated)]
                    builder.set_column_max_statistics_size(path, max_statistics_size)
                }
            }
        }

        Ok(builder)
    }
}
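
// Sketch of how a column-specific option targets a nested field (illustrative,
// not from the original source): the key "exchange.rates" is split on '.' into
// the ColumnPath ["exchange", "rates"], i.e. the child column `rates` of the
// struct column `exchange`. Assumes ParquetColumnOptions implements Default.
//
//     let mut options = TableParquetOptions::default();
//     options.global.skip_arrow_metadata = true;
//     let col_opts = ParquetColumnOptions {
//         bloom_filter_enabled: Some(true),
//         ..Default::default()
//     };
//     options
//         .column_specific_options
//         .insert("exchange.rates".into(), col_opts);
//     let props = WriterPropertiesBuilder::try_from(&options)?.build();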

/// Encodes the Arrow schema into the IPC format, and base64 encodes it
///
/// TODO: use extern parquet's private method, once publicly available.
/// Refer to <https://github.com/apache/arrow-rs/pull/6916>
fn encode_arrow_schema(schema: &Arc<Schema>) -> String {
    let options = arrow_ipc::writer::IpcWriteOptions::default();
    let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(true);
    let data_gen = arrow_ipc::writer::IpcDataGenerator::default();
    let mut serialized_schema = data_gen.schema_to_bytes_with_dictionary_tracker(
        schema,
        &mut dictionary_tracker,
        &options,
    );

    // manually prepending the length to the schema as arrow uses the legacy IPC format
    // TODO: change after addressing ARROW-9777
    let schema_len = serialized_schema.ipc_message.len();
    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
    len_prefix_schema.append(&mut serialized_schema.ipc_message);
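    // resulting layout: [0xFF, 0xFF, 0xFF, 0xFF] continuation marker, then the
    // schema length as a little-endian u32, then the IPC schema message itself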

    base64::prelude::BASE64_STANDARD.encode(&len_prefix_schema)
}

impl ParquetOptions {
    /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
    ///
    /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options
    /// applied per column; a customization which is not applicable for [`ParquetOptions`].
    ///
    /// Note that this method does not include the key_value_metadata from [`TableParquetOptions`].
    pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
        #[allow(deprecated)]
        let ParquetOptions {
            data_pagesize_limit,
            write_batch_size,
            writer_version,
            compression,
            dictionary_enabled,
            dictionary_page_size_limit,
            statistics_enabled,
            max_statistics_size,
            max_row_group_size,
            created_by,
            column_index_truncate_length,
            statistics_truncate_length,
            data_page_row_count_limit,
            encoding,
            bloom_filter_on_write,
            bloom_filter_fpp,
            bloom_filter_ndv,

            // not in WriterProperties
            enable_page_index: _,
            pruning: _,
            skip_metadata: _,
            metadata_size_hint: _,
            pushdown_filters: _,
            reorder_filters: _,
            allow_single_file_parallelism: _,
            maximum_parallel_row_group_writers: _,
            maximum_buffered_record_batches_per_stream: _,
            bloom_filter_on_read: _, // reads not used for writer props
            schema_force_view_types: _,
            binary_as_string: _, // not used for writer props
            skip_arrow_metadata: _,
        } = self;

        let mut builder = WriterProperties::builder()
            .set_data_page_size_limit(*data_pagesize_limit)
            .set_write_batch_size(*write_batch_size)
            .set_writer_version(parse_version_string(writer_version.as_str())?)
            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
            .set_statistics_enabled(
                statistics_enabled
                    .as_ref()
                    .and_then(|s| parse_statistics_string(s).ok())
                    .unwrap_or(DEFAULT_STATISTICS_ENABLED),
            )
            .set_max_row_group_size(*max_row_group_size)
            .set_created_by(created_by.clone())
            .set_column_index_truncate_length(*column_index_truncate_length)
            .set_statistics_truncate_length(*statistics_truncate_length)
            .set_data_page_row_count_limit(*data_page_row_count_limit)
            .set_bloom_filter_enabled(*bloom_filter_on_write);

        builder = {
            #[allow(deprecated)]
            builder.set_max_statistics_size(
                max_statistics_size.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE),
            )
        };

        if let Some(bloom_filter_fpp) = bloom_filter_fpp {
            builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
        };
        if let Some(bloom_filter_ndv) = bloom_filter_ndv {
            builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv);
        };
        if let Some(dictionary_enabled) = dictionary_enabled {
            builder = builder.set_dictionary_enabled(*dictionary_enabled);
        };

        // We do not have access to default ColumnProperties set in Arrow.
        // Therefore, only overwrite if these settings exist.
        if let Some(compression) = compression {
            builder = builder.set_compression(parse_compression_string(compression)?);
        }
        if let Some(encoding) = encoding {
            builder = builder.set_encoding(parse_encoding_string(encoding)?);
        }

        Ok(builder)
    }
}
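
// Usage sketch (illustrative): build writer properties from the global options
// alone, without kv_metadata or per-column overrides.
//
//     let global = ParquetOptions::default();
//     let props = global.into_writer_properties_builder()?.build();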

/// Parses the datafusion.execution.parquet.encoding string setting into a [`parquet::basic::Encoding`]
pub(crate) fn parse_encoding_string(
    str_setting: &str,
) -> Result<parquet::basic::Encoding> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "plain" => Ok(parquet::basic::Encoding::PLAIN),
        "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
        "rle" => Ok(parquet::basic::Encoding::RLE),
        #[allow(deprecated)]
        "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
        "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
        "delta_length_byte_array" => {
            Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
        }
        "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
        "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
        "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet encoding: \
        {str_setting}. Valid values are: plain, plain_dictionary, rle, \
        bit_packed, delta_binary_packed, delta_length_byte_array, \
        delta_byte_array, rle_dictionary, and byte_stream_split."
        ))),
    }
}
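
// Behavior sketch, derived from the match arms above (matching is case-insensitive):
//
//     parse_encoding_string("plain")             -> Ok(Encoding::PLAIN)
//     parse_encoding_string("BYTE_STREAM_SPLIT") -> Ok(Encoding::BYTE_STREAM_SPLIT)
//     parse_encoding_string("unknown")           -> Err(configuration error)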

/// Splits a compression string into the codec and an optional compression level,
/// e.g. gzip(2) -> (gzip, 2)
fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
    // ignore string literal chars passed from sqlparser i.e. remove single quotes
    let str_setting = str_setting.replace('\'', "");
    let split_setting = str_setting.split_once('(');

    match split_setting {
        Some((codec, rh)) => {
            let level = &rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
                DataFusionError::Configuration(format!(
                    "Could not parse compression string. \
                    Got codec: {} and unknown level from {}",
                    codec, str_setting
                ))
            })?;
            Ok((codec.to_owned(), Some(*level)))
        }
        None => Ok((str_setting.to_owned(), None)),
    }
}
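
// Behavior sketch, derived from the parsing logic above:
//
//     split_compression_string("gzip(2)")  -> Ok(("gzip".to_owned(), Some(2)))
//     split_compression_string("snappy")   -> Ok(("snappy".to_owned(), None))
//     split_compression_string("gzip(xy)") -> Err(configuration error)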

/// Helper to ensure compression codecs which don't support levels
/// don't have one set. E.g. snappy(2) is invalid.
fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
    if level.is_some() {
        return Err(DataFusionError::Configuration(format!(
            "Compression {codec} does not support specifying a level"
        )));
    }
    Ok(())
}

/// Helper to ensure compression codecs which require a level
/// do have one set. E.g. zstd is invalid, zstd(3) is valid
fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
    level.ok_or(DataFusionError::Configuration(format!(
        "{codec} compression requires specifying a level such as {codec}(4)"
    )))
}

/// Parses the datafusion.execution.parquet.compression string setting into a [`parquet::basic::Compression`]
pub fn parse_compression_string(
    str_setting: &str,
) -> Result<parquet::basic::Compression> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    let (codec, level) = split_compression_string(str_setting_lower)?;
    let codec = codec.as_str();
    match codec {
        "uncompressed" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::UNCOMPRESSED)
        }
        "snappy" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::SNAPPY)
        }
        "gzip" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
                level,
            )?))
        }
        "lzo" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZO)
        }
        "brotli" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
                level,
            )?))
        }
        "lz4" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4)
        }
        "zstd" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
                level as i32,
            )?))
        }
        "lz4_raw" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4_RAW)
        }
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet compression: \
        {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
        lzo, brotli(level), lz4, zstd(level), and lz4_raw."
        ))),
    }
}
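
// Behavior sketch, derived from the match arms above:
//
//     parse_compression_string("snappy")    -> Ok(Compression::SNAPPY)
//     parse_compression_string("zstd(3)")   -> Ok(Compression::ZSTD(level 3))
//     parse_compression_string("zstd")      -> Err: zstd requires a level
//     parse_compression_string("snappy(2)") -> Err: snappy does not take a level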

pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "1.0" => Ok(WriterVersion::PARQUET_1_0),
        "2.0" => Ok(WriterVersion::PARQUET_2_0),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet writer version {str_setting}. \
            Valid values are: 1.0 and 2.0."
        ))),
    }
}

pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "none" => Ok(EnabledStatistics::None),
        "chunk" => Ok(EnabledStatistics::Chunk),
        "page" => Ok(EnabledStatistics::Page),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet statistics setting {str_setting}. \
            Valid values are: none, page, and chunk."
        ))),
    }
}

#[cfg(feature = "parquet")]
#[cfg(test)]
mod tests {
    use parquet::{
        basic::Compression,
        file::properties::{
            BloomFilterProperties, EnabledStatistics, DEFAULT_BLOOM_FILTER_FPP,
            DEFAULT_BLOOM_FILTER_NDV,
        },
    };
    use std::collections::HashMap;

    use crate::config::{ParquetColumnOptions, ParquetOptions};

    use super::*;

    const COL_NAME: &str = "configured";

    /// Take the column defaults provided in [`ParquetOptions`], and generate a non-default col config.
    fn column_options_with_non_defaults(
        src_col_defaults: &ParquetOptions,
    ) -> ParquetColumnOptions {
        #[allow(deprecated)] // max_statistics_size
        ParquetColumnOptions {
            compression: Some("zstd(22)".into()),
            dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
            statistics_enabled: Some("none".into()),
            max_statistics_size: Some(72),
            encoding: Some("RLE".into()),
            bloom_filter_enabled: Some(true),
            bloom_filter_fpp: Some(0.72),
            bloom_filter_ndv: Some(72),
        }
    }

    fn parquet_options_with_non_defaults() -> ParquetOptions {
        let defaults = ParquetOptions::default();
        let writer_version = if defaults.writer_version.eq("1.0") {
            "2.0"
        } else {
            "1.0"
        };

        #[allow(deprecated)] // max_statistics_size
        ParquetOptions {
            data_pagesize_limit: 42,
            write_batch_size: 42,
            writer_version: writer_version.into(),
            compression: Some("zstd(22)".into()),
            dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
            dictionary_page_size_limit: 42,
            statistics_enabled: Some("chunk".into()),
            max_statistics_size: Some(42),
            max_row_group_size: 42,
            created_by: "wordy".into(),
            column_index_truncate_length: Some(42),
            statistics_truncate_length: Some(42),
            data_page_row_count_limit: 42,
            encoding: Some("BYTE_STREAM_SPLIT".into()),
            bloom_filter_on_write: !defaults.bloom_filter_on_write,
            bloom_filter_fpp: Some(0.42),
            bloom_filter_ndv: Some(42),

            // not in WriterProperties, but itemizing here to not skip newly added props
            enable_page_index: defaults.enable_page_index,
            pruning: defaults.pruning,
            skip_metadata: defaults.skip_metadata,
            metadata_size_hint: defaults.metadata_size_hint,
            pushdown_filters: defaults.pushdown_filters,
            reorder_filters: defaults.reorder_filters,
            allow_single_file_parallelism: defaults.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: defaults
                .maximum_parallel_row_group_writers,
            maximum_buffered_record_batches_per_stream: defaults
                .maximum_buffered_record_batches_per_stream,
            bloom_filter_on_read: defaults.bloom_filter_on_read,
            schema_force_view_types: defaults.schema_force_view_types,
            binary_as_string: defaults.binary_as_string,
            skip_arrow_metadata: defaults.skip_arrow_metadata,
        }
    }

    fn extract_column_options(
        props: &WriterProperties,
        col: ColumnPath,
    ) -> ParquetColumnOptions {
        let bloom_filter_default_props = props.bloom_filter_properties(&col);

        #[allow(deprecated)] // max_statistics_size
        ParquetColumnOptions {
            bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
            encoding: props.encoding(&col).map(|s| s.to_string()),
            dictionary_enabled: Some(props.dictionary_enabled(&col)),
            compression: match props.compression(&col) {
                Compression::ZSTD(lvl) => {
                    Some(format!("zstd({})", lvl.compression_level()))
                }
                _ => None,
            },
            statistics_enabled: Some(
                match props.statistics_enabled(&col) {
                    EnabledStatistics::None => "none",
                    EnabledStatistics::Chunk => "chunk",
                    EnabledStatistics::Page => "page",
                }
                .into(),
            ),
            bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
            bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
            max_statistics_size: Some(props.max_statistics_size(&col)),
        }
    }

    /// For testing only: take a single write's props and convert back into the session config.
    /// (Round-tripping the identity confirms correctness.)
    fn session_config_from_writer_props(props: &WriterProperties) -> TableParquetOptions {
        let default_col = ColumnPath::from("col doesn't have specific config");
        let default_col_props = extract_column_options(props, default_col);

        let configured_col = ColumnPath::from(COL_NAME);
        let configured_col_props = extract_column_options(props, configured_col);

        let key_value_metadata = props
            .key_value_metadata()
            .map(|pairs| {
                HashMap::from_iter(
                    pairs
                        .iter()
                        .cloned()
                        .map(|KeyValue { key, value }| (key, value)),
                )
            })
            .unwrap_or_default();

        let global_options_defaults = ParquetOptions::default();

        let column_specific_options = if configured_col_props.eq(&default_col_props) {
            HashMap::default()
        } else {
            HashMap::from([(COL_NAME.into(), configured_col_props)])
        };

        #[allow(deprecated)] // max_statistics_size
        TableParquetOptions {
            global: ParquetOptions {
                // global options
                data_pagesize_limit: props.data_page_size_limit(),
                write_batch_size: props.write_batch_size(),
                writer_version: format!("{}.0", props.writer_version().as_num()),
                dictionary_page_size_limit: props.dictionary_page_size_limit(),
                max_row_group_size: props.max_row_group_size(),
                created_by: props.created_by().to_string(),
                column_index_truncate_length: props.column_index_truncate_length(),
                statistics_truncate_length: props.statistics_truncate_length(),
                data_page_row_count_limit: props.data_page_row_count_limit(),

                // global options which set the default column props
                encoding: default_col_props.encoding,
                compression: default_col_props.compression,
                dictionary_enabled: default_col_props.dictionary_enabled,
                statistics_enabled: default_col_props.statistics_enabled,
                max_statistics_size: default_col_props.max_statistics_size,
                bloom_filter_on_write: default_col_props
                    .bloom_filter_enabled
                    .unwrap_or_default(),
                bloom_filter_fpp: default_col_props.bloom_filter_fpp,
                bloom_filter_ndv: default_col_props.bloom_filter_ndv,

                // not in WriterProperties
                enable_page_index: global_options_defaults.enable_page_index,
                pruning: global_options_defaults.pruning,
                skip_metadata: global_options_defaults.skip_metadata,
                metadata_size_hint: global_options_defaults.metadata_size_hint,
                pushdown_filters: global_options_defaults.pushdown_filters,
                reorder_filters: global_options_defaults.reorder_filters,
                allow_single_file_parallelism: global_options_defaults
                    .allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options_defaults
                    .maximum_parallel_row_group_writers,
                maximum_buffered_record_batches_per_stream: global_options_defaults
                    .maximum_buffered_record_batches_per_stream,
                bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
                schema_force_view_types: global_options_defaults.schema_force_view_types,
                binary_as_string: global_options_defaults.binary_as_string,
                skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
            },
            column_specific_options,
            key_value_metadata,
        }
    }

    #[test]
    fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
        // TableParquetOptions, all props set to default
        let mut table_parquet_opts = TableParquetOptions::default();
        assert!(
            !table_parquet_opts.global.skip_arrow_metadata,
            "default false, to not skip the arrow schema requirement"
        );

        // see errors without the schema added, using default settings
        let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_error.is_err(),
            "should error without the required arrow schema in kv_metadata",
        );

        // succeeds if we permit skipping the arrow schema
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema skipped by config",
        );

        // Set the arrow schema back to required
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
        // add the arrow schema to the kv_meta
        table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema included in TableParquetOptions",
        );
    }

    #[test]
    fn table_parquet_opts_to_writer_props() {
        // ParquetOptions, all props set to non-default
        let parquet_options = parquet_options_with_non_defaults();

        // TableParquetOptions, using ParquetOptions for global settings
        let key = ARROW_SCHEMA_META_KEY.to_string();
        let value = Some("bar".into());
        let table_parquet_opts = TableParquetOptions {
            global: parquet_options.clone(),
            column_specific_options: [(
                COL_NAME.into(),
                column_options_with_non_defaults(&parquet_options),
            )]
            .into(),
            key_value_metadata: [(key, value)].into(),
        };

        let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
            .unwrap()
            .build();
        assert_eq!(
            table_parquet_opts,
            session_config_from_writer_props(&writer_props),
            "the writer_props should have the same configuration as the session's TableParquetOptions",
        );
    }

    /// Ensure that the configuration defaults for writing parquet files are
    /// consistent with the options in arrow-rs
    #[test]
    fn test_defaults_match() {
        // ensure the global settings are the same
        let mut default_table_writer_opts = TableParquetOptions::default();
        let default_parquet_opts = ParquetOptions::default();
        assert_eq!(
            default_table_writer_opts.global,
            default_parquet_opts,
            "should have matching defaults for TableParquetOptions.global and ParquetOptions",
        );

        // selectively skip the arrow_schema metadata, since the WriterProperties default has an empty kv_meta (no arrow schema)
        default_table_writer_opts =
            default_table_writer_opts.with_skip_arrow_metadata(true);

        // WriterProperties::default, a.k.a. using extern parquet's defaults
        let default_writer_props = WriterProperties::new();

        // WriterProperties::try_from(TableParquetOptions::default), a.k.a. using datafusion's defaults
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // Expected: ways in which the defaults should not match
        assert_ne!(
            default_writer_props.created_by(),
            from_datafusion_defaults.created_by(),
            "should have different created_by sources",
        );
        assert!(
            default_writer_props.created_by().starts_with("parquet-rs version"),
            "should indicate that writer_props defaults came from the extern parquet crate",
        );
        assert!(
            default_table_writer_opts
                .global
                .created_by
                .starts_with("datafusion version"),
            "should indicate that table_parquet_opts defaults came from datafusion",
        );

        // Expected: the datafusion default compression differs from arrow-rs's parquet
        assert_eq!(
            default_writer_props.compression(&"default".into()),
            Compression::UNCOMPRESSED,
            "extern parquet's default is uncompressed"
        );
        assert!(
            matches!(
                from_datafusion_defaults.compression(&"default".into()),
                Compression::ZSTD(_)
            ),
            "datafusion's default is zstd"
        );

        // Expected: the remaining should match
        let same_created_by = default_table_writer_opts.global.created_by.clone();
        let mut from_extern_parquet =
            session_config_from_writer_props(&default_writer_props);
        from_extern_parquet.global.created_by = same_created_by;
        from_extern_parquet.global.compression = Some("zstd(3)".into());
        from_extern_parquet.global.skip_arrow_metadata = true;

        assert_eq!(
            default_table_writer_opts,
            from_extern_parquet,
            "the default writer_props should have the same configuration as the session's default TableParquetOptions",
        );
    }

    #[test]
    fn test_bloom_filter_defaults() {
        // the TableParquetOptions::default, with only the bloom filter turned on
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only the bloom filter turned on
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties::default()),
            "should use the default bloom filter props"
        );
    }

    #[test]
    fn test_bloom_filter_set_fpp_only() {
        // the TableParquetOptions::default, with only fpp set
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only fpp set
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_fpp(0.42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: 0.42,
                ndv: DEFAULT_BLOOM_FILTER_NDV
            }),
            "should have only the fpp set, and the ndv at default",
        );
    }

    #[test]
    fn test_bloom_filter_set_ndv_only() {
        // the TableParquetOptions::default, with only ndv set
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_ndv = Some(42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties::default, with only ndv set
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_ndv(42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: DEFAULT_BLOOM_FILTER_FPP,
                ndv: 42
            }),
            "should have only the ndv set, and the fpp at default",
        );
    }
}