// arrow_json/reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! JSON reader
19//!
20//! This JSON reader allows JSON records to be read into the Arrow memory
21//! model. Records are loaded in batches and are then converted from the record-oriented
22//! representation to the columnar arrow data model.
23//!
24//! The reader ignores whitespace between JSON values, including `\n` and `\r`, allowing
25//! parsing of sequences of one or more arbitrarily formatted JSON values, including
26//! but not limited to newline-delimited JSON.
27//!
28//! # Basic Usage
29//!
30//! [`Reader`] can be used directly with synchronous data sources, such as [`std::fs::File`]
31//!
32//! ```
33//! # use arrow_schema::*;
34//! # use std::fs::File;
35//! # use std::io::BufReader;
36//! # use std::sync::Arc;
37//!
38//! let schema = Arc::new(Schema::new(vec![
39//!     Field::new("a", DataType::Float64, false),
40//!     Field::new("b", DataType::Float64, false),
41//!     Field::new("c", DataType::Boolean, true),
42//! ]));
43//!
44//! let file = File::open("test/data/basic.json").unwrap();
45//!
46//! let mut json = arrow_json::ReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
47//! let batch = json.next().unwrap().unwrap();
48//! ```
49//!
50//! # Async Usage
51//!
52//! The lower-level [`Decoder`] can be integrated with various forms of async data streams,
53//! and is designed to be agnostic to the various different kinds of async IO primitives found
54//! within the Rust ecosystem.
55//!
56//! For example, see below for how it can be used with an arbitrary `Stream` of `Bytes`
57//!
58//! ```
59//! # use std::task::{Poll, ready};
60//! # use bytes::{Buf, Bytes};
61//! # use arrow_schema::ArrowError;
62//! # use futures::stream::{Stream, StreamExt};
63//! # use arrow_array::RecordBatch;
64//! # use arrow_json::reader::Decoder;
65//! #
66//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
67//!     mut decoder: Decoder,
68//!     mut input: S,
69//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
70//!     let mut buffered = Bytes::new();
71//!     futures::stream::poll_fn(move |cx| {
72//!         loop {
73//!             if buffered.is_empty() {
74//!                 buffered = match ready!(input.poll_next_unpin(cx)) {
75//!                     Some(b) => b,
76//!                     None => break,
77//!                 };
78//!             }
79//!             let decoded = match decoder.decode(buffered.as_ref()) {
80//!                 Ok(decoded) => decoded,
81//!                 Err(e) => return Poll::Ready(Some(Err(e))),
82//!             };
83//!             let read = buffered.len();
84//!             buffered.advance(decoded);
85//!             if decoded != read {
86//!                 break
87//!             }
88//!         }
89//!
90//!         Poll::Ready(decoder.flush().transpose())
91//!     })
92//! }
93//!
94//! ```
95//!
96//! In a similar vein, it can also be used with tokio-based IO primitives
97//!
98//! ```
99//! # use std::sync::Arc;
100//! # use arrow_schema::{DataType, Field, Schema};
101//! # use std::pin::Pin;
102//! # use std::task::{Poll, ready};
103//! # use futures::{Stream, TryStreamExt};
104//! # use tokio::io::AsyncBufRead;
105//! # use arrow_array::RecordBatch;
106//! # use arrow_json::reader::Decoder;
107//! # use arrow_schema::ArrowError;
108//! fn decode_stream<R: AsyncBufRead + Unpin>(
109//!     mut decoder: Decoder,
110//!     mut reader: R,
111//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
112//!     futures::stream::poll_fn(move |cx| {
113//!         loop {
114//!             let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
115//!                 Ok(b) if b.is_empty() => break,
116//!                 Ok(b) => b,
117//!                 Err(e) => return Poll::Ready(Some(Err(e.into()))),
118//!             };
119//!             let read = b.len();
120//!             let decoded = match decoder.decode(b) {
121//!                 Ok(decoded) => decoded,
122//!                 Err(e) => return Poll::Ready(Some(Err(e))),
123//!             };
124//!             Pin::new(&mut reader).consume(decoded);
125//!             if decoded != read {
126//!                 break;
127//!             }
128//!         }
129//!
130//!         Poll::Ready(decoder.flush().transpose())
131//!     })
132//! }
133//! ```
134//!
135
136use crate::StructMode;
137use std::io::BufRead;
138use std::sync::Arc;
139
140use chrono::Utc;
141use serde::Serialize;
142
143use arrow_array::timezone::Tz;
144use arrow_array::types::*;
145use arrow_array::{downcast_integer, make_array, RecordBatch, RecordBatchReader, StructArray};
146use arrow_data::ArrayData;
147use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
148pub use schema::*;
149
150use crate::reader::boolean_array::BooleanArrayDecoder;
151use crate::reader::decimal_array::DecimalArrayDecoder;
152use crate::reader::list_array::ListArrayDecoder;
153use crate::reader::map_array::MapArrayDecoder;
154use crate::reader::null_array::NullArrayDecoder;
155use crate::reader::primitive_array::PrimitiveArrayDecoder;
156use crate::reader::string_array::StringArrayDecoder;
157use crate::reader::struct_array::StructArrayDecoder;
158use crate::reader::tape::{Tape, TapeDecoder};
159use crate::reader::timestamp_array::TimestampArrayDecoder;
160
161mod boolean_array;
162mod decimal_array;
163mod list_array;
164mod map_array;
165mod null_array;
166mod primitive_array;
167mod schema;
168mod serializer;
169mod string_array;
170mod struct_array;
171mod tape;
172mod timestamp_array;
173
/// A builder for [`Reader`] and [`Decoder`]
pub struct ReaderBuilder {
    // Maximum number of rows per emitted RecordBatch (default 1024)
    batch_size: usize,
    // If true, coerce bool/number JSON values to string for Utf8/LargeUtf8 columns
    coerce_primitive: bool,
    // If true, error on JSON columns not present in `schema`
    strict_mode: bool,
    // True when built via `new_with_field`: the JSON root is a bare value
    // of the single wrapped field, not an object
    is_field: bool,
    // Whether structs are decoded from JSON objects, lists, or either
    struct_mode: StructMode,

    // Target schema of the produced record batches
    schema: SchemaRef,
}
184
185impl ReaderBuilder {
186    /// Create a new [`ReaderBuilder`] with the provided [`SchemaRef`]
187    ///
188    /// This could be obtained using [`infer_json_schema`] if not known
189    ///
190    /// Any columns not present in `schema` will be ignored, unless `strict_mode` is set to true.
191    /// In this case, an error is returned when a column is missing from `schema`.
192    ///
193    /// [`infer_json_schema`]: crate::reader::infer_json_schema
194    pub fn new(schema: SchemaRef) -> Self {
195        Self {
196            batch_size: 1024,
197            coerce_primitive: false,
198            strict_mode: false,
199            is_field: false,
200            struct_mode: Default::default(),
201            schema,
202        }
203    }
204
205    /// Create a new [`ReaderBuilder`] that will parse JSON values of `field.data_type()`
206    ///
207    /// Unlike [`ReaderBuilder::new`] this does not require the root of the JSON data
208    /// to be an object, i.e. `{..}`, allowing for parsing of any valid JSON value(s)
209    ///
210    /// ```
211    /// # use std::sync::Arc;
212    /// # use arrow_array::cast::AsArray;
213    /// # use arrow_array::types::Int32Type;
214    /// # use arrow_json::ReaderBuilder;
215    /// # use arrow_schema::{DataType, Field};
216    /// // Root of JSON schema is a numeric type
217    /// let data = "1\n2\n3\n";
218    /// let field = Arc::new(Field::new("int", DataType::Int32, true));
219    /// let mut reader = ReaderBuilder::new_with_field(field.clone()).build(data.as_bytes()).unwrap();
220    /// let b = reader.next().unwrap().unwrap();
221    /// let values = b.column(0).as_primitive::<Int32Type>().values();
222    /// assert_eq!(values, &[1, 2, 3]);
223    ///
224    /// // Root of JSON schema is a list type
225    /// let data = "[1, 2, 3, 4, 5, 6, 7]\n[1, 2, 3]";
226    /// let field = Field::new_list("int", field.clone(), true);
227    /// let mut reader = ReaderBuilder::new_with_field(field).build(data.as_bytes()).unwrap();
228    /// let b = reader.next().unwrap().unwrap();
229    /// let list = b.column(0).as_list::<i32>();
230    ///
231    /// assert_eq!(list.offsets().as_ref(), &[0, 7, 10]);
232    /// let list_values = list.values().as_primitive::<Int32Type>();
233    /// assert_eq!(list_values.values(), &[1, 2, 3, 4, 5, 6, 7, 1, 2, 3]);
234    /// ```
235    pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
236        Self {
237            batch_size: 1024,
238            coerce_primitive: false,
239            strict_mode: false,
240            is_field: true,
241            struct_mode: Default::default(),
242            schema: Arc::new(Schema::new([field.into()])),
243        }
244    }
245
246    /// Sets the batch size in rows to read
247    pub fn with_batch_size(self, batch_size: usize) -> Self {
248        Self { batch_size, ..self }
249    }
250
251    /// Sets if the decoder should coerce primitive values (bool and number) into string
252    /// when the Schema's column is Utf8 or LargeUtf8.
253    pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
254        Self {
255            coerce_primitive,
256            ..self
257        }
258    }
259
260    /// Sets if the decoder should return an error if it encounters a column not
261    /// present in `schema`. If `struct_mode` is `ListOnly` the value of
262    /// `strict_mode` is effectively `true`. It is required for all fields of
263    /// the struct to be in the list: without field names, there is no way to
264    /// determine which field is missing.
265    pub fn with_strict_mode(self, strict_mode: bool) -> Self {
266        Self {
267            strict_mode,
268            ..self
269        }
270    }
271
272    /// Set the [`StructMode`] for the reader, which determines whether structs
273    /// can be decoded from JSON as objects or lists. For more details refer to
274    /// the enum documentation. Default is to use `ObjectOnly`.
275    pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
276        Self {
277            struct_mode,
278            ..self
279        }
280    }
281
282    /// Create a [`Reader`] with the provided [`BufRead`]
283    pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
284        Ok(Reader {
285            reader,
286            decoder: self.build_decoder()?,
287        })
288    }
289
290    /// Create a [`Decoder`]
291    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
292        let (data_type, nullable) = match self.is_field {
293            false => (DataType::Struct(self.schema.fields.clone()), false),
294            true => {
295                let field = &self.schema.fields[0];
296                (field.data_type().clone(), field.is_nullable())
297            }
298        };
299
300        let decoder = make_decoder(
301            data_type,
302            self.coerce_primitive,
303            self.strict_mode,
304            nullable,
305            self.struct_mode,
306        )?;
307
308        let num_fields = self.schema.flattened_fields().len();
309
310        Ok(Decoder {
311            decoder,
312            is_field: self.is_field,
313            tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
314            batch_size: self.batch_size,
315            schema: self.schema,
316        })
317    }
318}
319
/// Reads JSON data with a known schema directly into arrow [`RecordBatch`]
///
/// Lines consisting solely of ASCII whitespace are ignored
pub struct Reader<R> {
    // Underlying byte source; required to be `BufRead` by the read loop
    reader: R,
    // Push-based decoder that buffers rows between flushes
    decoder: Decoder,
}
327
328impl<R> std::fmt::Debug for Reader<R> {
329    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330        f.debug_struct("Reader")
331            .field("decoder", &self.decoder)
332            .finish()
333    }
334}
335
336impl<R: BufRead> Reader<R> {
337    /// Reads the next [`RecordBatch`] returning `Ok(None)` if EOF
338    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
339        loop {
340            let buf = self.reader.fill_buf()?;
341            if buf.is_empty() {
342                break;
343            }
344            let read = buf.len();
345
346            let decoded = self.decoder.decode(buf)?;
347            self.reader.consume(decoded);
348            if decoded != read {
349                break;
350            }
351        }
352        self.decoder.flush()
353    }
354}
355
356impl<R: BufRead> Iterator for Reader<R> {
357    type Item = Result<RecordBatch, ArrowError>;
358
359    fn next(&mut self) -> Option<Self::Item> {
360        self.read().transpose()
361    }
362}
363
364impl<R: BufRead> RecordBatchReader for Reader<R> {
365    fn schema(&self) -> SchemaRef {
366        self.decoder.schema.clone()
367    }
368}
369
/// A low-level interface for reading JSON data from a byte stream
///
/// See [`Reader`] for a higher-level interface for interface with [`BufRead`]
///
/// The push-based interface facilitates integration with sources that yield arbitrarily
/// delimited bytes ranges, such as [`BufRead`], or a chunked byte stream received from
/// object storage
///
/// ```
/// # use std::io::BufRead;
/// # use arrow_array::RecordBatch;
/// # use arrow_json::reader::{Decoder, ReaderBuilder};
/// # use arrow_schema::{ArrowError, SchemaRef};
/// #
/// fn read_from_json<R: BufRead>(
///     mut reader: R,
///     schema: SchemaRef,
/// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
///     let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
///     let mut next = move || {
///         loop {
///             // Decoder is agnostic that buf doesn't contain whole records
///             let buf = reader.fill_buf()?;
///             if buf.is_empty() {
///                 break; // Input exhausted
///             }
///             let read = buf.len();
///             let decoded = decoder.decode(buf)?;
///
///             // Consume the number of bytes read
///             reader.consume(decoded);
///             if decoded != read {
///                 break; // Read batch size
///             }
///         }
///         decoder.flush()
///     };
///     Ok(std::iter::from_fn(move || next().transpose()))
/// }
/// ```
pub struct Decoder {
    // Incremental JSON tokenizer; buffers decoded rows until `flush`
    tape_decoder: TapeDecoder,
    // Root array decoder built by `make_decoder` for the configured type
    decoder: Box<dyn ArrayDecoder>,
    // Number of rows after which `decode` returns early (see `Decoder::decode`)
    batch_size: usize,
    // True if the JSON root is a bare value of a single field, not an object
    is_field: bool,
    // Schema attached to each flushed RecordBatch
    schema: SchemaRef,
}
417
418impl std::fmt::Debug for Decoder {
419    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
420        f.debug_struct("Decoder")
421            .field("schema", &self.schema)
422            .field("batch_size", &self.batch_size)
423            .finish()
424    }
425}
426
427impl Decoder {
428    /// Read JSON objects from `buf`, returning the number of bytes read
429    ///
430    /// This method returns once `batch_size` objects have been parsed since the
431    /// last call to [`Self::flush`], or `buf` is exhausted. Any remaining bytes
432    /// should be included in the next call to [`Self::decode`]
433    ///
434    /// There is no requirement that `buf` contains a whole number of records, facilitating
435    /// integration with arbitrary byte streams, such as those yielded by [`BufRead`]
436    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
437        self.tape_decoder.decode(buf)
438    }
439
440    /// Serialize `rows` to this [`Decoder`]
441    ///
442    /// This provides a simple way to convert [serde]-compatible datastructures into arrow
443    /// [`RecordBatch`].
444    ///
445    /// Custom conversion logic as described in [arrow_array::builder] will likely outperform this,
446    /// especially where the schema is known at compile-time, however, this provides a mechanism
447    /// to get something up and running quickly
448    ///
449    /// It can be used with [`serde_json::Value`]
450    ///
451    /// ```
452    /// # use std::sync::Arc;
453    /// # use serde_json::{Value, json};
454    /// # use arrow_array::cast::AsArray;
455    /// # use arrow_array::types::Float32Type;
456    /// # use arrow_json::ReaderBuilder;
457    /// # use arrow_schema::{DataType, Field, Schema};
458    /// let json = vec![json!({"float": 2.3}), json!({"float": 5.7})];
459    ///
460    /// let schema = Schema::new(vec![Field::new("float", DataType::Float32, true)]);
461    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
462    ///
463    /// decoder.serialize(&json).unwrap();
464    /// let batch = decoder.flush().unwrap().unwrap();
465    /// assert_eq!(batch.num_rows(), 2);
466    /// assert_eq!(batch.num_columns(), 1);
467    /// let values = batch.column(0).as_primitive::<Float32Type>().values();
468    /// assert_eq!(values, &[2.3, 5.7])
469    /// ```
470    ///
471    /// Or with arbitrary [`Serialize`] types
472    ///
473    /// ```
474    /// # use std::sync::Arc;
475    /// # use arrow_json::ReaderBuilder;
476    /// # use arrow_schema::{DataType, Field, Schema};
477    /// # use serde::Serialize;
478    /// # use arrow_array::cast::AsArray;
479    /// # use arrow_array::types::{Float32Type, Int32Type};
480    /// #
481    /// #[derive(Serialize)]
482    /// struct MyStruct {
483    ///     int32: i32,
484    ///     float: f32,
485    /// }
486    ///
487    /// let schema = Schema::new(vec![
488    ///     Field::new("int32", DataType::Int32, false),
489    ///     Field::new("float", DataType::Float32, false),
490    /// ]);
491    ///
492    /// let rows = vec![
493    ///     MyStruct{ int32: 0, float: 3. },
494    ///     MyStruct{ int32: 4, float: 67.53 },
495    /// ];
496    ///
497    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
498    /// decoder.serialize(&rows).unwrap();
499    ///
500    /// let batch = decoder.flush().unwrap().unwrap();
501    ///
502    /// // Expect batch containing two columns
503    /// let int32 = batch.column(0).as_primitive::<Int32Type>();
504    /// assert_eq!(int32.values(), &[0, 4]);
505    ///
506    /// let float = batch.column(1).as_primitive::<Float32Type>();
507    /// assert_eq!(float.values(), &[3., 67.53]);
508    /// ```
509    ///
510    /// Or even complex nested types
511    ///
512    /// ```
513    /// # use std::collections::BTreeMap;
514    /// # use std::sync::Arc;
515    /// # use arrow_array::StructArray;
516    /// # use arrow_cast::display::{ArrayFormatter, FormatOptions};
517    /// # use arrow_json::ReaderBuilder;
518    /// # use arrow_schema::{DataType, Field, Fields, Schema};
519    /// # use serde::Serialize;
520    /// #
521    /// #[derive(Serialize)]
522    /// struct MyStruct {
523    ///     int32: i32,
524    ///     list: Vec<f64>,
525    ///     nested: Vec<Option<Nested>>,
526    /// }
527    ///
528    /// impl MyStruct {
529    ///     /// Returns the [`Fields`] for [`MyStruct`]
530    ///     fn fields() -> Fields {
531    ///         let nested = DataType::Struct(Nested::fields());
532    ///         Fields::from([
533    ///             Arc::new(Field::new("int32", DataType::Int32, false)),
534    ///             Arc::new(Field::new_list(
535    ///                 "list",
536    ///                 Field::new("element", DataType::Float64, false),
537    ///                 false,
538    ///             )),
539    ///             Arc::new(Field::new_list(
540    ///                 "nested",
541    ///                 Field::new("element", nested, true),
542    ///                 true,
543    ///             )),
544    ///         ])
545    ///     }
546    /// }
547    ///
548    /// #[derive(Serialize)]
549    /// struct Nested {
550    ///     map: BTreeMap<String, Vec<String>>
551    /// }
552    ///
553    /// impl Nested {
554    ///     /// Returns the [`Fields`] for [`Nested`]
555    ///     fn fields() -> Fields {
556    ///         let element = Field::new("element", DataType::Utf8, false);
557    ///         Fields::from([
558    ///             Arc::new(Field::new_map(
559    ///                 "map",
560    ///                 "entries",
561    ///                 Field::new("key", DataType::Utf8, false),
562    ///                 Field::new_list("value", element, false),
563    ///                 false, // sorted
564    ///                 false, // nullable
565    ///             ))
566    ///         ])
567    ///     }
568    /// }
569    ///
570    /// let data = vec![
571    ///     MyStruct {
572    ///         int32: 34,
573    ///         list: vec![1., 2., 34.],
574    ///         nested: vec![
575    ///             None,
576    ///             Some(Nested {
577    ///                 map: vec![
578    ///                     ("key1".to_string(), vec!["foo".to_string(), "bar".to_string()]),
579    ///                     ("key2".to_string(), vec!["baz".to_string()])
580    ///                 ].into_iter().collect()
581    ///             })
582    ///         ]
583    ///     },
584    ///     MyStruct {
585    ///         int32: 56,
586    ///         list: vec![],
587    ///         nested: vec![]
588    ///     },
589    ///     MyStruct {
590    ///         int32: 24,
591    ///         list: vec![-1., 245.],
592    ///         nested: vec![None]
593    ///     }
594    /// ];
595    ///
596    /// let schema = Schema::new(MyStruct::fields());
597    /// let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
598    /// decoder.serialize(&data).unwrap();
599    /// let batch = decoder.flush().unwrap().unwrap();
600    /// assert_eq!(batch.num_rows(), 3);
601    /// assert_eq!(batch.num_columns(), 3);
602    ///
603    /// // Convert to StructArray to format
604    /// let s = StructArray::from(batch);
605    /// let options = FormatOptions::default().with_null("null");
606    /// let formatter = ArrayFormatter::try_new(&s, &options).unwrap();
607    ///
608    /// assert_eq!(&formatter.value(0).to_string(), "{int32: 34, list: [1.0, 2.0, 34.0], nested: [null, {map: {key1: [foo, bar], key2: [baz]}}]}");
609    /// assert_eq!(&formatter.value(1).to_string(), "{int32: 56, list: [], nested: []}");
610    /// assert_eq!(&formatter.value(2).to_string(), "{int32: 24, list: [-1.0, 245.0], nested: [null]}");
611    /// ```
612    ///
613    /// Note: this ignores any batch size setting, and always decodes all rows
614    pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
615        self.tape_decoder.serialize(rows)
616    }
617
618    /// True if the decoder is currently part way through decoding a record.
619    pub fn has_partial_record(&self) -> bool {
620        self.tape_decoder.has_partial_row()
621    }
622
623    /// The number of unflushed records, including the partially decoded record (if any).
624    pub fn len(&self) -> usize {
625        self.tape_decoder.num_buffered_rows()
626    }
627
628    /// True if there are no records to flush, i.e. [`Self::len`] is zero.
629    pub fn is_empty(&self) -> bool {
630        self.len() == 0
631    }
632
633    /// Flushes the currently buffered data to a [`RecordBatch`]
634    ///
635    /// Returns `Ok(None)` if no buffered data, i.e. [`Self::is_empty`] is true.
636    ///
637    /// Note: This will return an error if called part way through decoding a record,
638    /// i.e. [`Self::has_partial_record`] is true.
639    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
640        let tape = self.tape_decoder.finish()?;
641
642        if tape.num_rows() == 0 {
643            return Ok(None);
644        }
645
646        // First offset is null sentinel
647        let mut next_object = 1;
648        let pos: Vec<_> = (0..tape.num_rows())
649            .map(|_| {
650                let next = tape.next(next_object, "row").unwrap();
651                std::mem::replace(&mut next_object, next)
652            })
653            .collect();
654
655        let decoded = self.decoder.decode(&tape, &pos)?;
656        self.tape_decoder.clear();
657
658        let batch = match self.is_field {
659            true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
660            false => {
661                RecordBatch::from(StructArray::from(decoded)).with_schema(self.schema.clone())?
662            }
663        };
664
665        Ok(Some(batch))
666    }
667}
668
/// Internal interface implemented by each per-type decoder
/// (primitive, string, list, struct, map, ...)
trait ArrayDecoder: Send {
    /// Decode elements from `tape` starting at the indexes contained in `pos`
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
}
673
/// Expands to a boxed [`PrimitiveArrayDecoder`] for arrow type `$t`,
/// wrapped in `Ok` to match [`make_decoder`]'s return type
macro_rules! primitive_decoder {
    ($t:ty, $data_type:expr) => {
        Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
    };
}
679
/// Creates the [`ArrayDecoder`] for the provided [`DataType`]
///
/// `coerce_primitive`, `strict_mode`, `is_nullable` and `struct_mode` are
/// forwarded to nested decoders (list, struct and map). Returns an error for
/// types with no JSON representation (binary) or not yet implemented.
fn make_decoder(
    data_type: DataType,
    coerce_primitive: bool,
    strict_mode: bool,
    is_nullable: bool,
    struct_mode: StructMode,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
    downcast_integer! {
        // All integer types dispatch to `primitive_decoder!` via this macro arm
        data_type => (primitive_decoder, data_type),
        DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
        // Timestamps without a timezone are parsed as UTC
        DataType::Timestamp(TimeUnit::Second, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, Utc)))
        },
        // Timezone-qualified timestamps parse the tz string; invalid
        // timezones surface as an error here
        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, tz)))
        },
        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
        DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
        DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
        DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
        // Nested types recurse into make_decoder for their children
        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
            Err(ArrowError::JsonError(format!("{data_type} is not supported by JSON")))
        }
        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader")))
    }
}
746
747#[cfg(test)]
748mod tests {
749    use serde_json::json;
750    use std::fs::File;
751    use std::io::{BufReader, Cursor, Seek};
752
753    use arrow_array::cast::AsArray;
754    use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray};
755    use arrow_buffer::{ArrowNativeType, Buffer};
756    use arrow_cast::display::{ArrayFormatter, FormatOptions};
757    use arrow_data::ArrayDataBuilder;
758    use arrow_schema::{Field, Fields};
759
760    use super::*;
761
762    fn do_read(
763        buf: &str,
764        batch_size: usize,
765        coerce_primitive: bool,
766        strict_mode: bool,
767        schema: SchemaRef,
768    ) -> Vec<RecordBatch> {
769        let mut unbuffered = vec![];
770
771        // Test with different batch sizes to test for boundary conditions
772        for batch_size in [1, 3, 100, batch_size] {
773            unbuffered = ReaderBuilder::new(schema.clone())
774                .with_batch_size(batch_size)
775                .with_coerce_primitive(coerce_primitive)
776                .build(Cursor::new(buf.as_bytes()))
777                .unwrap()
778                .collect::<Result<Vec<_>, _>>()
779                .unwrap();
780
781            for b in unbuffered.iter().take(unbuffered.len() - 1) {
782                assert_eq!(b.num_rows(), batch_size)
783            }
784
785            // Test with different buffer sizes to test for boundary conditions
786            for b in [1, 3, 5] {
787                let buffered = ReaderBuilder::new(schema.clone())
788                    .with_batch_size(batch_size)
789                    .with_coerce_primitive(coerce_primitive)
790                    .with_strict_mode(strict_mode)
791                    .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
792                    .unwrap()
793                    .collect::<Result<Vec<_>, _>>()
794                    .unwrap();
795                assert_eq!(unbuffered, buffered);
796            }
797        }
798
799        unbuffered
800    }
801
    /// End-to-end decode of flat numeric / boolean / date columns, also
    /// exercising the push-based `Decoder` API (decode -> flush -> empty).
    #[test]
    fn test_basic() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        // Drive the low-level Decoder directly: it should consume 221 bytes,
        // buffer 6 records, and return to the empty state after flush()
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": integers and floats with integral values decode to Int64;
        // absent key (row 4) and explicit null (row 5) both yield null
        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        // "b": present in every row; the quoted "5" and exponent form 4e0
        // still decode to Int32
        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        // "c": only the first two rows carry a boolean
        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        // "d": Date32 from integer day values
        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        // "e": Date64 only present in row 1
        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }
868
869    #[test]
870    fn test_string() {
871        let buf = r#"
872        {"a": "1", "b": "2"}
873        {"a": "hello", "b": "shoo"}
874        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
875
876        {"b": null}
877        {"b": "", "a": null}
878
879        "#;
880        let schema = Arc::new(Schema::new(vec![
881            Field::new("a", DataType::Utf8, true),
882            Field::new("b", DataType::LargeUtf8, true),
883        ]));
884
885        let batches = do_read(buf, 1024, false, false, schema);
886        assert_eq!(batches.len(), 1);
887
888        let col1 = batches[0].column(0).as_string::<i32>();
889        assert_eq!(col1.null_count(), 2);
890        assert_eq!(col1.value(0), "1");
891        assert_eq!(col1.value(1), "hello");
892        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
893        assert!(col1.is_null(3));
894        assert!(col1.is_null(4));
895
896        let col2 = batches[0].column(1).as_string::<i64>();
897        assert_eq!(col2.null_count(), 1);
898        assert_eq!(col2.value(0), "2");
899        assert_eq!(col2.value(1), "shoo");
900        assert_eq!(col2.value(2), "\t😁foo");
901        assert!(col2.is_null(3));
902        assert_eq!(col2.value(4), "");
903    }
904
905    #[test]
906    fn test_complex() {
907        let buf = r#"
908           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
909           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
910           {"list": null, "nested": {"a": null}}
911        "#;
912
913        let schema = Arc::new(Schema::new(vec![
914            Field::new_list("list", Field::new("element", DataType::Int32, false), true),
915            Field::new_struct(
916                "nested",
917                vec![
918                    Field::new("a", DataType::Int32, true),
919                    Field::new("b", DataType::Int32, true),
920                ],
921                true,
922            ),
923            Field::new_struct(
924                "nested_list",
925                vec![Field::new_list(
926                    "list2",
927                    Field::new_struct(
928                        "element",
929                        vec![Field::new("c", DataType::Int32, false)],
930                        false,
931                    ),
932                    true,
933                )],
934                true,
935            ),
936        ]));
937
938        let batches = do_read(buf, 1024, false, false, schema);
939        assert_eq!(batches.len(), 1);
940
941        let list = batches[0].column(0).as_list::<i32>();
942        assert_eq!(list.len(), 3);
943        assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
944        assert_eq!(list.null_count(), 1);
945        assert!(list.is_null(2));
946        let list_values = list.values().as_primitive::<Int32Type>();
947        assert_eq!(list_values.values(), &[5, 6]);
948
949        let nested = batches[0].column(1).as_struct();
950        let a = nested.column(0).as_primitive::<Int32Type>();
951        assert_eq!(list.null_count(), 1);
952        assert_eq!(a.values(), &[1, 7, 0]);
953        assert!(list.is_null(2));
954
955        let b = nested.column(1).as_primitive::<Int32Type>();
956        assert_eq!(b.null_count(), 2);
957        assert_eq!(b.len(), 3);
958        assert_eq!(b.value(0), 2);
959        assert!(b.is_null(1));
960        assert!(b.is_null(2));
961
962        let nested_list = batches[0].column(2).as_struct();
963        assert_eq!(nested_list.len(), 3);
964        assert_eq!(nested_list.null_count(), 1);
965        assert!(nested_list.is_null(2));
966
967        let list2 = nested_list.column(0).as_list::<i32>();
968        assert_eq!(list2.len(), 3);
969        assert_eq!(list2.null_count(), 1);
970        assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
971        assert!(list2.is_null(2));
972
973        let list2_values = list2.values().as_struct();
974
975        let c = list2_values.column(0).as_primitive::<Int32Type>();
976        assert_eq!(c.values(), &[3, 4]);
977    }
978
    /// Verifies that fields present in the JSON but absent from the schema are
    /// silently dropped (projection), including inside nested structs and
    /// list elements.
    #[test]
    fn test_projection() {
        let buf = r#"
           {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
           {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        "#;

        // Schema omits "list", "nested.b" and "nested_list.list2.element.c";
        // only "nested.a" and the "d" field of the list elements survive
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "nested",
                vec![Field::new("a", DataType::Int32, false)],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("d", DataType::Int32, true)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // Only the projected child "a" remains in "nested"
        let nested = batches[0].column(0).as_struct();
        assert_eq!(nested.num_columns(), 1);
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 0);
        assert_eq!(a.values(), &[1, 7]);

        let nested_list = batches[0].column(1).as_struct();
        assert_eq!(nested_list.num_columns(), 1);
        assert_eq!(nested_list.null_count(), 0);

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
        assert_eq!(list2.null_count(), 0);

        let child = list2.values().as_struct();
        assert_eq!(child.num_columns(), 1);
        assert_eq!(child.len(), 2);
        assert_eq!(child.null_count(), 0);

        // "d" is only present on the first list element; the second is null
        let c = child.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[5, 0]);
        assert_eq!(c.null_count(), 1);
        assert!(c.is_null(1));
    }
1034
    /// Decodes a Map column whose values are lists of nullable strings,
    /// checking entry offsets, keys, nested list offsets/nulls, and the
    /// formatted rendering of each row.
    #[test]
    fn test_map() {
        let buf = r#"
           {"map": {"a": ["foo", null]}}
           {"map": {"a": [null], "b": []}}
           {"map": {"c": null, "a": ["baz"]}}
        "#;
        // Map<Utf8, List<Utf8>>: non-nullable keys, nullable list values
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        // Rows contain 1, 2 and 2 entries respectively
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        // Keys are flattened across all rows in insertion order
        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        // Flattened string elements of all list values
        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        // The entry for key "c" has a null list value
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        // The formatted output should round-trip the logical content
        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }
1077
1078    #[test]
1079    fn test_not_coercing_primitive_into_string_without_flag() {
1080        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
1081
1082        let buf = r#"{"a": 1}"#;
1083        let err = ReaderBuilder::new(schema.clone())
1084            .with_batch_size(1024)
1085            .build(Cursor::new(buf.as_bytes()))
1086            .unwrap()
1087            .read()
1088            .unwrap_err();
1089
1090        assert_eq!(
1091            err.to_string(),
1092            "Json error: whilst decoding field 'a': expected string got 1"
1093        );
1094
1095        let buf = r#"{"a": true}"#;
1096        let err = ReaderBuilder::new(schema)
1097            .with_batch_size(1024)
1098            .build(Cursor::new(buf.as_bytes()))
1099            .unwrap()
1100            .read()
1101            .unwrap_err();
1102
1103        assert_eq!(
1104            err.to_string(),
1105            "Json error: whilst decoding field 'a': expected string got true"
1106        );
1107    }
1108
1109    #[test]
1110    fn test_coercing_primitive_into_string() {
1111        let buf = r#"
1112        {"a": 1, "b": 2, "c": true}
1113        {"a": 2E0, "b": 4, "c": false}
1114
1115        {"b": 6, "a": 2.0}
1116        {"b": "5", "a": 2}
1117        {"b": 4e0}
1118        {"b": 7, "a": null}
1119        "#;
1120
1121        let schema = Arc::new(Schema::new(vec![
1122            Field::new("a", DataType::Utf8, true),
1123            Field::new("b", DataType::Utf8, true),
1124            Field::new("c", DataType::Utf8, true),
1125        ]));
1126
1127        let batches = do_read(buf, 1024, true, false, schema);
1128        assert_eq!(batches.len(), 1);
1129
1130        let col1 = batches[0].column(0).as_string::<i32>();
1131        assert_eq!(col1.null_count(), 2);
1132        assert_eq!(col1.value(0), "1");
1133        assert_eq!(col1.value(1), "2E0");
1134        assert_eq!(col1.value(2), "2.0");
1135        assert_eq!(col1.value(3), "2");
1136        assert!(col1.is_null(4));
1137        assert!(col1.is_null(5));
1138
1139        let col2 = batches[0].column(1).as_string::<i32>();
1140        assert_eq!(col2.null_count(), 0);
1141        assert_eq!(col2.value(0), "2");
1142        assert_eq!(col2.value(1), "4");
1143        assert_eq!(col2.value(2), "6");
1144        assert_eq!(col2.value(3), "5");
1145        assert_eq!(col2.value(4), "4e0");
1146        assert_eq!(col2.value(5), "7");
1147
1148        let col3 = batches[0].column(2).as_string::<i32>();
1149        assert_eq!(col3.null_count(), 4);
1150        assert_eq!(col3.value(0), "true");
1151        assert_eq!(col3.value(1), "false");
1152        assert!(col3.is_null(2));
1153        assert!(col3.is_null(3));
1154        assert!(col3.is_null(4));
1155        assert!(col3.is_null(5));
1156    }
1157
    /// Decodes decimal columns of the given `data_type` from JSON numbers and
    /// numeric strings; with scale 2 every value is stored as hundredths
    /// (e.g. 1 -> 100, "2.0452" -> 204 after truncation to the scale).
    fn test_decimal<T: DecimalType>(data_type: DataType) {
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": numbers and quoted decimal strings; absent key and explicit
        // null both yield null
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        // "b": present in every row
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        // "c": only the first two rows; 38.30 -> 3830, 123.456 -> 12345
        // (truncated to scale 2)
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }
1207
1208    #[test]
1209    fn test_decimals() {
1210        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
1211        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
1212    }
1213
    /// Decodes timestamp columns at the resolution of `T`, from raw numbers
    /// and from RFC3339-ish strings with and without timezone offsets.
    /// Expected values are written in nanoseconds and divided by the unit.
    fn test_timestamp<T: ArrowTimestampType>() {
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        // "d" is parsed relative to a fixed +08:00 zone; the others are
        // timezone-naive
        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Conversion factor from the nanosecond-denominated expectations
        // below to T's unit
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        // "a": plain integers; rows 2-5 lack the key or are explicitly null
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": row 2 repeats the key ("b": 1337 then a string) — the
        // assertions show the later string value wins; rows 3-4 stay raw
        // integers
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        // "c": mixes truncated floats (38.30 -> 38) with timestamp strings,
        // including space-separated and offset forms
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        // "d": zone-less strings are interpreted in the column's +08:00 zone;
        // explicit offsets in the string take precedence
        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }
1295
1296    #[test]
1297    fn test_timestamps() {
1298        test_timestamp::<TimestampSecondType>();
1299        test_timestamp::<TimestampMillisecondType>();
1300        test_timestamp::<TimestampMicrosecondType>();
1301        test_timestamp::<TimestampNanosecondType>();
1302    }
1303
    /// Decodes time-of-day columns at the resolution of `T`, from raw numbers
    /// and from a variety of time string formats (AM/PM, 24h, fractional
    /// seconds). Expected values are in nanoseconds divided by the unit.
    fn test_time<T: ArrowTemporalType>() {
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        // Extract the time unit from the concrete Time32/Time64 type
        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        // Conversion factor from the nanosecond-denominated expectations
        // below to T's unit
        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": plain integers; absent or null from row 2 onwards
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": row 2 repeats the key (1337 then "6:00 pm") — the assertion
        // (64800s = 18:00) shows the later value wins; rows 3-4 stay raw
        // integers
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos,
                86399000000000 / unit_in_nanos,
                64800000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        // "c": truncated floats (38.30 -> 38) plus time strings with
        // fractional seconds
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos,
                49349190855000 / unit_in_nanos,
                34016123000000 / unit_in_nanos,
                52016123000000 / unit_in_nanos
            ]
            .map(T::Native::usize_as)
        );
    }
1375
1376    #[test]
1377    fn test_times() {
1378        test_time::<Time32MillisecondType>();
1379        test_time::<Time32SecondType>();
1380        test_time::<Time64MicrosecondType>();
1381        test_time::<Time64NanosecondType>();
1382    }
1383
1384    fn test_duration<T: ArrowTemporalType>() {
1385        let buf = r#"
1386        {"a": 1, "b": "2"}
1387        {"a": 3, "b": null}
1388        "#;
1389
1390        let schema = Arc::new(Schema::new(vec![
1391            Field::new("a", T::DATA_TYPE, true),
1392            Field::new("b", T::DATA_TYPE, true),
1393        ]));
1394
1395        let batches = do_read(buf, 1024, true, false, schema);
1396        assert_eq!(batches.len(), 1);
1397
1398        let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
1399        assert_eq!(col_a.null_count(), 0);
1400        assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));
1401
1402        let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
1403        assert_eq!(col2.null_count(), 1);
1404        assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
1405    }
1406
1407    #[test]
1408    fn test_durations() {
1409        test_duration::<DurationNanosecondType>();
1410        test_duration::<DurationMicrosecondType>();
1411        test_duration::<DurationMillisecondType>();
1412        test_duration::<DurationSecondType>();
1413    }
1414
    /// Decodes a Delta Lake checkpoint-style record where only one of two
    /// top-level struct columns ("protocol") is present; the absent "add"
    /// struct must come back null and format as `null`.
    #[test]
    fn test_delta_checkpoint() {
        let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "protocol",
                vec![
                    Field::new("minReaderVersion", DataType::Int32, true),
                    Field::new("minWriterVersion", DataType::Int32, true),
                ],
                true,
            ),
            // "add" does not appear in the input at all
            Field::new_struct(
                "add",
                vec![Field::new_map(
                    "partitionValues",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                    false,
                    false,
                )],
                true,
            ),
        ]));

        let batches = do_read(json, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Render the whole row via the formatter to check both columns at once
        let s: StructArray = batches.into_iter().next().unwrap().into();
        let opts = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
        assert_eq!(
            formatter.value(0).to_string(),
            "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
        );
    }
1452
    /// Verifies nullability enforcement for non-nullable struct children, and
    /// that a null in a nullable parent struct masks nulls in its
    /// non-nullable children. Repeated for several child data types.
    #[test]
    fn struct_nullability() {
        let do_test = |child: DataType| {
            // Test correctly enforced nullability
            let non_null = r#"{"foo": {}}"#;
            let schema = Arc::new(Schema::new(vec![Field::new_struct(
                "foo",
                vec![Field::new("bar", child, false)],
                true,
            )]));
            let mut reader = ReaderBuilder::new(schema.clone())
                .build(Cursor::new(non_null.as_bytes()))
                .unwrap();
            assert!(reader.next().unwrap().is_err()); // Should error as not nullable

            // NOTE(review): `bar` is unquoted here, so this input is not valid
            // JSON; the decode still fails as asserted, but the error may stem
            // from parsing rather than nullability — confirm intent
            let null = r#"{"foo": {bar: null}}"#;
            let mut reader = ReaderBuilder::new(schema.clone())
                .build(Cursor::new(null.as_bytes()))
                .unwrap();
            assert!(reader.next().unwrap().is_err()); // Should error as not nullable

            // Test nulls in nullable parent can mask nulls in non-nullable child
            let null = r#"{"foo": null}"#;
            let mut reader = ReaderBuilder::new(schema)
                .build(Cursor::new(null.as_bytes()))
                .unwrap();
            let batch = reader.next().unwrap().unwrap();
            assert_eq!(batch.num_columns(), 1);
            let foo = batch.column(0).as_struct();
            assert_eq!(foo.len(), 1);
            assert!(foo.is_null(0));
            assert_eq!(foo.num_columns(), 1);

            let bar = foo.column(0);
            assert_eq!(bar.len(), 1);
            // Non-nullable child can still contain null as masked by parent
            assert!(bar.is_null(0));
        };

        do_test(DataType::Boolean);
        do_test(DataType::Int32);
        do_test(DataType::Utf8);
        do_test(DataType::Decimal128(2, 1));
        do_test(DataType::Timestamp(
            TimeUnit::Microsecond,
            Some("+00:00".into()),
        ));
    }
1501
1502    #[test]
1503    fn test_truncation() {
1504        let buf = r#"
1505        {"i64": 9223372036854775807, "u64": 18446744073709551615 }
1506        {"i64": "9223372036854775807", "u64": "18446744073709551615" }
1507        {"i64": -9223372036854775808, "u64": 0 }
1508        {"i64": "-9223372036854775808", "u64": 0 }
1509        "#;
1510
1511        let schema = Arc::new(Schema::new(vec![
1512            Field::new("i64", DataType::Int64, true),
1513            Field::new("u64", DataType::UInt64, true),
1514        ]));
1515
1516        let batches = do_read(buf, 1024, true, false, schema);
1517        assert_eq!(batches.len(), 1);
1518
1519        let i64 = batches[0].column(0).as_primitive::<Int64Type>();
1520        assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);
1521
1522        let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
1523        assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
1524    }
1525
1526    #[test]
1527    fn test_timestamp_truncation() {
1528        let buf = r#"
1529        {"time": 9223372036854775807 }
1530        {"time": -9223372036854775808 }
1531        {"time": 9e5 }
1532        "#;
1533
1534        let schema = Arc::new(Schema::new(vec![Field::new(
1535            "time",
1536            DataType::Timestamp(TimeUnit::Nanosecond, None),
1537            true,
1538        )]));
1539
1540        let batches = do_read(buf, 1024, true, false, schema);
1541        assert_eq!(batches.len(), 1);
1542
1543        let i64 = batches[0]
1544            .column(0)
1545            .as_primitive::<TimestampNanosecondType>();
1546        assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
1547    }
1548
1549    #[test]
1550    fn test_strict_mode_no_missing_columns_in_schema() {
1551        let buf = r#"
1552        {"a": 1, "b": "2", "c": true}
1553        {"a": 2E0, "b": "4", "c": false}
1554        "#;
1555
1556        let schema = Arc::new(Schema::new(vec![
1557            Field::new("a", DataType::Int16, false),
1558            Field::new("b", DataType::Utf8, false),
1559            Field::new("c", DataType::Boolean, false),
1560        ]));
1561
1562        let batches = do_read(buf, 1024, true, true, schema);
1563        assert_eq!(batches.len(), 1);
1564
1565        let buf = r#"
1566        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1567        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1568        "#;
1569
1570        let schema = Arc::new(Schema::new(vec![
1571            Field::new("a", DataType::Int16, false),
1572            Field::new("b", DataType::Utf8, false),
1573            Field::new_struct(
1574                "c",
1575                vec![
1576                    Field::new("a", DataType::Boolean, false),
1577                    Field::new("b", DataType::Int16, false),
1578                ],
1579                false,
1580            ),
1581        ]));
1582
1583        let batches = do_read(buf, 1024, true, true, schema);
1584        assert_eq!(batches.len(), 1);
1585    }
1586
    /// With strict mode enabled, a JSON key missing from the schema must be
    /// reported as an error — both at the top level and nested inside a
    /// struct (where the error is prefixed with the field path).
    #[test]
    fn test_strict_mode_missing_columns_in_schema() {
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        // Schema deliberately omits top-level "b"
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, true),
            Field::new("c", DataType::Boolean, true),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: column 'b' missing from schema"
        );

        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        // Schema omits the nested "c.b" child
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        // Nested errors carry the enclosing field name
        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'c': column 'b' missing from schema"
        );
    }
1636
1637    fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1638        let file = File::open(path).unwrap();
1639        let mut reader = BufReader::new(file);
1640        let schema = schema.unwrap_or_else(|| {
1641            let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1642            reader.rewind().unwrap();
1643            schema
1644        });
1645        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1646        builder.build(reader).unwrap()
1647    }
1648
1649    #[test]
1650    fn test_json_basic() {
1651        let mut reader = read_file("test/data/basic.json", None);
1652        let batch = reader.next().unwrap().unwrap();
1653
1654        assert_eq!(8, batch.num_columns());
1655        assert_eq!(12, batch.num_rows());
1656
1657        let schema = reader.schema();
1658        let batch_schema = batch.schema();
1659        assert_eq!(schema, batch_schema);
1660
1661        let a = schema.column_with_name("a").unwrap();
1662        assert_eq!(0, a.0);
1663        assert_eq!(&DataType::Int64, a.1.data_type());
1664        let b = schema.column_with_name("b").unwrap();
1665        assert_eq!(1, b.0);
1666        assert_eq!(&DataType::Float64, b.1.data_type());
1667        let c = schema.column_with_name("c").unwrap();
1668        assert_eq!(2, c.0);
1669        assert_eq!(&DataType::Boolean, c.1.data_type());
1670        let d = schema.column_with_name("d").unwrap();
1671        assert_eq!(3, d.0);
1672        assert_eq!(&DataType::Utf8, d.1.data_type());
1673
1674        let aa = batch.column(a.0).as_primitive::<Int64Type>();
1675        assert_eq!(1, aa.value(0));
1676        assert_eq!(-10, aa.value(1));
1677        let bb = batch.column(b.0).as_primitive::<Float64Type>();
1678        assert_eq!(2.0, bb.value(0));
1679        assert_eq!(-3.5, bb.value(1));
1680        let cc = batch.column(c.0).as_boolean();
1681        assert!(!cc.value(0));
1682        assert!(cc.value(10));
1683        let dd = batch.column(d.0).as_string::<i32>();
1684        assert_eq!("4", dd.value(0));
1685        assert_eq!("text", dd.value(8));
1686    }
1687
1688    #[test]
1689    fn test_json_empty_projection() {
1690        let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
1691        let batch = reader.next().unwrap().unwrap();
1692
1693        assert_eq!(0, batch.num_columns());
1694        assert_eq!(12, batch.num_rows());
1695    }
1696
1697    #[test]
1698    fn test_json_basic_with_nulls() {
1699        let mut reader = read_file("test/data/basic_nulls.json", None);
1700        let batch = reader.next().unwrap().unwrap();
1701
1702        assert_eq!(4, batch.num_columns());
1703        assert_eq!(12, batch.num_rows());
1704
1705        let schema = reader.schema();
1706        let batch_schema = batch.schema();
1707        assert_eq!(schema, batch_schema);
1708
1709        let a = schema.column_with_name("a").unwrap();
1710        assert_eq!(&DataType::Int64, a.1.data_type());
1711        let b = schema.column_with_name("b").unwrap();
1712        assert_eq!(&DataType::Float64, b.1.data_type());
1713        let c = schema.column_with_name("c").unwrap();
1714        assert_eq!(&DataType::Boolean, c.1.data_type());
1715        let d = schema.column_with_name("d").unwrap();
1716        assert_eq!(&DataType::Utf8, d.1.data_type());
1717
1718        let aa = batch.column(a.0).as_primitive::<Int64Type>();
1719        assert!(aa.is_valid(0));
1720        assert!(!aa.is_valid(1));
1721        assert!(!aa.is_valid(11));
1722        let bb = batch.column(b.0).as_primitive::<Float64Type>();
1723        assert!(bb.is_valid(0));
1724        assert!(!bb.is_valid(2));
1725        assert!(!bb.is_valid(11));
1726        let cc = batch.column(c.0).as_boolean();
1727        assert!(cc.is_valid(0));
1728        assert!(!cc.is_valid(4));
1729        assert!(!cc.is_valid(11));
1730        let dd = batch.column(d.0).as_string::<i32>();
1731        assert!(!dd.is_valid(0));
1732        assert!(dd.is_valid(1));
1733        assert!(!dd.is_valid(4));
1734        assert!(!dd.is_valid(11));
1735    }
1736
1737    #[test]
1738    fn test_json_basic_schema() {
1739        let schema = Schema::new(vec![
1740            Field::new("a", DataType::Int64, true),
1741            Field::new("b", DataType::Float32, false),
1742            Field::new("c", DataType::Boolean, false),
1743            Field::new("d", DataType::Utf8, false),
1744        ]);
1745
1746        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
1747        let reader_schema = reader.schema();
1748        assert_eq!(reader_schema.as_ref(), &schema);
1749        let batch = reader.next().unwrap().unwrap();
1750
1751        assert_eq!(4, batch.num_columns());
1752        assert_eq!(12, batch.num_rows());
1753
1754        let schema = batch.schema();
1755
1756        let a = schema.column_with_name("a").unwrap();
1757        assert_eq!(&DataType::Int64, a.1.data_type());
1758        let b = schema.column_with_name("b").unwrap();
1759        assert_eq!(&DataType::Float32, b.1.data_type());
1760        let c = schema.column_with_name("c").unwrap();
1761        assert_eq!(&DataType::Boolean, c.1.data_type());
1762        let d = schema.column_with_name("d").unwrap();
1763        assert_eq!(&DataType::Utf8, d.1.data_type());
1764
1765        let aa = batch.column(a.0).as_primitive::<Int64Type>();
1766        assert_eq!(1, aa.value(0));
1767        assert_eq!(100000000000000, aa.value(11));
1768        let bb = batch.column(b.0).as_primitive::<Float32Type>();
1769        assert_eq!(2.0, bb.value(0));
1770        assert_eq!(-3.5, bb.value(1));
1771    }
1772
1773    #[test]
1774    fn test_json_basic_schema_projection() {
1775        let schema = Schema::new(vec![
1776            Field::new("a", DataType::Int64, true),
1777            Field::new("c", DataType::Boolean, false),
1778        ]);
1779
1780        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
1781        let batch = reader.next().unwrap().unwrap();
1782
1783        assert_eq!(2, batch.num_columns());
1784        assert_eq!(2, batch.schema().fields().len());
1785        assert_eq!(12, batch.num_rows());
1786
1787        assert_eq!(batch.schema().as_ref(), &schema);
1788
1789        let a = schema.column_with_name("a").unwrap();
1790        assert_eq!(0, a.0);
1791        assert_eq!(&DataType::Int64, a.1.data_type());
1792        let c = schema.column_with_name("c").unwrap();
1793        assert_eq!(1, c.0);
1794        assert_eq!(&DataType::Boolean, c.1.data_type());
1795    }
1796
1797    #[test]
1798    fn test_json_arrays() {
1799        let mut reader = read_file("test/data/arrays.json", None);
1800        let batch = reader.next().unwrap().unwrap();
1801
1802        assert_eq!(4, batch.num_columns());
1803        assert_eq!(3, batch.num_rows());
1804
1805        let schema = batch.schema();
1806
1807        let a = schema.column_with_name("a").unwrap();
1808        assert_eq!(&DataType::Int64, a.1.data_type());
1809        let b = schema.column_with_name("b").unwrap();
1810        assert_eq!(
1811            &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
1812            b.1.data_type()
1813        );
1814        let c = schema.column_with_name("c").unwrap();
1815        assert_eq!(
1816            &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
1817            c.1.data_type()
1818        );
1819        let d = schema.column_with_name("d").unwrap();
1820        assert_eq!(&DataType::Utf8, d.1.data_type());
1821
1822        let aa = batch.column(a.0).as_primitive::<Int64Type>();
1823        assert_eq!(1, aa.value(0));
1824        assert_eq!(-10, aa.value(1));
1825        assert_eq!(1627668684594000000, aa.value(2));
1826        let bb = batch.column(b.0).as_list::<i32>();
1827        let bb = bb.values().as_primitive::<Float64Type>();
1828        assert_eq!(9, bb.len());
1829        assert_eq!(2.0, bb.value(0));
1830        assert_eq!(-6.1, bb.value(5));
1831        assert!(!bb.is_valid(7));
1832
1833        let cc = batch
1834            .column(c.0)
1835            .as_any()
1836            .downcast_ref::<ListArray>()
1837            .unwrap();
1838        let cc = cc.values().as_boolean();
1839        assert_eq!(6, cc.len());
1840        assert!(!cc.value(0));
1841        assert!(!cc.value(4));
1842        assert!(!cc.is_valid(5));
1843    }
1844
1845    #[test]
1846    fn test_empty_json_arrays() {
1847        let json_content = r#"
1848            {"items": []}
1849            {"items": null}
1850            {}
1851            "#;
1852
1853        let schema = Arc::new(Schema::new(vec![Field::new(
1854            "items",
1855            DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
1856            true,
1857        )]));
1858
1859        let batches = do_read(json_content, 1024, false, false, schema);
1860        assert_eq!(batches.len(), 1);
1861
1862        let col1 = batches[0].column(0).as_list::<i32>();
1863        assert_eq!(col1.null_count(), 2);
1864        assert!(col1.value(0).is_empty());
1865        assert_eq!(col1.value(0).data_type(), &DataType::Null);
1866        assert!(col1.is_null(1));
1867        assert!(col1.is_null(2));
1868    }
1869
1870    #[test]
1871    fn test_nested_empty_json_arrays() {
1872        let json_content = r#"
1873            {"items": [[],[]]}
1874            {"items": [[null, null],[null]]}
1875            "#;
1876
1877        let schema = Arc::new(Schema::new(vec![Field::new(
1878            "items",
1879            DataType::List(FieldRef::new(Field::new_list_field(
1880                DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
1881                true,
1882            ))),
1883            true,
1884        )]));
1885
1886        let batches = do_read(json_content, 1024, false, false, schema);
1887        assert_eq!(batches.len(), 1);
1888
1889        let col1 = batches[0].column(0).as_list::<i32>();
1890        assert_eq!(col1.null_count(), 0);
1891        assert_eq!(col1.value(0).len(), 2);
1892        assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
1893        assert!(col1.value(0).as_list::<i32>().value(1).is_empty());
1894
1895        assert_eq!(col1.value(1).len(), 2);
1896        assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
1897        assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
1898    }
1899
    #[test]
    fn test_nested_list_json_arrays() {
        // Decodes List<Struct<b: Boolean, c: Struct<d: Utf8>>>, exercising
        // nulls at every nesting level: a null list, an empty list, a null
        // struct element, a null inner struct, and a null leaf value.
        let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
        let a_struct_field = Field::new_struct(
            "a",
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
            true,
        );
        let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
        let builder = ReaderBuilder::new(schema).with_batch_size(64);
        let json_content = r#"
        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
        {"a": [{"b": false, "c": null}]}
        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
        {"a": null}
        {"a": []}
        {"a": [null]}
        "#;
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();

        // build expected output
        // 7 flattened struct elements across all rows: two from row 0, one
        // from row 1, three from row 2, and one (the null) from row 5.
        let d = StringArray::from(vec![
            Some("a_text"),
            Some("b_text"),
            None,
            Some("c_text"),
            Some("d_text"),
            None,
            None,
        ]);
        // Validity bits are LSB-first: 0b00111011 marks elements 2 and 6 of
        // 'c' as null (row 1's explicit `"c": null` and row 5's null struct).
        let c = ArrayDataBuilder::new(c_field.data_type().clone())
            .len(7)
            .add_child_data(d.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00111011])))
            .build()
            .unwrap();
        let b = BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(false),
            Some(true),
            None,
            Some(true),
            None,
        ]);
        // 0b00111111: element 6 (the `[null]` entry from the last row) is the
        // only null struct element.
        let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
            .len(7)
            .add_child_data(b.to_data())
            .add_child_data(c.clone())
            .null_bit_buffer(Some(Buffer::from([0b00111111])))
            .build()
            .unwrap();
        // Offsets delimit the rows: [0,2), [2,3), [3,6), then the null row and
        // the empty row both contribute zero length, and [6,7) is `[null]`.
        // 0b00110111 marks row 3 ({"a": null}) as the only null list entry.
        let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
            .len(6)
            .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
            .add_child_data(a)
            .null_bit_buffer(Some(Buffer::from([0b00110111])))
            .build()
            .unwrap();
        let expected = make_array(a_list);

        // compare `a` with result from json reader
        let batch = reader.next().unwrap().unwrap();
        let read = batch.column(0);
        assert_eq!(read.len(), 6);
        // compare the arrays the long way around, to better detect differences
        let read: &ListArray = read.as_list::<i32>();
        let expected = expected.as_list::<i32>();
        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
        // compare list null buffers
        assert_eq!(read.nulls(), expected.nulls());
        // build struct from list
        let struct_array = read.values().as_struct();
        let expected_struct_array = expected.values().as_struct();

        assert_eq!(7, struct_array.len());
        assert_eq!(1, struct_array.null_count());
        assert_eq!(7, expected_struct_array.len());
        assert_eq!(1, expected_struct_array.null_count());
        // test struct's nulls
        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
        // test struct's fields
        let read_b = struct_array.column(0);
        assert_eq!(read_b.as_ref(), &b);
        let read_c = struct_array.column(1);
        assert_eq!(read_c.to_data(), c);
        let read_c = read_c.as_struct();
        let read_d = read_c.column(0);
        assert_eq!(read_d.as_ref(), &d);

        assert_eq!(read, expected);
    }
1993
1994    #[test]
1995    fn test_skip_empty_lines() {
1996        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
1997        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1998        let json_content = "
1999        {\"a\": 1}
2000        {\"a\": 2}
2001        {\"a\": 3}";
2002        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
2003        let batch = reader.next().unwrap().unwrap();
2004
2005        assert_eq!(1, batch.num_columns());
2006        assert_eq!(3, batch.num_rows());
2007
2008        let schema = reader.schema();
2009        let c = schema.column_with_name("a").unwrap();
2010        assert_eq!(&DataType::Int64, c.1.data_type());
2011    }
2012
2013    #[test]
2014    fn test_with_multiple_batches() {
2015        let file = File::open("test/data/basic_nulls.json").unwrap();
2016        let mut reader = BufReader::new(file);
2017        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2018        reader.rewind().unwrap();
2019
2020        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2021        let mut reader = builder.build(reader).unwrap();
2022
2023        let mut num_records = Vec::new();
2024        while let Some(rb) = reader.next().transpose().unwrap() {
2025            num_records.push(rb.num_rows());
2026        }
2027
2028        assert_eq!(vec![5, 5, 2], num_records);
2029    }
2030
2031    #[test]
2032    fn test_timestamp_from_json_seconds() {
2033        let schema = Schema::new(vec![Field::new(
2034            "a",
2035            DataType::Timestamp(TimeUnit::Second, None),
2036            true,
2037        )]);
2038
2039        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2040        let batch = reader.next().unwrap().unwrap();
2041
2042        assert_eq!(1, batch.num_columns());
2043        assert_eq!(12, batch.num_rows());
2044
2045        let schema = reader.schema();
2046        let batch_schema = batch.schema();
2047        assert_eq!(schema, batch_schema);
2048
2049        let a = schema.column_with_name("a").unwrap();
2050        assert_eq!(
2051            &DataType::Timestamp(TimeUnit::Second, None),
2052            a.1.data_type()
2053        );
2054
2055        let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
2056        assert!(aa.is_valid(0));
2057        assert!(!aa.is_valid(1));
2058        assert!(!aa.is_valid(2));
2059        assert_eq!(1, aa.value(0));
2060        assert_eq!(1, aa.value(3));
2061        assert_eq!(5, aa.value(7));
2062    }
2063
2064    #[test]
2065    fn test_timestamp_from_json_milliseconds() {
2066        let schema = Schema::new(vec![Field::new(
2067            "a",
2068            DataType::Timestamp(TimeUnit::Millisecond, None),
2069            true,
2070        )]);
2071
2072        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2073        let batch = reader.next().unwrap().unwrap();
2074
2075        assert_eq!(1, batch.num_columns());
2076        assert_eq!(12, batch.num_rows());
2077
2078        let schema = reader.schema();
2079        let batch_schema = batch.schema();
2080        assert_eq!(schema, batch_schema);
2081
2082        let a = schema.column_with_name("a").unwrap();
2083        assert_eq!(
2084            &DataType::Timestamp(TimeUnit::Millisecond, None),
2085            a.1.data_type()
2086        );
2087
2088        let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
2089        assert!(aa.is_valid(0));
2090        assert!(!aa.is_valid(1));
2091        assert!(!aa.is_valid(2));
2092        assert_eq!(1, aa.value(0));
2093        assert_eq!(1, aa.value(3));
2094        assert_eq!(5, aa.value(7));
2095    }
2096
2097    #[test]
2098    fn test_date_from_json_milliseconds() {
2099        let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);
2100
2101        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2102        let batch = reader.next().unwrap().unwrap();
2103
2104        assert_eq!(1, batch.num_columns());
2105        assert_eq!(12, batch.num_rows());
2106
2107        let schema = reader.schema();
2108        let batch_schema = batch.schema();
2109        assert_eq!(schema, batch_schema);
2110
2111        let a = schema.column_with_name("a").unwrap();
2112        assert_eq!(&DataType::Date64, a.1.data_type());
2113
2114        let aa = batch.column(a.0).as_primitive::<Date64Type>();
2115        assert!(aa.is_valid(0));
2116        assert!(!aa.is_valid(1));
2117        assert!(!aa.is_valid(2));
2118        assert_eq!(1, aa.value(0));
2119        assert_eq!(1, aa.value(3));
2120        assert_eq!(5, aa.value(7));
2121    }
2122
2123    #[test]
2124    fn test_time_from_json_nanoseconds() {
2125        let schema = Schema::new(vec![Field::new(
2126            "a",
2127            DataType::Time64(TimeUnit::Nanosecond),
2128            true,
2129        )]);
2130
2131        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2132        let batch = reader.next().unwrap().unwrap();
2133
2134        assert_eq!(1, batch.num_columns());
2135        assert_eq!(12, batch.num_rows());
2136
2137        let schema = reader.schema();
2138        let batch_schema = batch.schema();
2139        assert_eq!(schema, batch_schema);
2140
2141        let a = schema.column_with_name("a").unwrap();
2142        assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());
2143
2144        let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
2145        assert!(aa.is_valid(0));
2146        assert!(!aa.is_valid(1));
2147        assert!(!aa.is_valid(2));
2148        assert_eq!(1, aa.value(0));
2149        assert_eq!(1, aa.value(3));
2150        assert_eq!(5, aa.value(7));
2151    }
2152
2153    #[test]
2154    fn test_json_iterator() {
2155        let file = File::open("test/data/basic.json").unwrap();
2156        let mut reader = BufReader::new(file);
2157        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2158        reader.rewind().unwrap();
2159
2160        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2161        let reader = builder.build(reader).unwrap();
2162        let schema = reader.schema();
2163        let (col_a_index, _) = schema.column_with_name("a").unwrap();
2164
2165        let mut sum_num_rows = 0;
2166        let mut num_batches = 0;
2167        let mut sum_a = 0;
2168        for batch in reader {
2169            let batch = batch.unwrap();
2170            assert_eq!(8, batch.num_columns());
2171            sum_num_rows += batch.num_rows();
2172            num_batches += 1;
2173            let batch_schema = batch.schema();
2174            assert_eq!(schema, batch_schema);
2175            let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
2176            sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
2177        }
2178        assert_eq!(12, sum_num_rows);
2179        assert_eq!(3, num_batches);
2180        assert_eq!(100000000000011, sum_a);
2181    }
2182
2183    #[test]
2184    fn test_decoder_error() {
2185        let schema = Arc::new(Schema::new(vec![Field::new_struct(
2186            "a",
2187            vec![Field::new("child", DataType::Int32, false)],
2188            true,
2189        )]));
2190
2191        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
2192        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
2193        assert!(decoder.tape_decoder.has_partial_row());
2194        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
2195        let _ = decoder.flush().unwrap_err();
2196        assert!(decoder.tape_decoder.has_partial_row());
2197        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
2198
2199        let parse_err = |s: &str| {
2200            ReaderBuilder::new(schema.clone())
2201                .build(Cursor::new(s.as_bytes()))
2202                .unwrap()
2203                .next()
2204                .unwrap()
2205                .unwrap_err()
2206                .to_string()
2207        };
2208
2209        let err = parse_err(r#"{"a": 123}"#);
2210        assert_eq!(
2211            err,
2212            "Json error: whilst decoding field 'a': expected { got 123"
2213        );
2214
2215        let err = parse_err(r#"{"a": ["bar"]}"#);
2216        assert_eq!(
2217            err,
2218            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
2219        );
2220
2221        let err = parse_err(r#"{"a": []}"#);
2222        assert_eq!(
2223            err,
2224            "Json error: whilst decoding field 'a': expected { got []"
2225        );
2226
2227        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
2228        assert_eq!(
2229            err,
2230            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
2231        );
2232
2233        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
2234        assert_eq!(
2235            err,
2236            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
2237        );
2238
2239        let err = parse_err(r#"{"a": true}"#);
2240        assert_eq!(
2241            err,
2242            "Json error: whilst decoding field 'a': expected { got true"
2243        );
2244
2245        let err = parse_err(r#"{"a": false}"#);
2246        assert_eq!(
2247            err,
2248            "Json error: whilst decoding field 'a': expected { got false"
2249        );
2250
2251        let err = parse_err(r#"{"a": "foo"}"#);
2252        assert_eq!(
2253            err,
2254            "Json error: whilst decoding field 'a': expected { got \"foo\""
2255        );
2256
2257        let err = parse_err(r#"{"a": {"child": false}}"#);
2258        assert_eq!(
2259            err,
2260            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
2261        );
2262
2263        let err = parse_err(r#"{"a": {"child": []}}"#);
2264        assert_eq!(
2265            err,
2266            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
2267        );
2268
2269        let err = parse_err(r#"{"a": {"child": [123]}}"#);
2270        assert_eq!(
2271            err,
2272            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
2273        );
2274
2275        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
2276        assert_eq!(
2277            err,
2278            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
2279        );
2280    }
2281
2282    #[test]
2283    fn test_serialize_timestamp() {
2284        let json = vec![
2285            json!({"timestamp": 1681319393}),
2286            json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
2287        ];
2288        let schema = Schema::new(vec![Field::new(
2289            "timestamp",
2290            DataType::Timestamp(TimeUnit::Second, None),
2291            true,
2292        )]);
2293        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2294            .build_decoder()
2295            .unwrap();
2296        decoder.serialize(&json).unwrap();
2297        let batch = decoder.flush().unwrap().unwrap();
2298        assert_eq!(batch.num_rows(), 2);
2299        assert_eq!(batch.num_columns(), 1);
2300        let values = batch.column(0).as_primitive::<TimestampSecondType>();
2301        assert_eq!(values.values(), &[1681319393, -7200]);
2302    }
2303
2304    #[test]
2305    fn test_serialize_decimal() {
2306        let json = vec![
2307            json!({"decimal": 1.234}),
2308            json!({"decimal": "1.234"}),
2309            json!({"decimal": 1234}),
2310            json!({"decimal": "1234"}),
2311        ];
2312        let schema = Schema::new(vec![Field::new(
2313            "decimal",
2314            DataType::Decimal128(10, 3),
2315            true,
2316        )]);
2317        let mut decoder = ReaderBuilder::new(Arc::new(schema))
2318            .build_decoder()
2319            .unwrap();
2320        decoder.serialize(&json).unwrap();
2321        let batch = decoder.flush().unwrap().unwrap();
2322        assert_eq!(batch.num_rows(), 4);
2323        assert_eq!(batch.num_columns(), 1);
2324        let values = batch.column(0).as_primitive::<Decimal128Type>();
2325        assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
2326    }
2327
2328    #[test]
2329    fn test_serde_field() {
2330        let field = Field::new("int", DataType::Int32, true);
2331        let mut decoder = ReaderBuilder::new_with_field(field)
2332            .build_decoder()
2333            .unwrap();
2334        decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2335        let b = decoder.flush().unwrap().unwrap();
2336        let values = b.column(0).as_primitive::<Int32Type>().values();
2337        assert_eq!(values, &[1, 2, 3, 4]);
2338    }
2339
2340    #[test]
2341    fn test_serde_large_numbers() {
2342        let field = Field::new("int", DataType::Int64, true);
2343        let mut decoder = ReaderBuilder::new_with_field(field)
2344            .build_decoder()
2345            .unwrap();
2346
2347        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2348        let b = decoder.flush().unwrap().unwrap();
2349        let values = b.column(0).as_primitive::<Int64Type>().values();
2350        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2351
2352        let field = Field::new(
2353            "int",
2354            DataType::Timestamp(TimeUnit::Microsecond, None),
2355            true,
2356        );
2357        let mut decoder = ReaderBuilder::new_with_field(field)
2358            .build_decoder()
2359            .unwrap();
2360
2361        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2362        let b = decoder.flush().unwrap().unwrap();
2363        let values = b
2364            .column(0)
2365            .as_primitive::<TimestampMicrosecondType>()
2366            .values();
2367        assert_eq!(values, &[1699148028689, 2, 3, 4]);
2368    }
2369
2370    #[test]
2371    fn test_coercing_primitive_into_string_decoder() {
2372        let buf = &format!(
2373            r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
2374            (i32::MAX as i64 + 10),
2375            i64::MAX - 10
2376        );
2377        let schema = Schema::new(vec![
2378            Field::new("a", DataType::Float64, true),
2379            Field::new("b", DataType::Utf8, true),
2380            Field::new("c", DataType::Utf8, true),
2381        ]);
2382        let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
2383        let schema_ref = Arc::new(schema);
2384
2385        // read record batches
2386        let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
2387        let mut decoder = reader.build_decoder().unwrap();
2388        decoder.serialize(json_array.as_slice()).unwrap();
2389        let batch = decoder.flush().unwrap().unwrap();
2390        assert_eq!(
2391            batch,
2392            RecordBatch::try_new(
2393                schema_ref,
2394                vec![
2395                    Arc::new(Float64Array::from(vec![
2396                        1.0,
2397                        2.0,
2398                        (i32::MAX as i64 + 10) as f64,
2399                        (i64::MAX - 10) as f64
2400                    ])),
2401                    Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
2402                    Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
2403                ]
2404            )
2405            .unwrap()
2406        );
2407    }
2408
2409    // Parse the given `row` in `struct_mode` as a type given by fields.
2410    //
2411    // If as_struct == true, wrap the fields in a Struct field with name "r".
2412    // If as_struct == false, wrap the fields in a Schema.
2413    fn _parse_structs(
2414        row: &str,
2415        struct_mode: StructMode,
2416        fields: Fields,
2417        as_struct: bool,
2418    ) -> Result<RecordBatch, ArrowError> {
2419        let builder = if as_struct {
2420            ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
2421        } else {
2422            ReaderBuilder::new(Arc::new(Schema::new(fields)))
2423        };
2424        builder
2425            .with_struct_mode(struct_mode)
2426            .build(Cursor::new(row.as_bytes()))
2427            .unwrap()
2428            .next()
2429            .unwrap()
2430    }
2431
    #[test]
    fn test_struct_decoding_list_length() {
        use arrow_array::array;

        // In ListOnly mode a row is a positional list of column values; this
        // test checks the errors produced when the schema has fewer or more
        // fields than the row has elements, and the success case when they
        // match. Each case is exercised both wrapped in a struct column ("r")
        // and as a top-level schema (see `_parse_structs`).
        let row = "[1, 2]";

        // Build 1-, 2-, and 3-field variants of the same Int32 schema.
        let mut fields = vec![Field::new("a", DataType::Int32, true)];
        let too_few_fields = Fields::from(fields.clone());
        fields.push(Field::new("b", DataType::Int32, true));
        let correct_fields = Fields::from(fields.clone());
        fields.push(Field::new("c", DataType::Int32, true));
        let too_many_fields = Fields::from(fields.clone());

        let parse = |fields: Fields, as_struct: bool| {
            _parse_structs(row, StructMode::ListOnly, fields, as_struct)
        };

        // Expected single-row result for the matching 2-field schema.
        let expected_row = StructArray::new(
            correct_fields.clone(),
            vec![
                Arc::new(array::Int32Array::from(vec![1])),
                Arc::new(array::Int32Array::from(vec![2])),
            ],
            None,
        );
        let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);

        // Too few schema fields: the surplus row elements are reported.
        assert_eq!(
            parse(too_few_fields.clone(), true).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        assert_eq!(
            parse(too_few_fields, false).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        // Matching field count: decodes successfully in both wrappings.
        assert_eq!(
            parse(correct_fields.clone(), true).unwrap(),
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![row_field])),
                vec![Arc::new(expected_row.clone())]
            )
            .unwrap()
        );
        assert_eq!(
            parse(correct_fields, false).unwrap(),
            RecordBatch::from(expected_row)
        );
        // Too many schema fields: the shortfall is reported.
        assert_eq!(
            parse(too_many_fields.clone(), true)
                .unwrap_err()
                .to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
        assert_eq!(
            parse(too_many_fields, false).unwrap_err().to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
    }
2490
2491    #[test]
2492    fn test_struct_decoding() {
2493        use arrow_array::builder;
2494
2495        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
2496        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
2497        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;
2498
2499        let struct_fields = Fields::from(vec![
2500            Field::new("b", DataType::new_list(DataType::Int32, true), true),
2501            Field::new_map(
2502                "c",
2503                "entries",
2504                Field::new("keys", DataType::Utf8, false),
2505                Field::new("values", DataType::Int32, true),
2506                false,
2507                false,
2508            ),
2509        ]);
2510
2511        let list_array =
2512            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);
2513
2514        let map_array = {
2515            let mut map_builder = builder::MapBuilder::new(
2516                None,
2517                builder::StringBuilder::new(),
2518                builder::Int32Builder::new(),
2519            );
2520            map_builder.keys().append_value("d");
2521            map_builder.values().append_value(3);
2522            map_builder.append(true).unwrap();
2523            map_builder.finish()
2524        };
2525
2526        let struct_array = StructArray::new(
2527            struct_fields.clone(),
2528            vec![Arc::new(list_array), Arc::new(map_array)],
2529            None,
2530        );
2531
2532        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
2533        let schema = Arc::new(Schema::new(fields.clone()));
2534        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();
2535
2536        let parse = |row: &str, struct_mode: StructMode| {
2537            _parse_structs(row, struct_mode, fields.clone(), false)
2538        };
2539
2540        assert_eq!(
2541            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
2542            expected
2543        );
2544        assert_eq!(
2545            parse(nested_list_json, StructMode::ObjectOnly)
2546                .unwrap_err()
2547                .to_string(),
2548            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
2549        );
2550        assert_eq!(
2551            parse(nested_mixed_json, StructMode::ObjectOnly)
2552                .unwrap_err()
2553                .to_string(),
2554            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
2555        );
2556
2557        assert_eq!(
2558            parse(nested_list_json, StructMode::ListOnly).unwrap(),
2559            expected
2560        );
2561        assert_eq!(
2562            parse(nested_object_json, StructMode::ListOnly)
2563                .unwrap_err()
2564                .to_string(),
2565            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
2566        );
2567        assert_eq!(
2568            parse(nested_mixed_json, StructMode::ListOnly)
2569                .unwrap_err()
2570                .to_string(),
2571            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
2572        );
2573    }
2574
2575    // Test cases:
2576    // [] -> RecordBatch row with no entries.  Schema = [('a', Int32)] -> Error
2577    // [] -> RecordBatch row with no entries. Schema = [('r', [('a', Int32)])] -> Error
2578    // [] -> StructArray row with no entries. Fields [('a', Int32')] -> Error
2579    // [[]] -> RecordBatch row with empty struct entry. Schema = [('r', [('a', Int32)])] -> Error
2580    #[test]
2581    fn test_struct_decoding_empty_list() {
2582        let int_field = Field::new("a", DataType::Int32, true);
2583        let struct_field = Field::new(
2584            "r",
2585            DataType::Struct(Fields::from(vec![int_field.clone()])),
2586            true,
2587        );
2588
2589        let parse = |row: &str, as_struct: bool, field: Field| {
2590            _parse_structs(
2591                row,
2592                StructMode::ListOnly,
2593                Fields::from(vec![field]),
2594                as_struct,
2595            )
2596        };
2597
2598        // Missing fields
2599        assert_eq!(
2600            parse("[]", true, struct_field.clone())
2601                .unwrap_err()
2602                .to_string(),
2603            "Json error: found 0 columns for 1 fields".to_owned()
2604        );
2605        assert_eq!(
2606            parse("[]", false, int_field.clone())
2607                .unwrap_err()
2608                .to_string(),
2609            "Json error: found 0 columns for 1 fields".to_owned()
2610        );
2611        assert_eq!(
2612            parse("[]", false, struct_field.clone())
2613                .unwrap_err()
2614                .to_string(),
2615            "Json error: found 0 columns for 1 fields".to_owned()
2616        );
2617        assert_eq!(
2618            parse("[[]]", false, struct_field.clone())
2619                .unwrap_err()
2620                .to_string(),
2621            "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
2622        );
2623    }
2624
2625    #[test]
2626    fn test_decode_list_struct_with_wrong_types() {
2627        let int_field = Field::new("a", DataType::Int32, true);
2628        let struct_field = Field::new(
2629            "r",
2630            DataType::Struct(Fields::from(vec![int_field.clone()])),
2631            true,
2632        );
2633
2634        let parse = |row: &str, as_struct: bool, field: Field| {
2635            _parse_structs(
2636                row,
2637                StructMode::ListOnly,
2638                Fields::from(vec![field]),
2639                as_struct,
2640            )
2641        };
2642
2643        // Wrong values
2644        assert_eq!(
2645            parse(r#"[["a"]]"#, false, struct_field.clone())
2646                .unwrap_err()
2647                .to_string(),
2648            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2649        );
2650        assert_eq!(
2651            parse(r#"[["a"]]"#, true, struct_field.clone())
2652                .unwrap_err()
2653                .to_string(),
2654            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2655        );
2656        assert_eq!(
2657            parse(r#"["a"]"#, true, int_field.clone())
2658                .unwrap_err()
2659                .to_string(),
2660            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2661        );
2662        assert_eq!(
2663            parse(r#"["a"]"#, false, int_field.clone())
2664                .unwrap_err()
2665                .to_string(),
2666            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2667        );
2668    }
2669}