use base64::Engine;
use std::sync::Arc;

use crate::{
    config::{ParquetOptions, TableParquetOptions},
    DataFusionError, Result, _internal_datafusion_err,
};

use arrow::datatypes::Schema;
#[allow(deprecated)]
use parquet::{
    arrow::ARROW_SCHEMA_META_KEY,
    basic::{BrotliLevel, GzipLevel, ZstdLevel},
    file::{
        metadata::KeyValue,
        properties::{
            EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
            DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
        },
    },
    schema::types::ColumnPath,
};

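/// Options for writing parquet files, wrapping the parquet crate's
/// [`WriterProperties`].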
#[derive(Clone, Debug)]
pub struct ParquetWriterOptions {
    pub writer_options: WriterProperties,
}

impl ParquetWriterOptions {
    pub fn new(writer_options: WriterProperties) -> Self {
        Self { writer_options }
    }

    pub fn writer_options(&self) -> &WriterProperties {
        &self.writer_options
    }
}

impl TableParquetOptions {
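    /// Add the Arrow schema, serialized via [`encode_arrow_schema`], to the
    /// kv_metadata under [`ARROW_SCHEMA_META_KEY`], so readers can recover the
    /// original Arrow types.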
    pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
        self.key_value_metadata.insert(
            ARROW_SCHEMA_META_KEY.into(),
            Some(encode_arrow_schema(schema)),
        );
    }
}

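/// Build the final [`WriterProperties`] from the session's [`TableParquetOptions`].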
impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
    type Error = DataFusionError;

    fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
        Ok(ParquetWriterOptions {
            writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
                .build(),
        })
    }
}

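/// Build a [`WriterPropertiesBuilder`] from the session's [`TableParquetOptions`],
/// applying the global options first and then any per-column overrides.
///
/// A sketch of typical usage (not compiled as a doc-test; assumes the arrow
/// schema has been attached, or that `skip_arrow_metadata` is set):
///
/// ```ignore
/// let opts = TableParquetOptions::default().with_skip_arrow_metadata(true);
/// let props = WriterPropertiesBuilder::try_from(&opts)?.build();
/// ```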
impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
    type Error = DataFusionError;

    fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
        let TableParquetOptions {
            global,
            column_specific_options,
            key_value_metadata,
        } = table_parquet_options;

        let mut builder = global.into_writer_properties_builder()?;

        // the arrow schema is required in the kv_metadata unless the user has
        // explicitly opted out via `skip_arrow_metadata`
        if !global.skip_arrow_metadata
            && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
        {
            return Err(_internal_datafusion_err!(
                "arrow schema was not added to the kv_metadata, even though it is required by configuration settings"
            ));
        }

        if !key_value_metadata.is_empty() {
            builder = builder.set_key_value_metadata(Some(
                key_value_metadata
                    .clone()
                    .into_iter()
                    .map(|(key, value)| KeyValue { key, value })
                    .collect(),
            ));
        }

        for (column, options) in column_specific_options {
            let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());

            if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
                builder = builder
                    .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
            }

            if let Some(encoding) = &options.encoding {
                let parsed_encoding = parse_encoding_string(encoding)?;
                builder = builder.set_column_encoding(path.clone(), parsed_encoding);
            }

            if let Some(dictionary_enabled) = options.dictionary_enabled {
                builder = builder
                    .set_column_dictionary_enabled(path.clone(), dictionary_enabled);
            }

            if let Some(compression) = &options.compression {
                let parsed_compression = parse_compression_string(compression)?;
                builder =
                    builder.set_column_compression(path.clone(), parsed_compression);
            }

            if let Some(statistics_enabled) = &options.statistics_enabled {
                let parsed_value = parse_statistics_string(statistics_enabled)?;
                builder =
                    builder.set_column_statistics_enabled(path.clone(), parsed_value);
            }

            if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
                builder =
                    builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
            }

            if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
                builder =
                    builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
            }

            // `max_statistics_size` is deprecated upstream, but still honored here
            #[allow(deprecated)]
            if let Some(max_statistics_size) = options.max_statistics_size {
                builder =
                    builder.set_column_max_statistics_size(path, max_statistics_size);
            }
        }

        Ok(builder)
    }
}

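/// Serialize the Arrow schema in the Arrow IPC format, and base64-encode it for
/// storage in the parquet kv_metadata.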
fn encode_arrow_schema(schema: &Arc<Schema>) -> String {
    let options = arrow_ipc::writer::IpcWriteOptions::default();
    let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(true);
    let data_gen = arrow_ipc::writer::IpcDataGenerator::default();
    let mut serialized_schema = data_gen.schema_to_bytes_with_dictionary_tracker(
        schema,
        &mut dictionary_tracker,
        &options,
    );

    // prepend the IPC continuation marker (0xFFFFFFFF) and the little-endian
    // message length, so the result matches the encapsulated IPC message layout
    let schema_len = serialized_schema.ipc_message.len();
    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
    len_prefix_schema.extend_from_slice(&[255u8, 255, 255, 255]);
    len_prefix_schema.extend_from_slice(&(schema_len as u32).to_le_bytes());
    len_prefix_schema.append(&mut serialized_schema.ipc_message);

    base64::prelude::BASE64_STANDARD.encode(&len_prefix_schema)
}

impl ParquetOptions {
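    /// Convert the global writer-related options into a [`WriterPropertiesBuilder`].
    ///
    /// Despite the `into_` prefix, this takes `&self`: only the writer options
    /// are read, and the reader-only options are ignored.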
    pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
        #[allow(deprecated)]
        let ParquetOptions {
            data_pagesize_limit,
            write_batch_size,
            writer_version,
            compression,
            dictionary_enabled,
            dictionary_page_size_limit,
            statistics_enabled,
            max_statistics_size,
            max_row_group_size,
            created_by,
            column_index_truncate_length,
            statistics_truncate_length,
            data_page_row_count_limit,
            encoding,
            bloom_filter_on_write,
            bloom_filter_fpp,
            bloom_filter_ndv,

            // options that are not part of the WriterProperties
            enable_page_index: _,
            pruning: _,
            skip_metadata: _,
            metadata_size_hint: _,
            pushdown_filters: _,
            reorder_filters: _,
            allow_single_file_parallelism: _,
            maximum_parallel_row_group_writers: _,
            maximum_buffered_record_batches_per_stream: _,
            bloom_filter_on_read: _,
            schema_force_view_types: _,
            binary_as_string: _,
            skip_arrow_metadata: _,
        } = self;

        let mut builder = WriterProperties::builder()
            .set_data_page_size_limit(*data_pagesize_limit)
            .set_write_batch_size(*write_batch_size)
            .set_writer_version(parse_version_string(writer_version.as_str())?)
            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
            .set_statistics_enabled(
                statistics_enabled
                    .as_ref()
                    .and_then(|s| parse_statistics_string(s).ok())
                    .unwrap_or(DEFAULT_STATISTICS_ENABLED),
            )
            .set_max_row_group_size(*max_row_group_size)
            .set_created_by(created_by.clone())
            .set_column_index_truncate_length(*column_index_truncate_length)
            .set_statistics_truncate_length(*statistics_truncate_length)
            .set_data_page_row_count_limit(*data_page_row_count_limit)
            .set_bloom_filter_enabled(*bloom_filter_on_write);

        builder = {
            #[allow(deprecated)]
            builder.set_max_statistics_size(
                max_statistics_size.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE),
            )
        };

        if let Some(bloom_filter_fpp) = bloom_filter_fpp {
            builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
        }
        if let Some(bloom_filter_ndv) = bloom_filter_ndv {
            builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv);
        }
        if let Some(dictionary_enabled) = dictionary_enabled {
            builder = builder.set_dictionary_enabled(*dictionary_enabled);
        }

        // compression and encoding are only overridden when explicitly set;
        // otherwise the parquet crate defaults apply
        if let Some(compression) = compression {
            builder = builder.set_compression(parse_compression_string(compression)?);
        }
        if let Some(encoding) = encoding {
            builder = builder.set_encoding(parse_encoding_string(encoding)?);
        }

        Ok(builder)
    }
}

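/// Parse a case-insensitive encoding name, such as `"plain"` or
/// `"delta_binary_packed"`, into a parquet `Encoding`.
///
/// A sketch of the expected behavior (not compiled as a doc-test):
///
/// ```ignore
/// assert_eq!(parse_encoding_string("RLE")?, parquet::basic::Encoding::RLE);
/// assert!(parse_encoding_string("unknown").is_err());
/// ```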
pub(crate) fn parse_encoding_string(
    str_setting: &str,
) -> Result<parquet::basic::Encoding> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "plain" => Ok(parquet::basic::Encoding::PLAIN),
        "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
        "rle" => Ok(parquet::basic::Encoding::RLE),
        #[allow(deprecated)]
        "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
        "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
        "delta_length_byte_array" => {
            Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
        }
        "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
        "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
        "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet encoding: \
            {str_setting}. Valid values are: plain, plain_dictionary, rle, \
            bit_packed, delta_binary_packed, delta_length_byte_array, \
            delta_byte_array, rle_dictionary, and byte_stream_split."
        ))),
    }
}

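/// Split a compression setting into its codec name and optional level:
/// `"zstd(4)"` becomes `("zstd", Some(4))`, while `"snappy"` becomes
/// `("snappy", None)`. Single quotes are stripped first, so `"'zstd(4)'"`
/// also parses.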
fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
    let str_setting = str_setting.replace('\'', "");
    let split_setting = str_setting.split_once('(');

    match split_setting {
        Some((codec, rh)) => {
            // `rh` holds the level plus the trailing `)`, e.g. `4)`
            let level = rh
                .strip_suffix(')')
                .and_then(|level| level.parse::<u32>().ok())
                .ok_or_else(|| {
                    DataFusionError::Configuration(format!(
                        "Could not parse compression string. \
                        Got codec: {} and unknown level from {}",
                        codec, str_setting
                    ))
                })?;
            Ok((codec.to_owned(), Some(level)))
        }
        None => Ok((str_setting.to_owned(), None)),
    }
}

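/// Error if a level was provided for a codec that does not accept one.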
fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
    if level.is_some() {
        return Err(DataFusionError::Configuration(format!(
            "Compression {codec} does not support specifying a level"
        )));
    }
    Ok(())
}

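/// Extract the required level for a codec, erroring if none was provided.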
fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
    level.ok_or_else(|| {
        DataFusionError::Configuration(format!(
            "{codec} compression requires specifying a level such as {codec}(4)"
        ))
    })
}

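/// Parse a case-insensitive compression setting, such as `"snappy"` or
/// `"zstd(4)"`, into a parquet `Compression` codec. Gzip, brotli, and zstd
/// require a level; the remaining codecs reject one.
///
/// A sketch of the expected behavior (not compiled as a doc-test):
///
/// ```ignore
/// assert!(parse_compression_string("snappy").is_ok());
/// assert!(parse_compression_string("zstd(4)").is_ok());
/// assert!(parse_compression_string("zstd").is_err()); // level required
/// assert!(parse_compression_string("snappy(4)").is_err()); // level rejected
/// ```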
pub fn parse_compression_string(
    str_setting: &str,
) -> Result<parquet::basic::Compression> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    let (codec, level) = split_compression_string(str_setting_lower)?;
    let codec = codec.as_str();
    match codec {
        "uncompressed" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::UNCOMPRESSED)
        }
        "snappy" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::SNAPPY)
        }
        "gzip" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(level)?))
        }
        "lzo" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZO)
        }
        "brotli" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(level)?))
        }
        "lz4" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4)
        }
        "zstd" => {
            let level = require_level(codec, level)?;
            Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(level as i32)?))
        }
        "lz4_raw" => {
            check_level_is_none(codec, &level)?;
            Ok(parquet::basic::Compression::LZ4_RAW)
        }
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet compression: \
            {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
            lzo, brotli(level), lz4, zstd(level), and lz4_raw."
        ))),
    }
}

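/// Parse a writer version setting (`"1.0"` or `"2.0"`) into a [`WriterVersion`].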
pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "1.0" => Ok(WriterVersion::PARQUET_1_0),
        "2.0" => Ok(WriterVersion::PARQUET_2_0),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet writer version {str_setting}. \
            Valid values are: 1.0 and 2.0."
        ))),
    }
}

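/// Parse a statistics level setting (`"none"`, `"chunk"`, or `"page"`) into
/// [`EnabledStatistics`].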
pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "none" => Ok(EnabledStatistics::None),
        "chunk" => Ok(EnabledStatistics::Chunk),
        "page" => Ok(EnabledStatistics::Page),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet statistics setting {str_setting}. \
            Valid values are: none, chunk, and page."
        ))),
    }
}

#[cfg(feature = "parquet")]
#[cfg(test)]
mod tests {
    use parquet::{
        basic::Compression,
        file::properties::{
            BloomFilterProperties, EnabledStatistics, DEFAULT_BLOOM_FILTER_FPP,
            DEFAULT_BLOOM_FILTER_NDV,
        },
    };
    use std::collections::HashMap;

    use crate::config::{ParquetColumnOptions, ParquetOptions};

    use super::*;

    const COL_NAME: &str = "configured";

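    /// Build per-column options with every field set to a non-default value, so
    /// that a lossy props round-trip would show up in the equality assertions.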
    fn column_options_with_non_defaults(
        src_col_defaults: &ParquetOptions,
    ) -> ParquetColumnOptions {
        #[allow(deprecated)]
        ParquetColumnOptions {
            compression: Some("zstd(22)".into()),
            dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
            statistics_enabled: Some("none".into()),
            max_statistics_size: Some(72),
            encoding: Some("RLE".into()),
            bloom_filter_enabled: Some(true),
            bloom_filter_fpp: Some(0.72),
            bloom_filter_ndv: Some(72),
        }
    }

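    /// Build global options with every writer-related field set to a
    /// non-default value.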
    fn parquet_options_with_non_defaults() -> ParquetOptions {
        let defaults = ParquetOptions::default();
        let writer_version = if defaults.writer_version.eq("1.0") {
            "2.0"
        } else {
            "1.0"
        };

        #[allow(deprecated)]
        ParquetOptions {
            data_pagesize_limit: 42,
            write_batch_size: 42,
            writer_version: writer_version.into(),
            compression: Some("zstd(22)".into()),
            dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
            dictionary_page_size_limit: 42,
            statistics_enabled: Some("chunk".into()),
            max_statistics_size: Some(42),
            max_row_group_size: 42,
            created_by: "wordy".into(),
            column_index_truncate_length: Some(42),
            statistics_truncate_length: Some(42),
            data_page_row_count_limit: 42,
            encoding: Some("BYTE_STREAM_SPLIT".into()),
            bloom_filter_on_write: !defaults.bloom_filter_on_write,
            bloom_filter_fpp: Some(0.42),
            bloom_filter_ndv: Some(42),

            // options that are not part of the WriterProperties; keep defaults
            enable_page_index: defaults.enable_page_index,
            pruning: defaults.pruning,
            skip_metadata: defaults.skip_metadata,
            metadata_size_hint: defaults.metadata_size_hint,
            pushdown_filters: defaults.pushdown_filters,
            reorder_filters: defaults.reorder_filters,
            allow_single_file_parallelism: defaults.allow_single_file_parallelism,
            maximum_parallel_row_group_writers: defaults
                .maximum_parallel_row_group_writers,
            maximum_buffered_record_batches_per_stream: defaults
                .maximum_buffered_record_batches_per_stream,
            bloom_filter_on_read: defaults.bloom_filter_on_read,
            schema_force_view_types: defaults.schema_force_view_types,
            binary_as_string: defaults.binary_as_string,
            skip_arrow_metadata: defaults.skip_arrow_metadata,
        }
    }

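    /// Read the options for a single column back out of the [`WriterProperties`].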
    fn extract_column_options(
        props: &WriterProperties,
        col: ColumnPath,
    ) -> ParquetColumnOptions {
        let bloom_filter_default_props = props.bloom_filter_properties(&col);

        #[allow(deprecated)]
        ParquetColumnOptions {
            bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
            encoding: props.encoding(&col).map(|s| s.to_string()),
            dictionary_enabled: Some(props.dictionary_enabled(&col)),
            compression: match props.compression(&col) {
                Compression::ZSTD(lvl) => {
                    Some(format!("zstd({})", lvl.compression_level()))
                }
                _ => None,
            },
            statistics_enabled: Some(
                match props.statistics_enabled(&col) {
                    EnabledStatistics::None => "none",
                    EnabledStatistics::Chunk => "chunk",
                    EnabledStatistics::Page => "page",
                }
                .into(),
            ),
            bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
            bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
            max_statistics_size: Some(props.max_statistics_size(&col)),
        }
    }

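    /// Map a set of [`WriterProperties`] back to the equivalent
    /// [`TableParquetOptions`], so that round-trips can be compared directly.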
    fn session_config_from_writer_props(props: &WriterProperties) -> TableParquetOptions {
        let default_col = ColumnPath::from("col doesn't have specific config");
        let default_col_props = extract_column_options(props, default_col);

        let configured_col = ColumnPath::from(COL_NAME);
        let configured_col_props = extract_column_options(props, configured_col);

        let key_value_metadata = props
            .key_value_metadata()
            .map(|pairs| {
                HashMap::from_iter(
                    pairs
                        .iter()
                        .cloned()
                        .map(|KeyValue { key, value }| (key, value)),
                )
            })
            .unwrap_or_default();

        let global_options_defaults = ParquetOptions::default();

        let column_specific_options = if configured_col_props.eq(&default_col_props) {
            HashMap::default()
        } else {
            HashMap::from([(COL_NAME.into(), configured_col_props)])
        };

        #[allow(deprecated)]
        TableParquetOptions {
            global: ParquetOptions {
                // global writer options
                data_pagesize_limit: props.data_page_size_limit(),
                write_batch_size: props.write_batch_size(),
                writer_version: format!("{}.0", props.writer_version().as_num()),
                dictionary_page_size_limit: props.dictionary_page_size_limit(),
                max_row_group_size: props.max_row_group_size(),
                created_by: props.created_by().to_string(),
                column_index_truncate_length: props.column_index_truncate_length(),
                statistics_truncate_length: props.statistics_truncate_length(),
                data_page_row_count_limit: props.data_page_row_count_limit(),

                // global options which can also be overridden per column
                encoding: default_col_props.encoding,
                compression: default_col_props.compression,
                dictionary_enabled: default_col_props.dictionary_enabled,
                statistics_enabled: default_col_props.statistics_enabled,
                max_statistics_size: default_col_props.max_statistics_size,
                bloom_filter_on_write: default_col_props
                    .bloom_filter_enabled
                    .unwrap_or_default(),
                bloom_filter_fpp: default_col_props.bloom_filter_fpp,
                bloom_filter_ndv: default_col_props.bloom_filter_ndv,

                // options not in the WriterProperties; use the defaults
                enable_page_index: global_options_defaults.enable_page_index,
                pruning: global_options_defaults.pruning,
                skip_metadata: global_options_defaults.skip_metadata,
                metadata_size_hint: global_options_defaults.metadata_size_hint,
                pushdown_filters: global_options_defaults.pushdown_filters,
                reorder_filters: global_options_defaults.reorder_filters,
                allow_single_file_parallelism: global_options_defaults
                    .allow_single_file_parallelism,
                maximum_parallel_row_group_writers: global_options_defaults
                    .maximum_parallel_row_group_writers,
                maximum_buffered_record_batches_per_stream: global_options_defaults
                    .maximum_buffered_record_batches_per_stream,
                bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
                schema_force_view_types: global_options_defaults.schema_force_view_types,
                binary_as_string: global_options_defaults.binary_as_string,
                skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
            },
            column_specific_options,
            key_value_metadata,
        }
    }

    #[test]
    fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
        let mut table_parquet_opts = TableParquetOptions::default();
        assert!(
            !table_parquet_opts.global.skip_arrow_metadata,
            "default is false: the arrow schema is required in the kv_metadata"
        );

        // no arrow schema in the kv_metadata, and not skipped => error
        let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_error.is_err(),
            "should error without the required arrow schema in kv_metadata",
        );

        // skipping the arrow schema by config => ok
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema skipped by config",
        );

        // providing the arrow schema in the kv_metadata => ok
        table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
        table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
        let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
        assert!(
            should_succeed.is_ok(),
            "should work with the arrow schema included in TableParquetOptions",
        );
    }

    #[test]
    fn table_parquet_opts_to_writer_props() {
        let parquet_options = parquet_options_with_non_defaults();

        // a dummy entry under the arrow schema key satisfies the requirement
        let key = ARROW_SCHEMA_META_KEY.to_string();
        let value = Some("bar".into());
        let table_parquet_opts = TableParquetOptions {
            global: parquet_options.clone(),
            column_specific_options: [(
                COL_NAME.into(),
                column_options_with_non_defaults(&parquet_options),
            )]
            .into(),
            key_value_metadata: [(key, value)].into(),
        };

        let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
            .unwrap()
            .build();
        assert_eq!(
            table_parquet_opts,
            session_config_from_writer_props(&writer_props),
            "the writer_props should have the same configuration as the session's TableParquetOptions",
        );
    }

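    /// Ensure that the datafusion writer defaults stay consistent with the
    /// parquet crate's defaults, apart from the deliberate differences in
    /// `created_by` and the default compression.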
    #[test]
    fn test_defaults_match() {
        // ensure the global defaults match
        let mut default_table_writer_opts = TableParquetOptions::default();
        let default_parquet_opts = ParquetOptions::default();
        assert_eq!(
            default_table_writer_opts.global,
            default_parquet_opts,
            "should have matching defaults for TableParquetOptions.global and ParquetOptions",
        );

        // skip the arrow schema requirement for this test
        default_table_writer_opts =
            default_table_writer_opts.with_skip_arrow_metadata(true);

        // the WriterProperties defaults, per the extern parquet crate
        let default_writer_props = WriterProperties::new();

        // the WriterProperties built from datafusion's defaults
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // deliberate difference: created_by
        assert_ne!(
            default_writer_props.created_by(),
            from_datafusion_defaults.created_by(),
            "should have different created_by sources",
        );
        assert!(
            default_writer_props
                .created_by()
                .starts_with("parquet-rs version"),
            "should indicate that writer_props defaults came from the extern parquet crate",
        );
        assert!(
            default_table_writer_opts
                .global
                .created_by
                .starts_with("datafusion version"),
            "should indicate that table_parquet_opts defaults came from datafusion",
        );

        // deliberate difference: default compression
        assert_eq!(
            default_writer_props.compression(&"default".into()),
            Compression::UNCOMPRESSED,
            "extern parquet's default is uncompressed"
        );
        assert!(
            matches!(
                from_datafusion_defaults.compression(&"default".into()),
                Compression::ZSTD(_)
            ),
            "datafusion's default is zstd"
        );

        // with the deliberate differences accounted for, everything else matches
        let same_created_by = default_table_writer_opts.global.created_by.clone();
        let mut from_extern_parquet =
            session_config_from_writer_props(&default_writer_props);
        from_extern_parquet.global.created_by = same_created_by;
        from_extern_parquet.global.compression = Some("zstd(3)".into());
        from_extern_parquet.global.skip_arrow_metadata = true;

        assert_eq!(
            default_table_writer_opts,
            from_extern_parquet,
            "the default writer_props should have the same configuration as the session's default TableParquetOptions",
        );
    }

    #[test]
    fn test_bloom_filter_defaults() {
        // the TableParquetOptions, with only the bloom filter turned on
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties, with only the bloom filter turned on
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties::default()),
            "should use the default bloom filter props"
        );
    }

    #[test]
    fn test_bloom_filter_set_fpp_only() {
        // the TableParquetOptions, with only the fpp set
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties, with only the fpp set
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_fpp(0.42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: 0.42,
                ndv: DEFAULT_BLOOM_FILTER_NDV
            }),
            "should have only the fpp set, and the ndv at default",
        );
    }

    #[test]
    fn test_bloom_filter_set_ndv_only() {
        // the TableParquetOptions, with only the ndv set
        let mut default_table_writer_opts = TableParquetOptions::default();
        default_table_writer_opts.global.bloom_filter_on_write = true;
        default_table_writer_opts.global.bloom_filter_ndv = Some(42);
        default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty()));
        let from_datafusion_defaults =
            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
                .unwrap()
                .build();

        // the WriterProperties, with only the ndv set
        let default_writer_props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_ndv(42)
            .build();

        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            from_datafusion_defaults.bloom_filter_properties(&"default".into()),
            "parquet and datafusion props should have the same bloom filter props",
        );
        assert_eq!(
            default_writer_props.bloom_filter_properties(&"default".into()),
            Some(&BloomFilterProperties {
                fpp: DEFAULT_BLOOM_FILTER_FPP,
                ndv: 42
            }),
            "should have only the ndv set, and the fpp at default",
        );
    }
}