datafusion_common/file_options/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Options related to how files should be written
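//!
//! # Example
//!
//! A minimal sketch (marked `ignore`, not compiled as a doctest) of how string
//! key/value options are applied to a `TableOptions` for a given file format;
//! it mirrors the unit tests in this module:
//!
//! ```ignore
//! use std::collections::HashMap;
//! use datafusion_common::config::{ConfigFileType, TableOptions};
//!
//! // Writer options are passed as strings; keys use the "format." prefix
//! let mut options: HashMap<String, String> = HashMap::new();
//! options.insert("format.compression".to_owned(), "zstd(4)".to_owned());
//!
//! // Select the target file format, then apply the string options to it
//! let mut table_config = TableOptions::new();
//! table_config.set_config_format(ConfigFileType::PARQUET);
//! table_config.alter_with_string_hash_map(&options).unwrap();
//! ```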

pub mod arrow_writer;
pub mod avro_writer;
pub mod csv_writer;
pub mod file_type;
pub mod json_writer;
#[cfg(feature = "parquet")]
pub mod parquet_writer;

#[cfg(test)]
#[cfg(feature = "parquet")]
mod tests {
    use std::collections::HashMap;

    use crate::{
        config::{ConfigFileType, TableOptions},
        file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
        parsers::CompressionTypeVariant,
        Result,
    };

    use parquet::{
        basic::{Compression, Encoding, ZstdLevel},
        file::properties::{EnabledStatistics, WriterPropertiesBuilder, WriterVersion},
        schema::types::ColumnPath,
    };

    #[test]
    fn test_writeroptions_parquet_from_statement_options() -> Result<()> {
        let mut option_map: HashMap<String, String> = HashMap::new();
        option_map.insert("format.max_row_group_size".to_owned(), "123".to_owned());
        option_map.insert("format.data_pagesize_limit".to_owned(), "123".to_owned());
        option_map.insert("format.write_batch_size".to_owned(), "123".to_owned());
        option_map.insert("format.writer_version".to_owned(), "2.0".to_owned());
        option_map.insert(
            "format.dictionary_page_size_limit".to_owned(),
            "123".to_owned(),
        );
        option_map.insert(
            "format.created_by".to_owned(),
            "df write unit test".to_owned(),
        );
        option_map.insert(
            "format.column_index_truncate_length".to_owned(),
            "123".to_owned(),
        );
        option_map.insert(
            "format.data_page_row_count_limit".to_owned(),
            "123".to_owned(),
        );
        option_map.insert("format.bloom_filter_on_write".to_owned(), "true".to_owned());
        option_map.insert("format.encoding".to_owned(), "plain".to_owned());
        option_map.insert("format.dictionary_enabled".to_owned(), "true".to_owned());
        option_map.insert("format.compression".to_owned(), "zstd(4)".to_owned());
        option_map.insert("format.statistics_enabled".to_owned(), "page".to_owned());
        option_map.insert("format.bloom_filter_fpp".to_owned(), "0.123".to_owned());
        option_map.insert("format.bloom_filter_ndv".to_owned(), "123".to_owned());

        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config.alter_with_string_hash_map(&option_map)?;

        let properties = WriterPropertiesBuilder::try_from(
            &table_config.parquet.with_skip_arrow_metadata(true),
        )?
        .build();

        // Verify the expected options propagated down to parquet crate WriterProperties struct
        assert_eq!(properties.max_row_group_size(), 123);
        assert_eq!(properties.data_page_size_limit(), 123);
        assert_eq!(properties.write_batch_size(), 123);
        assert_eq!(properties.writer_version(), WriterVersion::PARQUET_2_0);
        assert_eq!(properties.dictionary_page_size_limit(), 123);
        assert_eq!(properties.created_by(), "df write unit test");
        assert_eq!(properties.column_index_truncate_length(), Some(123));
        assert_eq!(properties.data_page_row_count_limit(), 123);
        properties
            .bloom_filter_properties(&ColumnPath::from(""))
            .expect("expected bloom filter enabled");
        assert_eq!(
            properties
                .encoding(&ColumnPath::from(""))
                .expect("expected default encoding"),
            Encoding::PLAIN
        );
        assert!(properties.dictionary_enabled(&ColumnPath::from("")));
        assert_eq!(
            properties.compression(&ColumnPath::from("")),
            Compression::ZSTD(ZstdLevel::try_new(4_i32)?)
        );
        assert_eq!(
            properties.statistics_enabled(&ColumnPath::from("")),
            EnabledStatistics::Page
        );
        assert_eq!(
            properties
                .bloom_filter_properties(&ColumnPath::from(""))
                .expect("expected bloom properties!")
                .fpp,
            0.123
        );
        assert_eq!(
            properties
                .bloom_filter_properties(&ColumnPath::from(""))
                .expect("expected bloom properties!")
                .ndv,
            123
        );

        // properties which remain as default on WriterProperties
        assert_eq!(properties.key_value_metadata(), None);
        assert_eq!(properties.sorting_columns(), None);

        Ok(())
    }

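    // Parquet writer options can also target a single (possibly nested) column by
    // appending `::<column path>` to the option key, e.g. `format.compression::col2.nested`,
    // as exercised below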
    #[test]
    fn test_writeroptions_parquet_column_specific() -> Result<()> {
        let mut option_map: HashMap<String, String> = HashMap::new();

        option_map.insert(
            "format.bloom_filter_enabled::col1".to_owned(),
            "true".to_owned(),
        );
        option_map.insert(
            "format.bloom_filter_enabled::col2.nested".to_owned(),
            "true".to_owned(),
        );
        option_map.insert("format.encoding::col1".to_owned(), "plain".to_owned());
        option_map.insert("format.encoding::col2.nested".to_owned(), "rle".to_owned());
        option_map.insert(
            "format.dictionary_enabled::col1".to_owned(),
            "true".to_owned(),
        );
        option_map.insert(
            "format.dictionary_enabled::col2.nested".to_owned(),
            "true".to_owned(),
        );
        option_map.insert("format.compression::col1".to_owned(), "zstd(4)".to_owned());
        option_map.insert(
            "format.compression::col2.nested".to_owned(),
            "zstd(10)".to_owned(),
        );
        option_map.insert(
            "format.statistics_enabled::col1".to_owned(),
            "page".to_owned(),
        );
        option_map.insert(
            "format.statistics_enabled::col2.nested".to_owned(),
            "none".to_owned(),
        );
        option_map.insert(
            "format.bloom_filter_fpp::col1".to_owned(),
            "0.123".to_owned(),
        );
        option_map.insert(
            "format.bloom_filter_fpp::col2.nested".to_owned(),
            "0.456".to_owned(),
        );
        option_map.insert("format.bloom_filter_ndv::col1".to_owned(), "123".to_owned());
        option_map.insert(
            "format.bloom_filter_ndv::col2.nested".to_owned(),
            "456".to_owned(),
        );

        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config.alter_with_string_hash_map(&option_map)?;

        let properties = WriterPropertiesBuilder::try_from(
            &table_config.parquet.with_skip_arrow_metadata(true),
        )?
        .build();

        let col1 = ColumnPath::from(vec!["col1".to_owned()]);
        let col2_nested = ColumnPath::from(vec!["col2".to_owned(), "nested".to_owned()]);

        // Verify the expected options propagated down to parquet crate WriterProperties struct

        properties
            .bloom_filter_properties(&col1)
            .expect("expected bloom filter enabled for col1");

        properties
            .bloom_filter_properties(&col2_nested)
            .expect("expected bloom filter enabled for col2_nested");

        assert_eq!(
            properties.encoding(&col1).expect("expected encoding"),
            Encoding::PLAIN
        );

        assert_eq!(
            properties
                .encoding(&col2_nested)
                .expect("expected encoding"),
            Encoding::RLE
        );

        assert!(properties.dictionary_enabled(&col1));
        assert!(properties.dictionary_enabled(&col2_nested));

        assert_eq!(
            properties.compression(&col1),
            Compression::ZSTD(ZstdLevel::try_new(4_i32)?)
        );

        assert_eq!(
            properties.compression(&col2_nested),
            Compression::ZSTD(ZstdLevel::try_new(10_i32)?)
        );

        assert_eq!(
            properties.statistics_enabled(&col1),
            EnabledStatistics::Page
        );

        assert_eq!(
            properties.statistics_enabled(&col2_nested),
            EnabledStatistics::None
        );

        assert_eq!(
            properties
                .bloom_filter_properties(&col1)
                .expect("expected bloom properties!")
                .fpp,
            0.123
        );

        assert_eq!(
            properties
                .bloom_filter_properties(&col2_nested)
                .expect("expected bloom properties!")
                .fpp,
            0.456
        );

        assert_eq!(
            properties
                .bloom_filter_properties(&col1)
                .expect("expected bloom properties!")
                .ndv,
            123
        );

        assert_eq!(
            properties
                .bloom_filter_properties(&col2_nested)
                .expect("expected bloom properties!")
                .ndv,
            456
        );

        Ok(())
    }

    #[test]
    // for StatementOptions
    fn test_writeroptions_csv_from_statement_options() -> Result<()> {
        let mut option_map: HashMap<String, String> = HashMap::new();
        option_map.insert("format.has_header".to_owned(), "true".to_owned());
        option_map.insert("format.date_format".to_owned(), "123".to_owned());
        option_map.insert("format.datetime_format".to_owned(), "123".to_owned());
        option_map.insert("format.timestamp_format".to_owned(), "2.0".to_owned());
        option_map.insert("format.time_format".to_owned(), "123".to_owned());
        option_map.insert("format.null_value".to_owned(), "123".to_owned());
        option_map.insert("format.compression".to_owned(), "gzip".to_owned());
        option_map.insert("format.delimiter".to_owned(), ";".to_owned());

        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::CSV);
        table_config.alter_with_string_hash_map(&option_map)?;

        let csv_options = CsvWriterOptions::try_from(&table_config.csv)?;

        let builder = csv_options.writer_options;
        assert!(builder.header());
        let buff = Vec::new();
        let _properties = builder.build(buff);
        assert_eq!(csv_options.compression, CompressionTypeVariant::GZIP);
        // TODO expand unit test if csv::WriterBuilder allows public read access to properties

        Ok(())
    }

    #[test]
    // for StatementOptions
    fn test_writeroptions_json_from_statement_options() -> Result<()> {
        let mut option_map: HashMap<String, String> = HashMap::new();
        option_map.insert("format.compression".to_owned(), "gzip".to_owned());

        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::JSON);
        table_config.alter_with_string_hash_map(&option_map)?;

        let json_options = JsonWriterOptions::try_from(&table_config.json)?;
        assert_eq!(json_options.compression, CompressionTypeVariant::GZIP);

        Ok(())
    }
}