// arrow_json/writer/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! # JSON Writer
19//!
20//! This JSON writer converts Arrow [`RecordBatch`]es into arrays of
21//! JSON objects or JSON formatted byte streams.
22//!
23//! ## Writing JSON formatted byte streams
24//!
25//! To serialize [`RecordBatch`]es into line-delimited JSON bytes, use
26//! [`LineDelimitedWriter`]:
27//!
28//! ```
29//! # use std::sync::Arc;
30//! # use arrow_array::{Int32Array, RecordBatch};
31//! # use arrow_schema::{DataType, Field, Schema};
32//!
33//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
34//! let a = Int32Array::from(vec![1, 2, 3]);
35//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
36//!
37//! // Write the record batch out as JSON
38//! let buf = Vec::new();
39//! let mut writer = arrow_json::LineDelimitedWriter::new(buf);
40//! writer.write_batches(&vec![&batch]).unwrap();
41//! writer.finish().unwrap();
42//!
43//! // Get the underlying buffer back,
44//! let buf = writer.into_inner();
45//! assert_eq!(r#"{"a":1}
46//! {"a":2}
47//! {"a":3}
48//!"#, String::from_utf8(buf).unwrap())
49//! ```
50//!
51//! To serialize [`RecordBatch`]es into a well formed JSON array, use
52//! [`ArrayWriter`]:
53//!
54//! ```
55//! # use std::sync::Arc;
56//! # use arrow_array::{Int32Array, RecordBatch};
57//! use arrow_schema::{DataType, Field, Schema};
58//!
59//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
60//! let a = Int32Array::from(vec![1, 2, 3]);
61//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
62//!
63//! // Write the record batch out as a JSON array
64//! let buf = Vec::new();
65//! let mut writer = arrow_json::ArrayWriter::new(buf);
66//! writer.write_batches(&vec![&batch]).unwrap();
67//! writer.finish().unwrap();
68//!
69//! // Get the underlying buffer back,
70//! let buf = writer.into_inner();
71//! assert_eq!(r#"[{"a":1},{"a":2},{"a":3}]"#, String::from_utf8(buf).unwrap())
72//! ```
73//!
74//! [`LineDelimitedWriter`] and [`ArrayWriter`] will omit writing keys with null values.
75//! In order to explicitly write null values for keys, configure a custom [`Writer`] by
76//! using a [`WriterBuilder`] to construct a [`Writer`].
77//!
78//! ## Writing to [serde_json] JSON Objects
79//!
80//! To serialize [`RecordBatch`]es into an array of
81//! [JSON](https://docs.serde.rs/serde_json/) objects you can reparse the resulting JSON string.
82//! Note that this is less efficient than using the `Writer` API.
83//!
84//! ```
85//! # use std::sync::Arc;
86//! # use arrow_array::{Int32Array, RecordBatch};
87//! # use arrow_schema::{DataType, Field, Schema};
88//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
89//! let a = Int32Array::from(vec![1, 2, 3]);
90//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
91//!
92//! // Write the record batch out as json bytes (string)
93//! let buf = Vec::new();
94//! let mut writer = arrow_json::ArrayWriter::new(buf);
95//! writer.write_batches(&vec![&batch]).unwrap();
96//! writer.finish().unwrap();
97//! let json_data = writer.into_inner();
98//!
99//! // Parse the string using serde_json
100//! use serde_json::{Map, Value};
101//! let json_rows: Vec<Map<String, Value>> = serde_json::from_reader(json_data.as_slice()).unwrap();
102//! assert_eq!(
103//!     serde_json::Value::Object(json_rows[1].clone()),
104//!     serde_json::json!({"a": 2}),
105//! );
106//! ```
107mod encoder;
108
109use std::{fmt::Debug, io::Write};
110
111use crate::StructMode;
112use arrow_array::*;
113use arrow_schema::*;
114
115use encoder::{make_encoder, EncoderOptions};
116
/// This trait defines how to format a sequence of JSON objects to a
/// byte stream.
///
/// Implementations control the prologue, per-row delimiters, and epilogue
/// of the output; all hooks default to writing nothing.
pub trait JsonFormat: Debug + Default {
    #[inline]
    /// write any bytes needed at the start of the file to the writer
    fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }

    #[inline]
    /// write any bytes needed for the start of each row
    fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<(), ArrowError> {
        Ok(())
    }

    #[inline]
    /// write any bytes needed for the end of each row
    fn end_row<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }

    /// write any bytes needed at the end of the byte stream (e.g. the closing
    /// `]` of a JSON array)
    fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
        Ok(())
    }
}
143
/// Produces JSON output with one record per line.
///
/// Also known as newline-delimited JSON (NDJSON) or JSON Lines.
///
/// For example:
///
/// ```json
/// {"foo":1}
/// {"bar":1}
///
/// ```
#[derive(Debug, Default)]
pub struct LineDelimited {}
155
156impl JsonFormat for LineDelimited {
157    fn end_row<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
158        writer.write_all(b"\n")?;
159        Ok(())
160    }
161}
162
/// Produces JSON output as a single JSON array.
///
/// Note that [`Writer::finish`] must be called to emit the closing `]`.
///
/// For example:
///
/// ```json
/// [{"foo":1},{"bar":1}]
/// ```
#[derive(Debug, Default)]
pub struct JsonArray {}
172
173impl JsonFormat for JsonArray {
174    fn start_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
175        writer.write_all(b"[")?;
176        Ok(())
177    }
178
179    fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<(), ArrowError> {
180        if !is_first_row {
181            writer.write_all(b",")?;
182        }
183        Ok(())
184    }
185
186    fn end_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
187        writer.write_all(b"]")?;
188        Ok(())
189    }
190}
191
/// A JSON writer which serializes [`RecordBatch`]es to newline delimited JSON objects.
///
/// Shorthand for a [`Writer`] using the [`LineDelimited`] format.
pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>;

/// A JSON writer which serializes [`RecordBatch`]es to JSON arrays.
///
/// Shorthand for a [`Writer`] using the [`JsonArray`] format.
pub type ArrayWriter<W> = Writer<W, JsonArray>;
197
/// JSON writer builder.
///
/// Thin newtype over the internal encoder options used to configure a [`Writer`].
#[derive(Debug, Clone, Default)]
pub struct WriterBuilder(EncoderOptions);
201
202impl WriterBuilder {
203    /// Create a new builder for configuring JSON writing options.
204    ///
205    /// # Example
206    ///
207    /// ```
208    /// # use arrow_json::{Writer, WriterBuilder};
209    /// # use arrow_json::writer::LineDelimited;
210    /// # use std::fs::File;
211    ///
212    /// fn example() -> Writer<File, LineDelimited> {
213    ///     let file = File::create("target/out.json").unwrap();
214    ///
215    ///     // create a builder that keeps keys with null values
216    ///     let builder = WriterBuilder::new().with_explicit_nulls(true);
217    ///     let writer = builder.build::<_, LineDelimited>(file);
218    ///
219    ///     writer
220    /// }
221    /// ```
222    pub fn new() -> Self {
223        Self::default()
224    }
225
226    /// Returns `true` if this writer is configured to keep keys with null values.
227    pub fn explicit_nulls(&self) -> bool {
228        self.0.explicit_nulls
229    }
230
231    /// Set whether to keep keys with null values, or to omit writing them.
232    ///
233    /// For example, with [`LineDelimited`] format:
234    ///
235    /// Skip nulls (set to `false`):
236    ///
237    /// ```json
238    /// {"foo":1}
239    /// {"foo":1,"bar":2}
240    /// {}
241    /// ```
242    ///
243    /// Keep nulls (set to `true`):
244    ///
245    /// ```json
246    /// {"foo":1,"bar":null}
247    /// {"foo":1,"bar":2}
248    /// {"foo":null,"bar":null}
249    /// ```
250    ///
251    /// Default is to skip nulls (set to `false`). If `struct_mode == ListOnly`,
252    /// nulls will be written explicitly regardless of this setting.
253    pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
254        self.0.explicit_nulls = explicit_nulls;
255        self
256    }
257
258    /// Returns if this writer is configured to write structs as JSON Objects or Arrays.
259    pub fn struct_mode(&self) -> StructMode {
260        self.0.struct_mode
261    }
262
263    /// Set the [`StructMode`] for the writer, which determines whether structs
264    /// are encoded to JSON as objects or lists. For more details refer to the
265    /// enum documentation. Default is to use `ObjectOnly`. If this is set to
266    /// `ListOnly`, nulls will be written explicitly regardless of the
267    /// `explicit_nulls` setting.
268    pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self {
269        self.0.struct_mode = struct_mode;
270        self
271    }
272
273    /// Create a new `Writer` with specified `JsonFormat` and builder options.
274    pub fn build<W, F>(self, writer: W) -> Writer<W, F>
275    where
276        W: Write,
277        F: JsonFormat,
278    {
279        Writer {
280            writer,
281            started: false,
282            finished: false,
283            format: F::default(),
284            options: self.0,
285        }
286    }
287}
288
/// A JSON writer which serializes [`RecordBatch`]es to a stream of
/// `u8` encoded JSON objects.
///
/// See the module level documentation for detailed usage and examples.
/// The specific format of the stream is controlled by the [`JsonFormat`]
/// type parameter.
///
/// By default the writer will skip writing keys with null values for
/// backward compatibility. See [`WriterBuilder`] on how to customize
/// this behaviour when creating a new writer.
#[derive(Debug)]
pub struct Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Underlying writer to use to write bytes
    writer: W,

    /// Has the writer output any records yet?
    ///
    /// Used to decide whether the stream prologue (e.g. `[` for arrays)
    /// still needs to be emitted, and whether the next row is the first.
    started: bool,

    /// Is the writer finished?
    ///
    /// Set once the stream epilogue has been written, so `finish` is idempotent.
    finished: bool,

    /// Determines how the byte stream is formatted
    format: F,

    /// Controls how JSON should be encoded, e.g. whether to write explicit nulls or skip them
    options: EncoderOptions,
}
320
321impl<W, F> Writer<W, F>
322where
323    W: Write,
324    F: JsonFormat,
325{
326    /// Construct a new writer
327    pub fn new(writer: W) -> Self {
328        Self {
329            writer,
330            started: false,
331            finished: false,
332            format: F::default(),
333            options: EncoderOptions::default(),
334        }
335    }
336
337    /// Serialize `batch` to JSON output
338    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
339        if batch.num_rows() == 0 {
340            return Ok(());
341        }
342
343        // BufWriter uses a buffer size of 8KB
344        // We therefore double this and flush once we have more than 8KB
345        let mut buffer = Vec::with_capacity(16 * 1024);
346
347        let mut is_first_row = !self.started;
348        if !self.started {
349            self.format.start_stream(&mut buffer)?;
350            self.started = true;
351        }
352
353        let array = StructArray::from(batch.clone());
354        let mut encoder = make_encoder(&array, &self.options)?;
355
356        for idx in 0..batch.num_rows() {
357            self.format.start_row(&mut buffer, is_first_row)?;
358            is_first_row = false;
359
360            encoder.encode(idx, &mut buffer);
361            if buffer.len() > 8 * 1024 {
362                self.writer.write_all(&buffer)?;
363                buffer.clear();
364            }
365            self.format.end_row(&mut buffer)?;
366        }
367
368        if !buffer.is_empty() {
369            self.writer.write_all(&buffer)?;
370        }
371
372        Ok(())
373    }
374
375    /// Serialize `batches` to JSON output
376    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
377        for b in batches {
378            self.write(b)?;
379        }
380        Ok(())
381    }
382
383    /// Finishes the output stream. This function must be called after
384    /// all record batches have been produced. (e.g. producing the final `']'` if writing
385    /// arrays.
386    pub fn finish(&mut self) -> Result<(), ArrowError> {
387        if !self.started {
388            self.format.start_stream(&mut self.writer)?;
389            self.started = true;
390        }
391        if !self.finished {
392            self.format.end_stream(&mut self.writer)?;
393            self.finished = true;
394        }
395
396        Ok(())
397    }
398
399    /// Unwraps this `Writer<W>`, returning the underlying writer
400    pub fn into_inner(self) -> W {
401        self.writer
402    }
403}
404
// Allows a `Writer` to be used anywhere a `RecordBatchWriter` is expected,
// delegating to the inherent methods.
impl<W, F> RecordBatchWriter for Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        // Inherent methods take precedence over trait methods in method
        // resolution, so this calls `Writer::write`, not itself.
        self.write(batch)
    }

    // Consumes the writer; emits the stream epilogue via `finish`
    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}
418
419#[cfg(test)]
420mod tests {
421    use core::str;
422    use std::fs::{read_to_string, File};
423    use std::io::{BufReader, Seek};
424    use std::sync::Arc;
425
426    use serde_json::{json, Value};
427
428    use arrow_array::builder::*;
429    use arrow_array::types::*;
430    use arrow_buffer::{i256, Buffer, NullBuffer, OffsetBuffer, ToByteSlice};
431    use arrow_data::ArrayData;
432
433    use crate::reader::*;
434
435    use super::*;
436
    /// Asserts that the NDJSON `input` is semantically identical to `expected`
    ///
    /// Both sides are split on newlines and each non-empty line is parsed as a
    /// JSON value, so key order and whitespace differences are ignored.
    fn assert_json_eq(input: &[u8], expected: &str) {
        let expected: Vec<Option<Value>> = expected
            .split('\n')
            .map(|s| (!s.is_empty()).then(|| serde_json::from_str(s).unwrap()))
            .collect();

        let actual: Vec<Option<Value>> = input
            .split(|b| *b == b'\n')
            .map(|s| (!s.is_empty()).then(|| serde_json::from_slice(s).unwrap()))
            .collect();

        assert_eq!(expected, actual);
    }
451
    #[test]
    fn write_simple_rows() {
        // Nullable Int32 and Utf8 columns; keys with null values are
        // omitted entirely (the default writer behaviour).
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Utf8, true),
        ]);

        let a = Int32Array::from(vec![Some(1), Some(2), Some(3), None, Some(5)]);
        let b = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            // Scope the writer so the mutable borrow of `buf` ends before we read it
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":1,"c2":"a"}
{"c1":2,"c2":"b"}
{"c1":3,"c2":"c"}
{"c2":"d"}
{"c1":5}
"#,
        );
    }
480
    #[test]
    fn write_large_utf8_and_utf8_view() {
        // All three string encodings (Utf8, LargeUtf8, Utf8View) should render
        // identically in the JSON output, with null entries omitted.
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::LargeUtf8, true),
            Field::new("c3", DataType::Utf8View, true),
        ]);

        let a = StringArray::from(vec![Some("a"), None, Some("c"), Some("d"), None]);
        let b = LargeStringArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);
        let c = StringViewArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":"a","c2":"a","c3":"a"}
{"c2":"b","c3":"b"}
{"c1":"c"}
{"c1":"d","c2":"d","c3":"d"}
{}
"#,
        );
    }
515
    #[test]
    fn write_dictionary() {
        // Dictionary-encoded columns must be written as their resolved string
        // values, regardless of the dictionary key width (Int32 vs Int8).
        let schema = Schema::new(vec![
            Field::new_dictionary("c1", DataType::Int32, DataType::Utf8, true),
            Field::new_dictionary("c2", DataType::Int8, DataType::Utf8, true),
        ]);

        let a: DictionaryArray<Int32Type> = vec![
            Some("cupcakes"),
            Some("foo"),
            Some("foo"),
            None,
            Some("cupcakes"),
        ]
        .into_iter()
        .collect();
        let b: DictionaryArray<Int8Type> =
            vec![Some("sdsd"), Some("sdsd"), None, Some("sd"), Some("sdsd")]
                .into_iter()
                .collect();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":"cupcakes","c2":"sdsd"}
{"c1":"foo","c2":"sdsd"}
{"c1":"foo"}
{"c2":"sd"}
{"c1":"cupcakes","c2":"sdsd"}
"#,
        );
    }
555
    #[test]
    fn write_list_of_dictionary() {
        // A LargeList of dictionary-encoded strings: list values must be
        // resolved through the dictionary, and null lists must be omitted.
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::Utf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let dict_array: DictionaryArray<Int32Type> =
            vec![Some("a"), Some("b"), Some("c"), Some("a"), None, Some("c")]
                .into_iter()
                .collect();
        // List lengths [3, 2, 0, 1]; the third list is null per the null buffer
        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }
595
    #[test]
    fn write_list_of_dictionary_large_values() {
        // Same as write_list_of_dictionary, but with LargeUtf8 dictionary
        // values and explicitly constructed keys.
        let dict_field = Arc::new(Field::new_dictionary(
            "item",
            DataType::Int32,
            DataType::LargeUtf8,
            true,
        ));
        let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);

        let keys = PrimitiveArray::<Int32Type>::from(vec![
            Some(0),
            Some(1),
            Some(2),
            Some(0),
            None,
            Some(2),
        ]);
        let values = LargeStringArray::from(vec!["a", "b", "c"]);
        let dict_array = DictionaryArray::try_new(keys, Arc::new(values)).unwrap();

        // List lengths [3, 2, 0, 1]; the third list is null per the null buffer
        let list_array = LargeListArray::try_new(
            dict_field,
            OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
            Arc::new(dict_array),
            Some(NullBuffer::from_iter([true, true, false, true])),
        )
        .unwrap();

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"l":["a","b","c"]}
{"l":["a",null]}
{}
{"l":["c"]}
"#,
        );
    }
642
    #[test]
    fn write_timestamps() {
        // Derive second/milli/micro precision values from one nanosecond
        // timestamp, then check each precision renders with the expected
        // number of fractional digits.
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375","millis":"2018-11-13T17:11:10.011","name":"a","nanos":"2018-11-13T17:11:10.011375885","secs":"2018-11-13T17:11:10"}
{"name":"b"}
"#,
        );
    }
696
    #[test]
    fn write_timestamps_with_tz() {
        // Same as write_timestamps, but with a "+00:00" timezone attached:
        // the output should gain a trailing `Z` suffix on every timestamp.
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_nanos = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();
        let ts_micros = ts_nanos / 1000;
        let ts_millis = ts_micros / 1000;
        let ts_secs = ts_millis / 1000;

        let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
        let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
        let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let tz = "+00:00";

        let arr_nanos = arr_nanos.with_timezone(tz);
        let arr_micros = arr_micros.with_timezone(tz);
        let arr_millis = arr_millis.with_timezone(tz);
        let arr_secs = arr_secs.with_timezone(tz);

        let schema = Schema::new(vec![
            Field::new("nanos", arr_nanos.data_type().clone(), true),
            Field::new("micros", arr_micros.data_type().clone(), true),
            Field::new("millis", arr_millis.data_type().clone(), true),
            Field::new("secs", arr_secs.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_nanos),
                Arc::new(arr_micros),
                Arc::new(arr_millis),
                Arc::new(arr_secs),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"micros":"2018-11-13T17:11:10.011375Z","millis":"2018-11-13T17:11:10.011Z","name":"a","nanos":"2018-11-13T17:11:10.011375885Z","secs":"2018-11-13T17:11:10Z"}
{"name":"b"}
"#,
        );
    }
757
    #[test]
    fn write_dates() {
        // Date32 (days since epoch) should render as a plain date; Date64
        // (millis since epoch) as a full timestamp.
        let ts_string = "2018-11-13T17:11:10.011375885995";
        let ts_millis = ts_string
            .parse::<chrono::NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_millis();

        let arr_date32 = Date32Array::from(vec![
            // Convert epoch millis to whole days for the Date32 representation
            Some(i32::try_from(ts_millis / 1000 / (60 * 60 * 24)).unwrap()),
            None,
        ]);
        let arr_date64 = Date64Array::from(vec![Some(ts_millis), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("date32", arr_date32.data_type().clone(), true),
            Field::new("date64", arr_date64.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), false),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_date32),
                Arc::new(arr_date64),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"date32":"2018-11-13","date64":"2018-11-13T17:11:10.011","name":"a"}
{"name":"b"}
"#,
        );
    }
804
    #[test]
    fn write_times() {
        // The same raw value (120) is interpreted per each time unit, so each
        // column renders a different time-of-day string.
        let arr_time32sec = Time32SecondArray::from(vec![Some(120), None]);
        let arr_time32msec = Time32MillisecondArray::from(vec![Some(120), None]);
        let arr_time64usec = Time64MicrosecondArray::from(vec![Some(120), None]);
        let arr_time64nsec = Time64NanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("time32sec", arr_time32sec.data_type().clone(), true),
            Field::new("time32msec", arr_time32msec.data_type().clone(), true),
            Field::new("time64usec", arr_time64usec.data_type().clone(), true),
            Field::new("time64nsec", arr_time64nsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_time32sec),
                Arc::new(arr_time32msec),
                Arc::new(arr_time64usec),
                Arc::new(arr_time64nsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"time32sec":"00:02:00","time32msec":"00:00:00.120","time64usec":"00:00:00.000120","time64nsec":"00:00:00.000000120","name":"a"}
{"name":"b"}
"#,
        );
    }
847
    #[test]
    fn write_durations() {
        // Durations render in ISO-8601 duration form (e.g. "PT120S"), with the
        // same raw value (120) scaled per each duration unit.
        let arr_durationsec = DurationSecondArray::from(vec![Some(120), None]);
        let arr_durationmsec = DurationMillisecondArray::from(vec![Some(120), None]);
        let arr_durationusec = DurationMicrosecondArray::from(vec![Some(120), None]);
        let arr_durationnsec = DurationNanosecondArray::from(vec![Some(120), None]);
        let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

        let schema = Schema::new(vec![
            Field::new("duration_sec", arr_durationsec.data_type().clone(), true),
            Field::new("duration_msec", arr_durationmsec.data_type().clone(), true),
            Field::new("duration_usec", arr_durationusec.data_type().clone(), true),
            Field::new("duration_nsec", arr_durationnsec.data_type().clone(), true),
            Field::new("name", arr_names.data_type().clone(), true),
        ]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arr_durationsec),
                Arc::new(arr_durationmsec),
                Arc::new(arr_durationusec),
                Arc::new(arr_durationnsec),
                Arc::new(arr_names),
            ],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"duration_sec":"PT120S","duration_msec":"PT0.12S","duration_usec":"PT0.00012S","duration_nsec":"PT0.00000012S","name":"a"}
{"name":"b"}
"#,
        );
    }
890
    #[test]
    fn write_nested_structs() {
        // Two levels of struct nesting (c1 -> c12 -> c121); null leaf values
        // (c11 row 2) must be omitted while the enclosing objects remain.
        let schema = Schema::new(vec![
            Field::new(
                "c1",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ),
            Field::new("c2", DataType::Utf8, false),
        ]);

        let c1 = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);
        let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"}
{"c1":{"c12":{"c121":"f"}},"c2":"b"}
{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"}
"#,
        );
    }
945
    #[test]
    fn write_struct_with_list_field() {
        // Column c1 is List<Utf8> with non-nullable items; verifies list rows
        // are rendered as JSON arrays alongside a plain Int32 column c2.
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(Field::new("c_list", DataType::Utf8, false))),
            false,
        );
        let field_c2 = Field::new("c2", DataType::Int32, false);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let a_values = StringArray::from(vec!["a", "a1", "b", "c", "d", "e"]);
        // list column rows: ["a", "a1"], ["b"], ["c"], ["d"], ["e"]
        let a_value_offsets = Buffer::from([0, 2, 3, 4, 5, 6].to_byte_slice());
        let a_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(5)
            .add_buffer(a_value_offsets)
            .add_child_data(a_values.into_data())
            // 0b00011111: all five list rows are valid
            .null_bit_buffer(Some(Buffer::from([0b00011111])))
            .build()
            .unwrap();
        let a = ListArray::from(a_list_data);

        let b = Int32Array::from(vec![1, 2, 3, 4, 5]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"c1":["a","a1"],"c2":1}
{"c1":["b"],"c2":2}
{"c1":["c"],"c2":3}
{"c1":["d"],"c2":4}
{"c1":["e"],"c2":5}
"#,
        );
    }
988
    #[test]
    fn write_nested_list() {
        // Column c1 is List<List<Int32>>; c2 is a nullable Utf8 column.
        let list_inner_type = Field::new(
            "a",
            DataType::List(Arc::new(Field::new("b", DataType::Int32, false))),
            false,
        );
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(list_inner_type.clone())),
            false,
        );
        let field_c2 = Field::new("c2", DataType::Utf8, true);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        // list column rows: [[1, 2], [3]], [], [[4, 5, 6]]
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

        // inner lists take value ranges [0..2), [2..3), [3..6)
        let a_value_offsets = Buffer::from([0, 2, 3, 6].to_byte_slice());
        // Construct a list array from the above two
        let a_list_data = ArrayData::builder(list_inner_type.data_type().clone())
            .len(3)
            .add_buffer(a_value_offsets)
            // 0b00000111: all three inner lists are valid
            .null_bit_buffer(Some(Buffer::from([0b00000111])))
            .add_child_data(a_values.into_data())
            .build()
            .unwrap();

        // outer lists take inner rows [0..2), [2..2) (empty), [2..3)
        let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
        let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(3)
            .add_buffer(c1_value_offsets)
            .add_child_data(a_list_data)
            .build()
            .unwrap();

        let c1 = ListArray::from(c1_list_data);
        let c2 = StringArray::from(vec![Some("foo"), Some("bar"), None]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // the null c2 value in the last row is omitted by default
        assert_json_eq(
            &buf,
            r#"{"c1":[[1,2],[3]],"c2":"foo"}
{"c1":[],"c2":"bar"}
{"c1":[[4,5,6]]}
"#,
        );
    }
1045
    #[test]
    fn write_list_of_struct() {
        // Column c1 is a nullable List<Struct>; verifies a null list row is
        // omitted from the output entirely.
        let field_c1 = Field::new(
            "c1",
            DataType::List(Arc::new(Field::new(
                "s",
                DataType::Struct(Fields::from(vec![
                    Field::new("c11", DataType::Int32, true),
                    Field::new(
                        "c12",
                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                        false,
                    ),
                ])),
                false,
            ))),
            true,
        );
        let field_c2 = Field::new("c2", DataType::Int32, false);
        let schema = Schema::new(vec![field_c1.clone(), field_c2]);

        let struct_values = StructArray::from(vec![
            (
                Arc::new(Field::new("c11", DataType::Int32, true)),
                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "c12",
                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
                    false,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c121", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
                )])) as ArrayRef,
            ),
        ]);

        // list column rows (c1):
        // [{"c11": 1, "c12": {"c121": "e"}}, {"c12": {"c121": "f"}}],
        // null,
        // [{"c11": 5, "c12": {"c121": "g"}}]
        let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
        let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
            .len(3)
            .add_buffer(c1_value_offsets)
            .add_child_data(struct_values.into_data())
            // 0b00000101: rows 0 and 2 valid, row 1 null
            .null_bit_buffer(Some(Buffer::from([0b00000101])))
            .build()
            .unwrap();
        let c1 = ListArray::from(c1_list_data);

        let c2 = Int32Array::from(vec![1, 2, 3]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        // the null list in row 2 leaves only "c2" on that line
        assert_json_eq(
            &buf,
            r#"{"c1":[{"c11":1,"c12":{"c121":"e"}},{"c12":{"c121":"f"}}],"c2":1}
{"c2":2}
{"c1":[{"c11":5,"c12":{"c121":"g"}}],"c2":3}
"#,
        );
    }
1118
1119    fn test_write_for_file(test_file: &str, remove_nulls: bool) {
1120        let file = File::open(test_file).unwrap();
1121        let mut reader = BufReader::new(file);
1122        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1123        reader.rewind().unwrap();
1124
1125        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
1126        let mut reader = builder.build(reader).unwrap();
1127        let batch = reader.next().unwrap().unwrap();
1128
1129        let mut buf = Vec::new();
1130        {
1131            if remove_nulls {
1132                let mut writer = LineDelimitedWriter::new(&mut buf);
1133                writer.write_batches(&[&batch]).unwrap();
1134            } else {
1135                let mut writer = WriterBuilder::new()
1136                    .with_explicit_nulls(true)
1137                    .build::<_, LineDelimited>(&mut buf);
1138                writer.write_batches(&[&batch]).unwrap();
1139            }
1140        }
1141
1142        let result = str::from_utf8(&buf).unwrap();
1143        let expected = read_to_string(test_file).unwrap();
1144        for (r, e) in result.lines().zip(expected.lines()) {
1145            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1146            if remove_nulls {
1147                // remove null value from object to make comparison consistent:
1148                if let Value::Object(obj) = expected_json {
1149                    expected_json =
1150                        Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1151                }
1152            }
1153            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1154        }
1155    }
1156
    #[test]
    fn write_basic_rows() {
        // Round-trip a flat JSON file, omitting nulls on write.
        test_write_for_file("test/data/basic.json", true);
    }
1161
    #[test]
    fn write_arrays() {
        // Round-trip a file containing list-valued columns, omitting nulls.
        test_write_for_file("test/data/arrays.json", true);
    }
1166
    #[test]
    fn write_basic_nulls() {
        // Round-trip a file with null values, omitting nulls on write.
        test_write_for_file("test/data/basic_nulls.json", true);
    }
1171
    #[test]
    fn write_nested_with_nulls() {
        // Round-trip nested data with explicit nulls preserved in the output.
        test_write_for_file("test/data/nested_with_nulls.json", false);
    }
1176
1177    #[test]
1178    fn json_line_writer_empty() {
1179        let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
1180        writer.finish().unwrap();
1181        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
1182    }
1183
1184    #[test]
1185    fn json_array_writer_empty() {
1186        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
1187        writer.finish().unwrap();
1188        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
1189    }
1190
1191    #[test]
1192    fn json_line_writer_empty_batch() {
1193        let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
1194
1195        let array = Int32Array::from(Vec::<i32>::new());
1196        let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
1197        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1198
1199        writer.write(&batch).unwrap();
1200        writer.finish().unwrap();
1201        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
1202    }
1203
1204    #[test]
1205    fn json_array_writer_empty_batch() {
1206        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
1207
1208        let array = Int32Array::from(Vec::<i32>::new());
1209        let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
1210        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1211
1212        writer.write(&batch).unwrap();
1213        writer.finish().unwrap();
1214        assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
1215    }
1216
    #[test]
    fn json_struct_array_nulls() {
        // Struct-level validity interacts with a list child: a null struct row
        // drops the whole field from the line, while a valid struct whose list
        // child is null renders as an empty object.
        let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![None]),
            Some(vec![]),
            Some(vec![Some(3), None]), // masked for a
            Some(vec![Some(4), Some(5)]),
            None, // masked for a
            None,
        ]);

        let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
        let array = Arc::new(inner) as ArrayRef;
        // "a" shares the same child data as "b" but adds a struct-level
        // validity buffer: 0b01010111 masks out rows 3 and 5.
        let struct_array_a = StructArray::from((
            vec![(field.clone(), array.clone())],
            Buffer::from([0b01010111]),
        ));
        let struct_array_b = StructArray::from(vec![(field, array)]);

        let schema = Schema::new(vec![
            Field::new_struct("a", struct_array_a.fields().clone(), true),
            Field::new_struct("b", struct_array_b.fields().clone(), true),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(struct_array_a), Arc::new(struct_array_b)],
        )
        .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"a":{"list":[1,2]},"b":{"list":[1,2]}}
{"a":{"list":[null]},"b":{"list":[null]}}
{"a":{"list":[]},"b":{"list":[]}}
{"b":{"list":[3,null]}}
{"a":{"list":[4,5]},"b":{"list":[4,5]}}
{"b":{}}
{"a":{},"b":{}}
"#,
        );
    }
1266
    #[test]
    fn json_writer_map() {
        // Map columns are serialized as JSON objects keyed by the map keys;
        // a null map row is omitted, an empty map renders as {}.
        let keys_array = super::StringArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
        let values_array = super::Int64Array::from(vec![10, 20, 30, 40, 50]);

        let keys = Arc::new(Field::new("keys", DataType::Utf8, false));
        let values = Arc::new(Field::new("values", DataType::Int64, false));
        let entry_struct = StructArray::from(vec![
            (keys, Arc::new(keys_array) as ArrayRef),
            (values, Arc::new(values_array) as ArrayRef),
        ]);

        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );

        // [{"foo": 10}, null, {}, {"bar": 20, "baz": 30, "qux": 40}, {"quux": 50}, {}]
        let entry_offsets = Buffer::from([0, 1, 1, 1, 4, 5, 5].to_byte_slice());
        // 0b00111101: row 1 is the null map, all other rows valid
        let valid_buffer = Buffer::from([0b00111101]);

        let map_data = ArrayData::builder(map_data_type.clone())
            .len(6)
            .null_bit_buffer(Some(valid_buffer))
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();

        let map = MapArray::from(map_data);

        let map_field = Field::new("map", map_data_type, true);
        let schema = Arc::new(Schema::new(vec![map_field]));

        let batch = RecordBatch::try_new(schema, vec![Arc::new(map)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[&batch]).unwrap();
        }

        assert_json_eq(
            &buf,
            r#"{"map":{"foo":10}}
{}
{"map":{}}
{"map":{"bar":20,"baz":30,"qux":40}}
{"map":{"quux":50}}
{"map":{}}
"#,
        );
    }
1324
1325    #[test]
1326    fn test_write_single_batch() {
1327        let test_file = "test/data/basic.json";
1328        let file = File::open(test_file).unwrap();
1329        let mut reader = BufReader::new(file);
1330        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1331        reader.rewind().unwrap();
1332
1333        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
1334        let mut reader = builder.build(reader).unwrap();
1335        let batch = reader.next().unwrap().unwrap();
1336
1337        let mut buf = Vec::new();
1338        {
1339            let mut writer = LineDelimitedWriter::new(&mut buf);
1340            writer.write(&batch).unwrap();
1341        }
1342
1343        let result = str::from_utf8(&buf).unwrap();
1344        let expected = read_to_string(test_file).unwrap();
1345        for (r, e) in result.lines().zip(expected.lines()) {
1346            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1347            // remove null value from object to make comparison consistent:
1348            if let Value::Object(obj) = expected_json {
1349                expected_json =
1350                    Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1351            }
1352            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1353        }
1354    }
1355
1356    #[test]
1357    fn test_write_multi_batches() {
1358        let test_file = "test/data/basic.json";
1359
1360        let schema = SchemaRef::new(Schema::new(vec![
1361            Field::new("a", DataType::Int64, true),
1362            Field::new("b", DataType::Float64, true),
1363            Field::new("c", DataType::Boolean, true),
1364            Field::new("d", DataType::Utf8, true),
1365            Field::new("e", DataType::Utf8, true),
1366            Field::new("f", DataType::Utf8, true),
1367            Field::new("g", DataType::Timestamp(TimeUnit::Millisecond, None), true),
1368            Field::new("h", DataType::Float16, true),
1369        ]));
1370
1371        let mut reader = ReaderBuilder::new(schema.clone())
1372            .build(BufReader::new(File::open(test_file).unwrap()))
1373            .unwrap();
1374        let batch = reader.next().unwrap().unwrap();
1375
1376        // test batches = an empty batch + 2 same batches, finally result should be eq to 2 same batches
1377        let batches = [&RecordBatch::new_empty(schema), &batch, &batch];
1378
1379        let mut buf = Vec::new();
1380        {
1381            let mut writer = LineDelimitedWriter::new(&mut buf);
1382            writer.write_batches(&batches).unwrap();
1383        }
1384
1385        let result = str::from_utf8(&buf).unwrap();
1386        let expected = read_to_string(test_file).unwrap();
1387        // result is eq to 2 same batches
1388        let expected = format!("{expected}\n{expected}");
1389        for (r, e) in result.lines().zip(expected.lines()) {
1390            let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1391            // remove null value from object to make comparison consistent:
1392            if let Value::Object(obj) = expected_json {
1393                expected_json =
1394                    Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1395            }
1396            assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1397        }
1398    }
1399
    #[test]
    fn test_writer_explicit_nulls() -> Result<(), ArrowError> {
        // With `explicit_nulls(true)` every null — top-level, inside structs,
        // lists, dictionaries, and maps — must be emitted as JSON `null`
        // rather than being omitted.

        // List<Int32> column used as a struct child.
        fn nested_list() -> (Arc<ListArray>, Arc<Field>) {
            let array = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![None, None, None]),
                Some(vec![Some(1), Some(2), Some(3)]),
                None,
                Some(vec![None, None, None]),
            ]));
            let field = Arc::new(Field::new("list", array.data_type().clone(), true));
            // [{"list":[null,null,null]},{"list":[1,2,3]},{"list":null},{"list":[null,null,null]}]
            (array, field)
        }

        // Dictionary<Int32, Utf8> column used as a struct child.
        fn nested_dict() -> (Arc<DictionaryArray<Int32Type>>, Arc<Field>) {
            let array = Arc::new(DictionaryArray::from_iter(vec![
                Some("cupcakes"),
                None,
                Some("bear"),
                Some("kuma"),
            ]));
            let field = Arc::new(Field::new("dict", array.data_type().clone(), true));
            // [{"dict":"cupcakes"},{"dict":null},{"dict":"bear"},{"dict":"kuma"}]
            (array, field)
        }

        // Map<Utf8, Int64> column used as a struct child.
        fn nested_map() -> (Arc<MapArray>, Arc<Field>) {
            let string_builder = StringBuilder::new();
            let int_builder = Int64Builder::new();
            let mut builder = MapBuilder::new(None, string_builder, int_builder);

            // [{"foo": 10}, null, {}, {"bar": 20, "baz": 30, "qux": 40}]
            builder.keys().append_value("foo");
            builder.values().append_value(10);
            builder.append(true).unwrap();

            builder.append(false).unwrap();

            builder.append(true).unwrap();

            builder.keys().append_value("bar");
            builder.values().append_value(20);
            builder.keys().append_value("baz");
            builder.values().append_value(30);
            builder.keys().append_value("qux");
            builder.values().append_value(40);
            builder.append(true).unwrap();

            let array = Arc::new(builder.finish());
            let field = Arc::new(Field::new("map", array.data_type().clone(), true));
            (array, field)
        }

        // Top-level List<Struct> column with null list rows.
        fn root_list() -> (Arc<ListArray>, Field) {
            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("a"), Some("b"), None, None])) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("int32", DataType::Int32, true)),
                    Arc::new(Int32Array::from(vec![Some(1), None, Some(5), None])) as ArrayRef,
                ),
            ]);

            let field = Field::new_list(
                "list",
                Field::new("struct", struct_array.data_type().clone(), true),
                true,
            );

            // [{"list":[{"int32":1,"utf8":"a"},{"int32":null,"utf8":"b"}]},{"list":null},{"list":[{int32":5,"utf8":null}]},{"list":null}]
            let entry_offsets = Buffer::from([0, 2, 2, 3, 3].to_byte_slice());
            let data = ArrayData::builder(field.data_type().clone())
                .len(4)
                .add_buffer(entry_offsets)
                .add_child_data(struct_array.into_data())
                // 0b00000101: rows 0 and 2 valid, rows 1 and 3 null
                .null_bit_buffer(Some([0b00000101].into()))
                .build()
                .unwrap();
            let array = Arc::new(ListArray::from(data));
            (array, field)
        }

        let (nested_list_array, nested_list_field) = nested_list();
        let (nested_dict_array, nested_dict_field) = nested_dict();
        let (nested_map_array, nested_map_field) = nested_map();
        let (root_list_array, root_list_field) = root_list();

        let schema = Schema::new(vec![
            Field::new("date", DataType::Date32, true),
            Field::new("null", DataType::Null, true),
            Field::new_struct(
                "struct",
                vec![
                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
                    nested_list_field.clone(),
                    nested_dict_field.clone(),
                    nested_map_field.clone(),
                ],
                true,
            ),
            root_list_field,
        ]);

        let arr_date32 = Date32Array::from(vec![Some(0), None, Some(1), None]);
        let arr_null = NullArray::new(4);
        let arr_struct = StructArray::from(vec![
            // [{"utf8":"a"},{"utf8":null},{"utf8":null},{"utf8":"b"}]
            (
                Arc::new(Field::new("utf8", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec![Some("a"), None, None, Some("b")])) as ArrayRef,
            ),
            // [{"list":[null,null,null]},{"list":[1,2,3]},{"list":null},{"list":[null,null,null]}]
            (nested_list_field, nested_list_array as ArrayRef),
            // [{"dict":"cupcakes"},{"dict":null},{"dict":"bear"},{"dict":"kuma"}]
            (nested_dict_field, nested_dict_array as ArrayRef),
            // [{"foo": 10}, null, {}, {"bar": 20, "baz": 30, "qux": 40}]
            (nested_map_field, nested_map_array as ArrayRef),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                // [{"date":"1970-01-01"},{"date":null},{"date":"1970-01-02"},{"date":null}]
                Arc::new(arr_date32),
                // [{"null":null},{"null":null},{"null":null},{"null":null}]
                Arc::new(arr_null),
                Arc::new(arr_struct),
                // [{"list":[{"int32":1,"utf8":"a"},{"int32":null,"utf8":"b"}]},{"list":null},{"list":[{int32":5,"utf8":null}]},{"list":null}]
                root_list_array,
            ],
        )?;

        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_explicit_nulls(true)
                .build::<_, JsonArray>(&mut buf);
            writer.write_batches(&[&batch])?;
            writer.finish()?;
        }

        let actual = serde_json::from_slice::<Vec<Value>>(&buf).unwrap();
        let expected = serde_json::from_value::<Vec<Value>>(json!([
          {
            "date": "1970-01-01",
            "list": [
              {
                "int32": 1,
                "utf8": "a"
              },
              {
                "int32": null,
                "utf8": "b"
              }
            ],
            "null": null,
            "struct": {
              "dict": "cupcakes",
              "list": [
                null,
                null,
                null
              ],
              "map": {
                "foo": 10
              },
              "utf8": "a"
            }
          },
          {
            "date": null,
            "list": null,
            "null": null,
            "struct": {
              "dict": null,
              "list": [
                1,
                2,
                3
              ],
              "map": null,
              "utf8": null
            }
          },
          {
            "date": "1970-01-02",
            "list": [
              {
                "int32": 5,
                "utf8": null
              }
            ],
            "null": null,
            "struct": {
              "dict": "bear",
              "list": null,
              "map": {},
              "utf8": null
            }
          },
          {
            "date": null,
            "list": null,
            "null": null,
            "struct": {
              "dict": "kuma",
              "list": [
                null,
                null,
                null
              ],
              "map": {
                "bar": 20,
                "baz": 30,
                "qux": 40
              },
              "utf8": "b"
            }
          }
        ]))
        .unwrap();

        assert_eq!(actual, expected);

        Ok(())
    }
1628
1629    fn binary_encoding_test<O: OffsetSizeTrait>() {
1630        // set up schema
1631        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1632            "bytes",
1633            GenericBinaryType::<O>::DATA_TYPE,
1634            true,
1635        )]));
1636
1637        // build record batch:
1638        let mut builder = GenericByteBuilder::<GenericBinaryType<O>>::new();
1639        let values = [Some(b"Ned Flanders"), None, Some(b"Troy McClure")];
1640        for value in values {
1641            match value {
1642                Some(v) => builder.append_value(v),
1643                None => builder.append_null(),
1644            }
1645        }
1646        let array = Arc::new(builder.finish()) as ArrayRef;
1647        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1648
1649        // encode and check JSON with explicit nulls:
1650        {
1651            let mut buf = Vec::new();
1652            let json_value: Value = {
1653                let mut writer = WriterBuilder::new()
1654                    .with_explicit_nulls(true)
1655                    .build::<_, JsonArray>(&mut buf);
1656                writer.write(&batch).unwrap();
1657                writer.close().unwrap();
1658                serde_json::from_slice(&buf).unwrap()
1659            };
1660
1661            assert_eq!(
1662                json!([
1663                    {
1664                        "bytes": "4e656420466c616e64657273"
1665                    },
1666                    {
1667                        "bytes": null // the explicit null
1668                    },
1669                    {
1670                        "bytes": "54726f79204d63436c757265"
1671                    }
1672                ]),
1673                json_value,
1674            );
1675        }
1676
1677        // encode and check JSON with no explicit nulls:
1678        {
1679            let mut buf = Vec::new();
1680            let json_value: Value = {
1681                // explicit nulls are off by default, so we don't need
1682                // to set that when creating the writer:
1683                let mut writer = ArrayWriter::new(&mut buf);
1684                writer.write(&batch).unwrap();
1685                writer.close().unwrap();
1686                serde_json::from_slice(&buf).unwrap()
1687            };
1688
1689            assert_eq!(
1690                json!([
1691                    {
1692                        "bytes": "4e656420466c616e64657273"
1693                    },
1694                    {}, // empty because nulls are omitted
1695                    {
1696                        "bytes": "54726f79204d63436c757265"
1697                    }
1698                ]),
1699                json_value
1700            );
1701        }
1702    }
1703
    #[test]
    fn test_writer_binary() {
        // Exercise both offset widths of the generic binary encoding test.
        // Binary:
        binary_encoding_test::<i32>();
        // LargeBinary:
        binary_encoding_test::<i64>();
    }
1711
1712    #[test]
1713    fn test_writer_fixed_size_binary() {
1714        // set up schema:
1715        let size = 11;
1716        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1717            "bytes",
1718            DataType::FixedSizeBinary(size),
1719            true,
1720        )]));
1721
1722        // build record batch:
1723        let mut builder = FixedSizeBinaryBuilder::new(size);
1724        let values = [Some(b"hello world"), None, Some(b"summer rain")];
1725        for value in values {
1726            match value {
1727                Some(v) => builder.append_value(v).unwrap(),
1728                None => builder.append_null(),
1729            }
1730        }
1731        let array = Arc::new(builder.finish()) as ArrayRef;
1732        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1733
1734        // encode and check JSON with explicit nulls:
1735        {
1736            let mut buf = Vec::new();
1737            let json_value: Value = {
1738                let mut writer = WriterBuilder::new()
1739                    .with_explicit_nulls(true)
1740                    .build::<_, JsonArray>(&mut buf);
1741                writer.write(&batch).unwrap();
1742                writer.close().unwrap();
1743                serde_json::from_slice(&buf).unwrap()
1744            };
1745
1746            assert_eq!(
1747                json!([
1748                    {
1749                        "bytes": "68656c6c6f20776f726c64"
1750                    },
1751                    {
1752                        "bytes": null // the explicit null
1753                    },
1754                    {
1755                        "bytes": "73756d6d6572207261696e"
1756                    }
1757                ]),
1758                json_value,
1759            );
1760        }
1761        // encode and check JSON with no explicit nulls:
1762        {
1763            let mut buf = Vec::new();
1764            let json_value: Value = {
1765                // explicit nulls are off by default, so we don't need
1766                // to set that when creating the writer:
1767                let mut writer = ArrayWriter::new(&mut buf);
1768                writer.write(&batch).unwrap();
1769                writer.close().unwrap();
1770                serde_json::from_slice(&buf).unwrap()
1771            };
1772
1773            assert_eq!(
1774                json!([
1775                    {
1776                        "bytes": "68656c6c6f20776f726c64"
1777                    },
1778                    {}, // empty because nulls are omitted
1779                    {
1780                        "bytes": "73756d6d6572207261696e"
1781                    }
1782                ]),
1783                json_value,
1784            );
1785        }
1786    }
1787
1788    #[test]
1789    fn test_writer_fixed_size_list() {
1790        let size = 3;
1791        let field = FieldRef::new(Field::new_list_field(DataType::Int32, true));
1792        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1793            "list",
1794            DataType::FixedSizeList(field, size),
1795            true,
1796        )]));
1797
1798        let values_builder = Int32Builder::new();
1799        let mut list_builder = FixedSizeListBuilder::new(values_builder, size);
1800        let lists = [
1801            Some([Some(1), Some(2), None]),
1802            Some([Some(3), None, Some(4)]),
1803            Some([None, Some(5), Some(6)]),
1804            None,
1805        ];
1806        for list in lists {
1807            match list {
1808                Some(l) => {
1809                    for value in l {
1810                        match value {
1811                            Some(v) => list_builder.values().append_value(v),
1812                            None => list_builder.values().append_null(),
1813                        }
1814                    }
1815                    list_builder.append(true);
1816                }
1817                None => {
1818                    for _ in 0..size {
1819                        list_builder.values().append_null();
1820                    }
1821                    list_builder.append(false);
1822                }
1823            }
1824        }
1825        let array = Arc::new(list_builder.finish()) as ArrayRef;
1826        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1827
1828        //encode and check JSON with explicit nulls:
1829        {
1830            let json_value: Value = {
1831                let mut buf = Vec::new();
1832                let mut writer = WriterBuilder::new()
1833                    .with_explicit_nulls(true)
1834                    .build::<_, JsonArray>(&mut buf);
1835                writer.write(&batch).unwrap();
1836                writer.close().unwrap();
1837                serde_json::from_slice(&buf).unwrap()
1838            };
1839            assert_eq!(
1840                json!([
1841                    {"list": [1, 2, null]},
1842                    {"list": [3, null, 4]},
1843                    {"list": [null, 5, 6]},
1844                    {"list": null},
1845                ]),
1846                json_value
1847            );
1848        }
1849        // encode and check JSON with no explicit nulls:
1850        {
1851            let json_value: Value = {
1852                let mut buf = Vec::new();
1853                let mut writer = ArrayWriter::new(&mut buf);
1854                writer.write(&batch).unwrap();
1855                writer.close().unwrap();
1856                serde_json::from_slice(&buf).unwrap()
1857            };
1858            assert_eq!(
1859                json!([
1860                    {"list": [1, 2, null]},
1861                    {"list": [3, null, 4]},
1862                    {"list": [null, 5, 6]},
1863                    {}, // empty because nulls are omitted
1864                ]),
1865                json_value
1866            );
1867        }
1868    }
1869
1870    #[test]
1871    fn test_writer_null_dict() {
1872        let keys = Int32Array::from_iter(vec![Some(0), None, Some(1)]);
1873        let values = Arc::new(StringArray::from_iter(vec![Some("a"), None]));
1874        let dict = DictionaryArray::new(keys, values);
1875
1876        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1877            "my_dict",
1878            DataType::Dictionary(DataType::Int32.into(), DataType::Utf8.into()),
1879            true,
1880        )]));
1881
1882        let array = Arc::new(dict) as ArrayRef;
1883        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1884
1885        let mut json = Vec::new();
1886        let write_builder = WriterBuilder::new().with_explicit_nulls(true);
1887        let mut writer = write_builder.build::<_, JsonArray>(&mut json);
1888        writer.write(&batch).unwrap();
1889        writer.close().unwrap();
1890
1891        let json_str = str::from_utf8(&json).unwrap();
1892        assert_eq!(
1893            json_str,
1894            r#"[{"my_dict":"a"},{"my_dict":null},{"my_dict":null}]"#
1895        )
1896    }
1897
1898    #[test]
1899    fn test_decimal128_encoder() {
1900        let array = Decimal128Array::from_iter_values([1234, 5678, 9012])
1901            .with_precision_and_scale(10, 2)
1902            .unwrap();
1903        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1904        let schema = Schema::new(vec![field]);
1905        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1906
1907        let mut buf = Vec::new();
1908        {
1909            let mut writer = LineDelimitedWriter::new(&mut buf);
1910            writer.write_batches(&[&batch]).unwrap();
1911        }
1912
1913        assert_json_eq(
1914            &buf,
1915            r#"{"decimal":12.34}
1916{"decimal":56.78}
1917{"decimal":90.12}
1918"#,
1919        );
1920    }
1921
1922    #[test]
1923    fn test_decimal256_encoder() {
1924        let array = Decimal256Array::from_iter_values([
1925            i256::from(123400),
1926            i256::from(567800),
1927            i256::from(901200),
1928        ])
1929        .with_precision_and_scale(10, 4)
1930        .unwrap();
1931        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1932        let schema = Schema::new(vec![field]);
1933        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1934
1935        let mut buf = Vec::new();
1936        {
1937            let mut writer = LineDelimitedWriter::new(&mut buf);
1938            writer.write_batches(&[&batch]).unwrap();
1939        }
1940
1941        assert_json_eq(
1942            &buf,
1943            r#"{"decimal":12.3400}
1944{"decimal":56.7800}
1945{"decimal":90.1200}
1946"#,
1947        );
1948    }
1949
1950    #[test]
1951    fn test_decimal_encoder_with_nulls() {
1952        let array = Decimal128Array::from_iter([Some(1234), None, Some(5678)])
1953            .with_precision_and_scale(10, 2)
1954            .unwrap();
1955        let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1956        let schema = Schema::new(vec![field]);
1957        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1958
1959        let mut buf = Vec::new();
1960        {
1961            let mut writer = LineDelimitedWriter::new(&mut buf);
1962            writer.write_batches(&[&batch]).unwrap();
1963        }
1964
1965        assert_json_eq(
1966            &buf,
1967            r#"{"decimal":12.34}
1968{}
1969{"decimal":56.78}
1970"#,
1971        );
1972    }
1973
1974    #[test]
1975    fn write_structs_as_list() {
1976        let schema = Schema::new(vec![
1977            Field::new(
1978                "c1",
1979                DataType::Struct(Fields::from(vec![
1980                    Field::new("c11", DataType::Int32, true),
1981                    Field::new(
1982                        "c12",
1983                        DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
1984                        false,
1985                    ),
1986                ])),
1987                false,
1988            ),
1989            Field::new("c2", DataType::Utf8, false),
1990        ]);
1991
1992        let c1 = StructArray::from(vec![
1993            (
1994                Arc::new(Field::new("c11", DataType::Int32, true)),
1995                Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
1996            ),
1997            (
1998                Arc::new(Field::new(
1999                    "c12",
2000                    DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
2001                    false,
2002                )),
2003                Arc::new(StructArray::from(vec![(
2004                    Arc::new(Field::new("c121", DataType::Utf8, false)),
2005                    Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
2006                )])) as ArrayRef,
2007            ),
2008        ]);
2009        let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
2010
2011        let batch =
2012            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
2013
2014        let expected = r#"[[1,["e"]],"a"]
2015[[null,["f"]],"b"]
2016[[5,["g"]],"c"]
2017"#;
2018
2019        let mut buf = Vec::new();
2020        {
2021            let builder = WriterBuilder::new()
2022                .with_explicit_nulls(true)
2023                .with_struct_mode(StructMode::ListOnly);
2024            let mut writer = builder.build::<_, LineDelimited>(&mut buf);
2025            writer.write_batches(&[&batch]).unwrap();
2026        }
2027        assert_json_eq(&buf, expected);
2028
2029        let mut buf = Vec::new();
2030        {
2031            let builder = WriterBuilder::new()
2032                .with_explicit_nulls(false)
2033                .with_struct_mode(StructMode::ListOnly);
2034            let mut writer = builder.build::<_, LineDelimited>(&mut buf);
2035            writer.write_batches(&[&batch]).unwrap();
2036        }
2037        assert_json_eq(&buf, expected);
2038    }
2039}