use crate::StructMode;
use std::io::BufRead;
use std::sync::Arc;

use chrono::Utc;
use serde::Serialize;

use arrow_array::timezone::Tz;
use arrow_array::types::*;
use arrow_array::{downcast_integer, make_array, RecordBatch, RecordBatchReader, StructArray};
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
pub use schema::*;

use crate::reader::boolean_array::BooleanArrayDecoder;
use crate::reader::decimal_array::DecimalArrayDecoder;
use crate::reader::list_array::ListArrayDecoder;
use crate::reader::map_array::MapArrayDecoder;
use crate::reader::null_array::NullArrayDecoder;
use crate::reader::primitive_array::PrimitiveArrayDecoder;
use crate::reader::string_array::StringArrayDecoder;
use crate::reader::struct_array::StructArrayDecoder;
use crate::reader::tape::{Tape, TapeDecoder};
use crate::reader::timestamp_array::TimestampArrayDecoder;

mod boolean_array;
mod decimal_array;
mod list_array;
mod map_array;
mod null_array;
mod primitive_array;
mod schema;
mod serializer;
mod string_array;
mod struct_array;
mod tape;
mod timestamp_array;

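/// A builder for constructing a [`Reader`] or [`Decoder`] for a known [`Schema`]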
pub struct ReaderBuilder {
    batch_size: usize,
    coerce_primitive: bool,
    strict_mode: bool,
    is_field: bool,
    struct_mode: StructMode,

    schema: SchemaRef,
}

impl ReaderBuilder {
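    /// Create a new [`ReaderBuilder`] with the provided [`SchemaRef`]
    ///
    /// Columns not present in `schema` are ignored unless strict mode is enabled
    /// (see [`ReaderBuilder::with_strict_mode`])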
    pub fn new(schema: SchemaRef) -> Self {
        Self {
            batch_size: 1024,
            coerce_primitive: false,
            strict_mode: false,
            is_field: false,
            struct_mode: Default::default(),
            schema,
        }
    }

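    /// Create a new [`ReaderBuilder`] that will decode JSON values matching `field`,
    /// wrapping them in a single-column [`RecordBatch`], rather than requiring the
    /// top-level JSON value to be an object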
    pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
        Self {
            batch_size: 1024,
            coerce_primitive: false,
            strict_mode: false,
            is_field: true,
            struct_mode: Default::default(),
            schema: Arc::new(Schema::new([field.into()])),
        }
    }

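    /// Sets the batch size in rows to read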
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        Self { batch_size, ..self }
    }

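    /// Sets if the decoder should coerce primitive values (bool and number) to strings
    /// when the schema requests a string column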
    pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
        Self {
            coerce_primitive,
            ..self
        }
    }

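    /// Sets if the decoder should return an error when it encounters a column not
    /// present in `schema`; by default unknown columns are ignored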
    pub fn with_strict_mode(self, strict_mode: bool) -> Self {
        Self {
            strict_mode,
            ..self
        }
    }

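    /// Set the [`StructMode`] used when decoding [`DataType::Struct`] values,
    /// i.e. whether structs are encoded as JSON objects or JSON lists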
    pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
        Self {
            struct_mode,
            ..self
        }
    }

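    /// Create a [`Reader`] with the provided [`BufRead`]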
    pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
        Ok(Reader {
            reader,
            decoder: self.build_decoder()?,
        })
    }

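    /// Create a [`Decoder`], a lower-level interface that can be fed bytes
    /// incrementally or [`Serialize`]-able rows directly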
    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
        let (data_type, nullable) = match self.is_field {
            false => (DataType::Struct(self.schema.fields.clone()), false),
            true => {
                let field = &self.schema.fields[0];
                (field.data_type().clone(), field.is_nullable())
            }
        };

        let decoder = make_decoder(
            data_type,
            self.coerce_primitive,
            self.strict_mode,
            nullable,
            self.struct_mode,
        )?;

        let num_fields = self.schema.flattened_fields().len();

        Ok(Decoder {
            decoder,
            is_field: self.is_field,
            tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
            batch_size: self.batch_size,
            schema: self.schema,
        })
    }
}

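/// A [`RecordBatchReader`] that decodes JSON values from a [`BufRead`] into
/// [`RecordBatch`]es of a fixed schema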
pub struct Reader<R> {
    reader: R,
    decoder: Decoder,
}

impl<R> std::fmt::Debug for Reader<R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Reader")
            .field("decoder", &self.decoder)
            .finish()
    }
}

impl<R: BufRead> Reader<R> {
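    /// Reads the next [`RecordBatch`], returning `Ok(None)` once the underlying
    /// reader is exhausted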
    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        loop {
            let buf = self.reader.fill_buf()?;
            if buf.is_empty() {
                break;
            }
            let read = buf.len();

            let decoded = self.decoder.decode(buf)?;
            self.reader.consume(decoded);
            if decoded != read {
                break;
            }
        }
        self.decoder.flush()
    }
}

impl<R: BufRead> Iterator for Reader<R> {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.read().transpose()
    }
}

impl<R: BufRead> RecordBatchReader for Reader<R> {
    fn schema(&self) -> SchemaRef {
        self.decoder.schema.clone()
    }
}

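/// A low-level, push-based interface for decoding JSON data from `u8` buffers
/// into [`RecordBatch`], without requiring the input to be framed into complete
/// records first
///
/// A minimal sketch of typical usage, assuming whitespace-separated JSON objects
/// already buffered in memory:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{DataType, Field, Schema};
/// # use arrow_json::ReaderBuilder;
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
/// let mut decoder = ReaderBuilder::new(schema).build_decoder().unwrap();
///
/// // Push bytes into the decoder; it reports how many bytes it consumed,
/// // stopping early once `batch_size` rows have been buffered
/// let json = br#"{"a": 1} {"a": 2}"#;
/// let consumed = decoder.decode(json).unwrap();
/// assert_eq!(consumed, json.len());
///
/// // Flush the buffered rows out as a RecordBatch
/// let batch = decoder.flush().unwrap().unwrap();
/// assert_eq!(batch.num_rows(), 2);
/// ```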
pub struct Decoder {
    tape_decoder: TapeDecoder,
    decoder: Box<dyn ArrayDecoder>,
    batch_size: usize,
    is_field: bool,
    schema: SchemaRef,
}

impl std::fmt::Debug for Decoder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Decoder")
            .field("schema", &self.schema)
            .field("batch_size", &self.batch_size)
            .finish()
    }
}

impl Decoder {
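    /// Read JSON objects from `buf`, returning the number of bytes consumed
    ///
    /// This method returns early once `batch_size` rows have been buffered since
    /// the last call to [`Self::flush`]; any unconsumed bytes should be included
    /// in the next call to [`Self::decode`]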
    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        self.tape_decoder.decode(buf)
    }

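    /// Buffer `rows` in this decoder using their [`Serialize`] implementations,
    /// as an alternative to decoding them from JSON text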
    pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
        self.tape_decoder.serialize(rows)
    }

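    /// Returns `true` if the decoder is part way through decoding a record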
    pub fn has_partial_record(&self) -> bool {
        self.tape_decoder.has_partial_row()
    }

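    /// Returns the number of complete rows currently buffered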
    pub fn len(&self) -> usize {
        self.tape_decoder.num_buffered_rows()
    }

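    /// Returns `true` if no complete rows are currently buffered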
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

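    /// Flushes the buffered rows as a [`RecordBatch`], returning `Ok(None)` if none
    ///
    /// This returns an error if called part way through decoding a record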
    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        let tape = self.tape_decoder.finish()?;

        if tape.num_rows() == 0 {
            return Ok(None);
        }

        let mut next_object = 1;
        let pos: Vec<_> = (0..tape.num_rows())
            .map(|_| {
                let next = tape.next(next_object, "row").unwrap();
                std::mem::replace(&mut next_object, next)
            })
            .collect();

        let decoded = self.decoder.decode(&tape, &pos)?;
        self.tape_decoder.clear();

        let batch = match self.is_field {
            true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
            false => {
                RecordBatch::from(StructArray::from(decoded)).with_schema(self.schema.clone())?
            }
        };

        Ok(Some(batch))
    }
}

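/// Decodes a specific [`DataType`] from the parsed [`Tape`]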
trait ArrayDecoder: Send {
    /// Decode elements from `tape` starting at the positions in `pos`, returning
    /// the resulting [`ArrayData`]
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
}

macro_rules! primitive_decoder {
    ($t:ty, $data_type:expr) => {
        Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
    };
}

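/// Create an [`ArrayDecoder`] for the provided [`DataType`]; integer types dispatch
/// through [`downcast_integer`] to the `primitive_decoder!` macro, and nested types
/// recursively construct decoders for their children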
fn make_decoder(
    data_type: DataType,
    coerce_primitive: bool,
    strict_mode: bool,
    is_nullable: bool,
    struct_mode: StructMode,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
    downcast_integer! {
        data_type => (primitive_decoder, data_type),
        DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
        DataType::Timestamp(TimeUnit::Second, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, tz)))
        },
        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
        DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
        DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
        DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
            Err(ArrowError::JsonError(format!("{data_type} is not supported by JSON")))
        }
        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader")))
    }
}

#[cfg(test)]
mod tests {
    use serde_json::json;
    use std::fs::File;
    use std::io::{BufReader, Cursor, Seek};

    use arrow_array::cast::AsArray;
    use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray};
    use arrow_buffer::{ArrowNativeType, Buffer};
    use arrow_cast::display::{ArrayFormatter, FormatOptions};
    use arrow_data::ArrayDataBuilder;
    use arrow_schema::{Field, Fields};

    use super::*;

    fn do_read(
        buf: &str,
        batch_size: usize,
        coerce_primitive: bool,
        strict_mode: bool,
        schema: SchemaRef,
    ) -> Vec<RecordBatch> {
        let mut unbuffered = vec![];

        for batch_size in [1, 3, 100, batch_size] {
            unbuffered = ReaderBuilder::new(schema.clone())
                .with_batch_size(batch_size)
                .with_coerce_primitive(coerce_primitive)
                .build(Cursor::new(buf.as_bytes()))
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();

            for b in unbuffered.iter().take(unbuffered.len() - 1) {
                assert_eq!(b.num_rows(), batch_size)
            }

            for b in [1, 3, 5] {
                let buffered = ReaderBuilder::new(schema.clone())
                    .with_batch_size(batch_size)
                    .with_coerce_primitive(coerce_primitive)
                    .with_strict_mode(strict_mode)
                    .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
                    .unwrap()
                    .collect::<Result<Vec<_>, _>>()
                    .unwrap();
                assert_eq!(unbuffered, buffered);
            }
        }

        unbuffered
    }

    #[test]
    fn test_basic() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }

    #[test]
    fn test_string() {
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        assert_eq!(col2.value(4), "");
    }

    #[test]
    fn test_complex() {
        let buf = r#"
        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        {"list": null, "nested": {"a": null}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_list("list", Field::new("element", DataType::Int32, false), true),
            Field::new_struct(
                "nested",
                vec![
                    Field::new("a", DataType::Int32, true),
                    Field::new("b", DataType::Int32, true),
                ],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("c", DataType::Int32, false)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let list = batches[0].column(0).as_list::<i32>();
        assert_eq!(list.len(), 3);
        assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
        assert_eq!(list.null_count(), 1);
        assert!(list.is_null(2));
        let list_values = list.values().as_primitive::<Int32Type>();
        assert_eq!(list_values.values(), &[5, 6]);

        let nested = batches[0].column(1).as_struct();
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 1);
        assert_eq!(a.values(), &[1, 7, 0]);
        assert!(a.is_null(2));

        let b = nested.column(1).as_primitive::<Int32Type>();
        assert_eq!(b.null_count(), 2);
        assert_eq!(b.len(), 3);
        assert_eq!(b.value(0), 2);
        assert!(b.is_null(1));
        assert!(b.is_null(2));

        let nested_list = batches[0].column(2).as_struct();
        assert_eq!(nested_list.len(), 3);
        assert_eq!(nested_list.null_count(), 1);
        assert!(nested_list.is_null(2));

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.len(), 3);
        assert_eq!(list2.null_count(), 1);
        assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
        assert!(list2.is_null(2));

        let list2_values = list2.values().as_struct();

        let c = list2_values.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[3, 4]);
    }

    #[test]
    fn test_projection() {
        let buf = r#"
        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "nested",
                vec![Field::new("a", DataType::Int32, false)],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("d", DataType::Int32, true)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let nested = batches[0].column(0).as_struct();
        assert_eq!(nested.num_columns(), 1);
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 0);
        assert_eq!(a.values(), &[1, 7]);

        let nested_list = batches[0].column(1).as_struct();
        assert_eq!(nested_list.num_columns(), 1);
        assert_eq!(nested_list.null_count(), 0);

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
        assert_eq!(list2.null_count(), 0);

        let child = list2.values().as_struct();
        assert_eq!(child.num_columns(), 1);
        assert_eq!(child.len(), 2);
        assert_eq!(child.null_count(), 0);

        let c = child.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[5, 0]);
        assert_eq!(c.null_count(), 1);
        assert!(c.is_null(1));
    }

    #[test]
    fn test_map() {
        let buf = r#"
        {"map": {"a": ["foo", null]}}
        {"map": {"a": [null], "b": []}}
        {"map": {"c": null, "a": ["baz"]}}
        "#;
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }

    #[test]
    fn test_not_coercing_primitive_into_string_without_flag() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));

        let buf = r#"{"a": 1}"#;
        let err = ReaderBuilder::new(schema.clone())
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got 1"
        );

        let buf = r#"{"a": true}"#;
        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got true"
        );
    }

    #[test]
    fn test_coercing_primitive_into_string() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true}
        {"a": 2E0, "b": 4, "c": false}

        {"b": 6, "a": 2.0}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "2E0");
        assert_eq!(col1.value(2), "2.0");
        assert_eq!(col1.value(3), "2");
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_string::<i32>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "4");
        assert_eq!(col2.value(2), "6");
        assert_eq!(col2.value(3), "5");
        assert_eq!(col2.value(4), "4e0");
        assert_eq!(col2.value(5), "7");

        let col3 = batches[0].column(2).as_string::<i32>();
        assert_eq!(col3.null_count(), 4);
        assert_eq!(col3.value(0), "true");
        assert_eq!(col3.value(1), "false");
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
    }

    fn test_decimal<T: DecimalType>(data_type: DataType) {
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }

    #[test]
    fn test_decimals() {
        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
    }

    fn test_timestamp<T: ArrowTimestampType>() {
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }

    #[test]
    fn test_timestamps() {
        test_timestamp::<TimestampSecondType>();
        test_timestamp::<TimestampMillisecondType>();
        test_timestamp::<TimestampMicrosecondType>();
        test_timestamp::<TimestampNanosecondType>();
    }

    fn test_time<T: ArrowTemporalType>() {
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos,
                86399000000000 / unit_in_nanos,
                64800000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos,
                49349190855000 / unit_in_nanos,
                34016123000000 / unit_in_nanos,
                52016123000000 / unit_in_nanos
            ]
            .map(T::Native::usize_as)
        );
    }

    #[test]
    fn test_times() {
        test_time::<Time32MillisecondType>();
        test_time::<Time32SecondType>();
        test_time::<Time64MicrosecondType>();
        test_time::<Time64NanosecondType>();
    }

    fn test_duration<T: ArrowTemporalType>() {
        let buf = r#"
        {"a": 1, "b": "2"}
        {"a": 3, "b": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
        assert_eq!(col_a.null_count(), 0);
        assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));

        let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
    }

    #[test]
    fn test_durations() {
        test_duration::<DurationNanosecondType>();
        test_duration::<DurationMicrosecondType>();
        test_duration::<DurationMillisecondType>();
        test_duration::<DurationSecondType>();
    }

    #[test]
    fn test_delta_checkpoint() {
        let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "protocol",
                vec![
                    Field::new("minReaderVersion", DataType::Int32, true),
                    Field::new("minWriterVersion", DataType::Int32, true),
                ],
                true,
            ),
            Field::new_struct(
                "add",
                vec![Field::new_map(
                    "partitionValues",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                    false,
                    false,
                )],
                true,
            ),
        ]));

        let batches = do_read(json, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let s: StructArray = batches.into_iter().next().unwrap().into();
        let opts = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
        assert_eq!(
            formatter.value(0).to_string(),
            "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
        );
    }

    #[test]
    fn struct_nullability() {
        let do_test = |child: DataType| {
            let non_null = r#"{"foo": {}}"#;
            let schema = Arc::new(Schema::new(vec![Field::new_struct(
                "foo",
                vec![Field::new("bar", child, false)],
                true,
            )]));
            let mut reader = ReaderBuilder::new(schema.clone())
                .build(Cursor::new(non_null.as_bytes()))
                .unwrap();
            assert!(reader.next().unwrap().is_err());

            let null = r#"{"foo": {"bar": null}}"#;
            let mut reader = ReaderBuilder::new(schema.clone())
                .build(Cursor::new(null.as_bytes()))
                .unwrap();
            assert!(reader.next().unwrap().is_err());

            let null = r#"{"foo": null}"#;
            let mut reader = ReaderBuilder::new(schema)
                .build(Cursor::new(null.as_bytes()))
                .unwrap();
            let batch = reader.next().unwrap().unwrap();
            assert_eq!(batch.num_columns(), 1);
            let foo = batch.column(0).as_struct();
            assert_eq!(foo.len(), 1);
            assert!(foo.is_null(0));
            assert_eq!(foo.num_columns(), 1);

            let bar = foo.column(0);
            assert_eq!(bar.len(), 1);
            assert!(bar.is_null(0));
        };
1491
1492 do_test(DataType::Boolean);
1493 do_test(DataType::Int32);
1494 do_test(DataType::Utf8);
1495 do_test(DataType::Decimal128(2, 1));
1496 do_test(DataType::Timestamp(
1497 TimeUnit::Microsecond,
1498 Some("+00:00".into()),
1499 ));
1500 }
1501
1502 #[test]
1503 fn test_truncation() {
1504 let buf = r#"
1505 {"i64": 9223372036854775807, "u64": 18446744073709551615 }
1506 {"i64": "9223372036854775807", "u64": "18446744073709551615" }
1507 {"i64": -9223372036854775808, "u64": 0 }
1508 {"i64": "-9223372036854775808", "u64": 0 }
1509 "#;
1510
1511 let schema = Arc::new(Schema::new(vec![
1512 Field::new("i64", DataType::Int64, true),
1513 Field::new("u64", DataType::UInt64, true),
1514 ]));
1515
1516 let batches = do_read(buf, 1024, true, false, schema);
1517 assert_eq!(batches.len(), 1);
1518
1519 let i64 = batches[0].column(0).as_primitive::<Int64Type>();
1520 assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);
1521
1522 let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
1523 assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
1524 }
1525
1526 #[test]
1527 fn test_timestamp_truncation() {
1528 let buf = r#"
1529 {"time": 9223372036854775807 }
1530 {"time": -9223372036854775808 }
1531 {"time": 9e5 }
1532 "#;
1533
1534 let schema = Arc::new(Schema::new(vec![Field::new(
1535 "time",
1536 DataType::Timestamp(TimeUnit::Nanosecond, None),
1537 true,
1538 )]));
1539
1540 let batches = do_read(buf, 1024, true, false, schema);
1541 assert_eq!(batches.len(), 1);
1542
1543 let i64 = batches[0]
1544 .column(0)
1545 .as_primitive::<TimestampNanosecondType>();
1546 assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
1547 }
1548
1549 #[test]
1550 fn test_strict_mode_no_missing_columns_in_schema() {
1551 let buf = r#"
1552 {"a": 1, "b": "2", "c": true}
1553 {"a": 2E0, "b": "4", "c": false}
1554 "#;
1555
1556 let schema = Arc::new(Schema::new(vec![
1557 Field::new("a", DataType::Int16, false),
1558 Field::new("b", DataType::Utf8, false),
1559 Field::new("c", DataType::Boolean, false),
1560 ]));
1561
1562 let batches = do_read(buf, 1024, true, true, schema);
1563 assert_eq!(batches.len(), 1);
1564
1565 let buf = r#"
1566 {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1567 {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1568 "#;
1569
1570 let schema = Arc::new(Schema::new(vec![
1571 Field::new("a", DataType::Int16, false),
1572 Field::new("b", DataType::Utf8, false),
1573 Field::new_struct(
1574 "c",
1575 vec![
1576 Field::new("a", DataType::Boolean, false),
1577 Field::new("b", DataType::Int16, false),
1578 ],
1579 false,
1580 ),
1581 ]));
1582
1583 let batches = do_read(buf, 1024, true, true, schema);
1584 assert_eq!(batches.len(), 1);
1585 }
1586
1587 #[test]
1588 fn test_strict_mode_missing_columns_in_schema() {
1589 let buf = r#"
1590 {"a": 1, "b": "2", "c": true}
1591 {"a": 2E0, "b": "4", "c": false}
1592 "#;
1593
1594 let schema = Arc::new(Schema::new(vec![
1595 Field::new("a", DataType::Int16, true),
1596 Field::new("c", DataType::Boolean, true),
1597 ]));
1598
1599 let err = ReaderBuilder::new(schema)
1600 .with_batch_size(1024)
1601 .with_strict_mode(true)
1602 .build(Cursor::new(buf.as_bytes()))
1603 .unwrap()
1604 .read()
1605 .unwrap_err();
1606
1607 assert_eq!(
1608 err.to_string(),
1609 "Json error: column 'b' missing from schema"
1610 );
1611
1612 let buf = r#"
1613 {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1614 {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1615 "#;
1616
1617 let schema = Arc::new(Schema::new(vec![
1618 Field::new("a", DataType::Int16, false),
1619 Field::new("b", DataType::Utf8, false),
1620 Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
1621 ]));
1622
1623 let err = ReaderBuilder::new(schema)
1624 .with_batch_size(1024)
1625 .with_strict_mode(true)
1626 .build(Cursor::new(buf.as_bytes()))
1627 .unwrap()
1628 .read()
1629 .unwrap_err();
1630
1631 assert_eq!(
1632 err.to_string(),
1633 "Json error: whilst decoding field 'c': column 'b' missing from schema"
1634 );
1635 }
1636
1637 fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1638 let file = File::open(path).unwrap();
1639 let mut reader = BufReader::new(file);
1640 let schema = schema.unwrap_or_else(|| {
1641 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1642 reader.rewind().unwrap();
1643 schema
1644 });
1645 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1646 builder.build(reader).unwrap()
1647 }
1648
1649 #[test]
1650 fn test_json_basic() {
1651 let mut reader = read_file("test/data/basic.json", None);
1652 let batch = reader.next().unwrap().unwrap();
1653
1654 assert_eq!(8, batch.num_columns());
1655 assert_eq!(12, batch.num_rows());
1656
1657 let schema = reader.schema();
1658 let batch_schema = batch.schema();
1659 assert_eq!(schema, batch_schema);
1660
1661 let a = schema.column_with_name("a").unwrap();
1662 assert_eq!(0, a.0);
1663 assert_eq!(&DataType::Int64, a.1.data_type());
1664 let b = schema.column_with_name("b").unwrap();
1665 assert_eq!(1, b.0);
1666 assert_eq!(&DataType::Float64, b.1.data_type());
1667 let c = schema.column_with_name("c").unwrap();
1668 assert_eq!(2, c.0);
1669 assert_eq!(&DataType::Boolean, c.1.data_type());
1670 let d = schema.column_with_name("d").unwrap();
1671 assert_eq!(3, d.0);
1672 assert_eq!(&DataType::Utf8, d.1.data_type());
1673
1674 let aa = batch.column(a.0).as_primitive::<Int64Type>();
1675 assert_eq!(1, aa.value(0));
1676 assert_eq!(-10, aa.value(1));
1677 let bb = batch.column(b.0).as_primitive::<Float64Type>();
1678 assert_eq!(2.0, bb.value(0));
1679 assert_eq!(-3.5, bb.value(1));
1680 let cc = batch.column(c.0).as_boolean();
1681 assert!(!cc.value(0));
1682 assert!(cc.value(10));
1683 let dd = batch.column(d.0).as_string::<i32>();
1684 assert_eq!("4", dd.value(0));
1685 assert_eq!("text", dd.value(8));
1686 }
1687
1688 #[test]
1689 fn test_json_empty_projection() {
1690 let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
1691 let batch = reader.next().unwrap().unwrap();
1692
1693 assert_eq!(0, batch.num_columns());
1694 assert_eq!(12, batch.num_rows());
1695 }
1696
1697 #[test]
1698 fn test_json_basic_with_nulls() {
1699 let mut reader = read_file("test/data/basic_nulls.json", None);
1700 let batch = reader.next().unwrap().unwrap();
1701
1702 assert_eq!(4, batch.num_columns());
1703 assert_eq!(12, batch.num_rows());
1704
1705 let schema = reader.schema();
1706 let batch_schema = batch.schema();
1707 assert_eq!(schema, batch_schema);
1708
1709 let a = schema.column_with_name("a").unwrap();
1710 assert_eq!(&DataType::Int64, a.1.data_type());
1711 let b = schema.column_with_name("b").unwrap();
1712 assert_eq!(&DataType::Float64, b.1.data_type());
1713 let c = schema.column_with_name("c").unwrap();
1714 assert_eq!(&DataType::Boolean, c.1.data_type());
1715 let d = schema.column_with_name("d").unwrap();
1716 assert_eq!(&DataType::Utf8, d.1.data_type());
1717
1718 let aa = batch.column(a.0).as_primitive::<Int64Type>();
1719 assert!(aa.is_valid(0));
1720 assert!(!aa.is_valid(1));
1721 assert!(!aa.is_valid(11));
1722 let bb = batch.column(b.0).as_primitive::<Float64Type>();
1723 assert!(bb.is_valid(0));
1724 assert!(!bb.is_valid(2));
1725 assert!(!bb.is_valid(11));
1726 let cc = batch.column(c.0).as_boolean();
1727 assert!(cc.is_valid(0));
1728 assert!(!cc.is_valid(4));
1729 assert!(!cc.is_valid(11));
1730 let dd = batch.column(d.0).as_string::<i32>();
1731 assert!(!dd.is_valid(0));
1732 assert!(dd.is_valid(1));
1733 assert!(!dd.is_valid(4));
1734 assert!(!dd.is_valid(11));
1735 }
1736
1737 #[test]
1738 fn test_json_basic_schema() {
1739 let schema = Schema::new(vec![
1740 Field::new("a", DataType::Int64, true),
1741 Field::new("b", DataType::Float32, false),
1742 Field::new("c", DataType::Boolean, false),
1743 Field::new("d", DataType::Utf8, false),
1744 ]);
1745
1746 let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
1747 let reader_schema = reader.schema();
1748 assert_eq!(reader_schema.as_ref(), &schema);
1749 let batch = reader.next().unwrap().unwrap();
1750
1751 assert_eq!(4, batch.num_columns());
1752 assert_eq!(12, batch.num_rows());
1753
1754 let schema = batch.schema();
1755
1756 let a = schema.column_with_name("a").unwrap();
1757 assert_eq!(&DataType::Int64, a.1.data_type());
1758 let b = schema.column_with_name("b").unwrap();
1759 assert_eq!(&DataType::Float32, b.1.data_type());
1760 let c = schema.column_with_name("c").unwrap();
1761 assert_eq!(&DataType::Boolean, c.1.data_type());
1762 let d = schema.column_with_name("d").unwrap();
1763 assert_eq!(&DataType::Utf8, d.1.data_type());
1764
1765 let aa = batch.column(a.0).as_primitive::<Int64Type>();
1766 assert_eq!(1, aa.value(0));
1767 assert_eq!(100000000000000, aa.value(11));
1768 let bb = batch.column(b.0).as_primitive::<Float32Type>();
1769 assert_eq!(2.0, bb.value(0));
1770 assert_eq!(-3.5, bb.value(1));
1771 }
1772
1773 #[test]
1774 fn test_json_basic_schema_projection() {
1775 let schema = Schema::new(vec![
1776 Field::new("a", DataType::Int64, true),
1777 Field::new("c", DataType::Boolean, false),
1778 ]);
1779
1780 let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
1781 let batch = reader.next().unwrap().unwrap();
1782
1783 assert_eq!(2, batch.num_columns());
1784 assert_eq!(2, batch.schema().fields().len());
1785 assert_eq!(12, batch.num_rows());
1786
1787 assert_eq!(batch.schema().as_ref(), &schema);
1788
1789 let a = schema.column_with_name("a").unwrap();
1790 assert_eq!(0, a.0);
1791 assert_eq!(&DataType::Int64, a.1.data_type());
1792 let c = schema.column_with_name("c").unwrap();
1793 assert_eq!(1, c.0);
1794 assert_eq!(&DataType::Boolean, c.1.data_type());
1795 }
1796
1797 #[test]
1798 fn test_json_arrays() {
1799 let mut reader = read_file("test/data/arrays.json", None);
1800 let batch = reader.next().unwrap().unwrap();
1801
1802 assert_eq!(4, batch.num_columns());
1803 assert_eq!(3, batch.num_rows());
1804
1805 let schema = batch.schema();
1806
1807 let a = schema.column_with_name("a").unwrap();
1808 assert_eq!(&DataType::Int64, a.1.data_type());
1809 let b = schema.column_with_name("b").unwrap();
1810 assert_eq!(
1811 &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
1812 b.1.data_type()
1813 );
1814 let c = schema.column_with_name("c").unwrap();
1815 assert_eq!(
1816 &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
1817 c.1.data_type()
1818 );
1819 let d = schema.column_with_name("d").unwrap();
1820 assert_eq!(&DataType::Utf8, d.1.data_type());
1821
1822 let aa = batch.column(a.0).as_primitive::<Int64Type>();
1823 assert_eq!(1, aa.value(0));
1824 assert_eq!(-10, aa.value(1));
1825 assert_eq!(1627668684594000000, aa.value(2));
1826 let bb = batch.column(b.0).as_list::<i32>();
1827 let bb = bb.values().as_primitive::<Float64Type>();
1828 assert_eq!(9, bb.len());
1829 assert_eq!(2.0, bb.value(0));
1830 assert_eq!(-6.1, bb.value(5));
1831 assert!(!bb.is_valid(7));
1832
1833 let cc = batch
1834 .column(c.0)
1835 .as_any()
1836 .downcast_ref::<ListArray>()
1837 .unwrap();
1838 let cc = cc.values().as_boolean();
1839 assert_eq!(6, cc.len());
1840 assert!(!cc.value(0));
1841 assert!(!cc.value(4));
1842 assert!(!cc.is_valid(5));
1843 }
1844
1845 #[test]
1846 fn test_empty_json_arrays() {
1847 let json_content = r#"
1848 {"items": []}
1849 {"items": null}
1850 {}
1851 "#;
1852
1853 let schema = Arc::new(Schema::new(vec![Field::new(
1854 "items",
1855 DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
1856 true,
1857 )]));
1858
1859 let batches = do_read(json_content, 1024, false, false, schema);
1860 assert_eq!(batches.len(), 1);
1861
1862 let col1 = batches[0].column(0).as_list::<i32>();
1863 assert_eq!(col1.null_count(), 2);
1864 assert!(col1.value(0).is_empty());
1865 assert_eq!(col1.value(0).data_type(), &DataType::Null);
1866 assert!(col1.is_null(1));
1867 assert!(col1.is_null(2));
1868 }
1869
1870 #[test]
1871 fn test_nested_empty_json_arrays() {
1872 let json_content = r#"
1873 {"items": [[],[]]}
1874 {"items": [[null, null],[null]]}
1875 "#;
1876
1877 let schema = Arc::new(Schema::new(vec![Field::new(
1878 "items",
1879 DataType::List(FieldRef::new(Field::new_list_field(
1880 DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
1881 true,
1882 ))),
1883 true,
1884 )]));
1885
1886 let batches = do_read(json_content, 1024, false, false, schema);
1887 assert_eq!(batches.len(), 1);
1888
1889 let col1 = batches[0].column(0).as_list::<i32>();
1890 assert_eq!(col1.null_count(), 0);
1891 assert_eq!(col1.value(0).len(), 2);
1892 assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
1893 assert!(col1.value(0).as_list::<i32>().value(1).is_empty());
1894
1895 assert_eq!(col1.value(1).len(), 2);
1896 assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
1897 assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
1898 }
1899
1900 #[test]
1901 fn test_nested_list_json_arrays() {
1902 let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
1903 let a_struct_field = Field::new_struct(
1904 "a",
1905 vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
1906 true,
1907 );
1908 let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
1909 let schema = Arc::new(Schema::new(vec![a_field.clone()]));
1910 let builder = ReaderBuilder::new(schema).with_batch_size(64);
1911 let json_content = r#"
1912 {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
1913 {"a": [{"b": false, "c": null}]}
1914 {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
1915 {"a": null}
1916 {"a": []}
1917 {"a": [null]}
1918 "#;
1919 let mut reader = builder.build(Cursor::new(json_content)).unwrap();
1920
1921 let d = StringArray::from(vec![
1923 Some("a_text"),
1924 Some("b_text"),
1925 None,
1926 Some("c_text"),
1927 Some("d_text"),
1928 None,
1929 None,
1930 ]);
1931 let c = ArrayDataBuilder::new(c_field.data_type().clone())
1932 .len(7)
1933 .add_child_data(d.to_data())
1934 .null_bit_buffer(Some(Buffer::from([0b00111011])))
1935 .build()
1936 .unwrap();
1937 let b = BooleanArray::from(vec![
1938 Some(true),
1939 Some(false),
1940 Some(false),
1941 Some(true),
1942 None,
1943 Some(true),
1944 None,
1945 ]);
1946 let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
1947 .len(7)
1948 .add_child_data(b.to_data())
1949 .add_child_data(c.clone())
1950 .null_bit_buffer(Some(Buffer::from([0b00111111])))
1951 .build()
1952 .unwrap();
1953 let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
1954 .len(6)
1955 .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
1956 .add_child_data(a)
1957 .null_bit_buffer(Some(Buffer::from([0b00110111])))
1958 .build()
1959 .unwrap();
1960 let expected = make_array(a_list);
1961
1962 let batch = reader.next().unwrap().unwrap();
1964 let read = batch.column(0);
1965 assert_eq!(read.len(), 6);
1966 let read: &ListArray = read.as_list::<i32>();
1968 let expected = expected.as_list::<i32>();
1969 assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
1970 assert_eq!(read.nulls(), expected.nulls());
1972 let struct_array = read.values().as_struct();
1974 let expected_struct_array = expected.values().as_struct();
1975
1976 assert_eq!(7, struct_array.len());
1977 assert_eq!(1, struct_array.null_count());
1978 assert_eq!(7, expected_struct_array.len());
1979 assert_eq!(1, expected_struct_array.null_count());
1980 assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
1982 let read_b = struct_array.column(0);
1984 assert_eq!(read_b.as_ref(), &b);
1985 let read_c = struct_array.column(1);
1986 assert_eq!(read_c.to_data(), c);
1987 let read_c = read_c.as_struct();
1988 let read_d = read_c.column(0);
1989 assert_eq!(read_d.as_ref(), &d);
1990
1991 assert_eq!(read, expected);
1992 }
1993
1994 #[test]
1995 fn test_skip_empty_lines() {
1996 let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
1997 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1998 let json_content = "
1999 {\"a\": 1}
2000 {\"a\": 2}
2001 {\"a\": 3}";
2002 let mut reader = builder.build(Cursor::new(json_content)).unwrap();
2003 let batch = reader.next().unwrap().unwrap();
2004
2005 assert_eq!(1, batch.num_columns());
2006 assert_eq!(3, batch.num_rows());
2007
2008 let schema = reader.schema();
2009 let c = schema.column_with_name("a").unwrap();
2010 assert_eq!(&DataType::Int64, c.1.data_type());
2011 }
2012
2013 #[test]
2014 fn test_with_multiple_batches() {
2015 let file = File::open("test/data/basic_nulls.json").unwrap();
2016 let mut reader = BufReader::new(file);
2017 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2018 reader.rewind().unwrap();
2019
2020 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2021 let mut reader = builder.build(reader).unwrap();
2022
2023 let mut num_records = Vec::new();
2024 while let Some(rb) = reader.next().transpose().unwrap() {
2025 num_records.push(rb.num_rows());
2026 }
2027
2028 assert_eq!(vec![5, 5, 2], num_records);
2029 }
2030
2031 #[test]
2032 fn test_timestamp_from_json_seconds() {
2033 let schema = Schema::new(vec![Field::new(
2034 "a",
2035 DataType::Timestamp(TimeUnit::Second, None),
2036 true,
2037 )]);
2038
2039 let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2040 let batch = reader.next().unwrap().unwrap();
2041
2042 assert_eq!(1, batch.num_columns());
2043 assert_eq!(12, batch.num_rows());
2044
2045 let schema = reader.schema();
2046 let batch_schema = batch.schema();
2047 assert_eq!(schema, batch_schema);
2048
2049 let a = schema.column_with_name("a").unwrap();
2050 assert_eq!(
2051 &DataType::Timestamp(TimeUnit::Second, None),
2052 a.1.data_type()
2053 );
2054
2055 let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
2056 assert!(aa.is_valid(0));
2057 assert!(!aa.is_valid(1));
2058 assert!(!aa.is_valid(2));
2059 assert_eq!(1, aa.value(0));
2060 assert_eq!(1, aa.value(3));
2061 assert_eq!(5, aa.value(7));
2062 }
2063
2064 #[test]
2065 fn test_timestamp_from_json_milliseconds() {
2066 let schema = Schema::new(vec![Field::new(
2067 "a",
2068 DataType::Timestamp(TimeUnit::Millisecond, None),
2069 true,
2070 )]);
2071
2072 let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2073 let batch = reader.next().unwrap().unwrap();
2074
2075 assert_eq!(1, batch.num_columns());
2076 assert_eq!(12, batch.num_rows());
2077
2078 let schema = reader.schema();
2079 let batch_schema = batch.schema();
2080 assert_eq!(schema, batch_schema);
2081
2082 let a = schema.column_with_name("a").unwrap();
2083 assert_eq!(
2084 &DataType::Timestamp(TimeUnit::Millisecond, None),
2085 a.1.data_type()
2086 );
2087
2088 let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
2089 assert!(aa.is_valid(0));
2090 assert!(!aa.is_valid(1));
2091 assert!(!aa.is_valid(2));
2092 assert_eq!(1, aa.value(0));
2093 assert_eq!(1, aa.value(3));
2094 assert_eq!(5, aa.value(7));
2095 }
2096
2097 #[test]
2098 fn test_date_from_json_milliseconds() {
2099 let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);
2100
2101 let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2102 let batch = reader.next().unwrap().unwrap();
2103
2104 assert_eq!(1, batch.num_columns());
2105 assert_eq!(12, batch.num_rows());
2106
2107 let schema = reader.schema();
2108 let batch_schema = batch.schema();
2109 assert_eq!(schema, batch_schema);
2110
2111 let a = schema.column_with_name("a").unwrap();
2112 assert_eq!(&DataType::Date64, a.1.data_type());
2113
2114 let aa = batch.column(a.0).as_primitive::<Date64Type>();
2115 assert!(aa.is_valid(0));
2116 assert!(!aa.is_valid(1));
2117 assert!(!aa.is_valid(2));
2118 assert_eq!(1, aa.value(0));
2119 assert_eq!(1, aa.value(3));
2120 assert_eq!(5, aa.value(7));
2121 }
2122
2123 #[test]
2124 fn test_time_from_json_nanoseconds() {
2125 let schema = Schema::new(vec![Field::new(
2126 "a",
2127 DataType::Time64(TimeUnit::Nanosecond),
2128 true,
2129 )]);
2130
2131 let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2132 let batch = reader.next().unwrap().unwrap();
2133
2134 assert_eq!(1, batch.num_columns());
2135 assert_eq!(12, batch.num_rows());
2136
2137 let schema = reader.schema();
2138 let batch_schema = batch.schema();
2139 assert_eq!(schema, batch_schema);
2140
2141 let a = schema.column_with_name("a").unwrap();
2142 assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());
2143
2144 let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
2145 assert!(aa.is_valid(0));
2146 assert!(!aa.is_valid(1));
2147 assert!(!aa.is_valid(2));
2148 assert_eq!(1, aa.value(0));
2149 assert_eq!(1, aa.value(3));
2150 assert_eq!(5, aa.value(7));
2151 }
2152
2153 #[test]
2154 fn test_json_iterator() {
2155 let file = File::open("test/data/basic.json").unwrap();
2156 let mut reader = BufReader::new(file);
2157 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2158 reader.rewind().unwrap();
2159
2160 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2161 let reader = builder.build(reader).unwrap();
2162 let schema = reader.schema();
2163 let (col_a_index, _) = schema.column_with_name("a").unwrap();
2164
2165 let mut sum_num_rows = 0;
2166 let mut num_batches = 0;
2167 let mut sum_a = 0;
2168 for batch in reader {
2169 let batch = batch.unwrap();
2170 assert_eq!(8, batch.num_columns());
2171 sum_num_rows += batch.num_rows();
2172 num_batches += 1;
2173 let batch_schema = batch.schema();
2174 assert_eq!(schema, batch_schema);
2175 let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
2176 sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
2177 }
2178 assert_eq!(12, sum_num_rows);
2179 assert_eq!(3, num_batches);
2180 assert_eq!(100000000000011, sum_a);
2181 }
2182
    #[test]
    fn test_decoder_error() {
        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "a",
            vec![Field::new("child", DataType::Int32, false)],
            true,
        )]));

        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
        let _ = decoder.flush().unwrap_err();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);

        let parse_err = |s: &str| {
            ReaderBuilder::new(schema.clone())
                .build(Cursor::new(s.as_bytes()))
                .unwrap()
                .next()
                .unwrap()
                .unwrap_err()
                .to_string()
        };

        let err = parse_err(r#"{"a": 123}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got 123"
        );

        let err = parse_err(r#"{"a": ["bar"]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
        );

        let err = parse_err(r#"{"a": []}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got []"
        );

        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
        );

        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
        );

        let err = parse_err(r#"{"a": true}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got true"
        );

        let err = parse_err(r#"{"a": false}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got false"
        );

        let err = parse_err(r#"{"a": "foo"}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got \"foo\""
        );

        let err = parse_err(r#"{"a": {"child": false}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
        );

        let err = parse_err(r#"{"a": {"child": []}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
        );

        let err = parse_err(r#"{"a": {"child": [123]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
        );

        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
        );
    }

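    // Timestamps serialize from both integer epoch seconds and RFC 3339
    // strings, with any UTC offset folded into the stored value.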
    #[test]
    fn test_serialize_timestamp() {
        let json = vec![
            json!({"timestamp": 1681319393}),
            json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
        ];
        let schema = Schema::new(vec![Field::new(
            "timestamp",
            DataType::Timestamp(TimeUnit::Second, None),
            true,
        )]);
        let mut decoder = ReaderBuilder::new(Arc::new(schema))
            .build_decoder()
            .unwrap();
        decoder.serialize(&json).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 1);
        let values = batch.column(0).as_primitive::<TimestampSecondType>();
        assert_eq!(values.values(), &[1681319393, -7200]);
    }

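    // Decimal128(10, 3) accepts numeric and string inputs alike, scaling each
    // to three fractional digits.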
    #[test]
    fn test_serialize_decimal() {
        let json = vec![
            json!({"decimal": 1.234}),
            json!({"decimal": "1.234"}),
            json!({"decimal": 1234}),
            json!({"decimal": "1234"}),
        ];
        let schema = Schema::new(vec![Field::new(
            "decimal",
            DataType::Decimal128(10, 3),
            true,
        )]);
        let mut decoder = ReaderBuilder::new(Arc::new(schema))
            .build_decoder()
            .unwrap();
        decoder.serialize(&json).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(batch.num_columns(), 1);
        let values = batch.column(0).as_primitive::<Decimal128Type>();
        assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
    }

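    // new_with_field allows serializing a plain slice directly into a single
    // non-struct column.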
    #[test]
    fn test_serde_field() {
        let field = Field::new("int", DataType::Int32, true);
        let mut decoder = ReaderBuilder::new_with_field(field)
            .build_decoder()
            .unwrap();
        decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
        let b = decoder.flush().unwrap().unwrap();
        let values = b.column(0).as_primitive::<Int32Type>().values();
        assert_eq!(values, &[1, 2, 3, 4]);
    }

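    // Values beyond i32 range must survive serialization intact, both as
    // Int64 and as microsecond timestamps.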
    #[test]
    fn test_serde_large_numbers() {
        let field = Field::new("int", DataType::Int64, true);
        let mut decoder = ReaderBuilder::new_with_field(field)
            .build_decoder()
            .unwrap();

        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
        let b = decoder.flush().unwrap().unwrap();
        let values = b.column(0).as_primitive::<Int64Type>().values();
        assert_eq!(values, &[1699148028689, 2, 3, 4]);

        let field = Field::new(
            "int",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            true,
        );
        let mut decoder = ReaderBuilder::new_with_field(field)
            .build_decoder()
            .unwrap();

        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
        let b = decoder.flush().unwrap().unwrap();
        let values = b
            .column(0)
            .as_primitive::<TimestampMicrosecondType>()
            .values();
        assert_eq!(values, &[1699148028689, 2, 3, 4]);
    }

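    // With coerce_primitive enabled, JSON numbers and booleans destined for
    // Utf8 columns decode as their literal string forms.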
    #[test]
    fn test_coercing_primitive_into_string_decoder() {
        let buf = &format!(
            r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
            (i32::MAX as i64 + 10),
            i64::MAX - 10
        );
        let schema = Schema::new(vec![
            Field::new("a", DataType::Float64, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]);
        let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
        let schema_ref = Arc::new(schema);

        let builder = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
        let mut decoder = builder.build_decoder().unwrap();
        decoder.serialize(json_array.as_slice()).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(
            batch,
            RecordBatch::try_new(
                schema_ref,
                vec![
                    Arc::new(Float64Array::from(vec![
                        1.0,
                        2.0,
                        (i32::MAX as i64 + 10) as f64,
                        (i64::MAX - 10) as f64
                    ])),
                    Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
                    Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
                ]
            )
            .unwrap()
        );
    }

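    // Decodes a single JSON `row` into a one-batch RecordBatch. When
    // `as_struct` is true the fields are wrapped in a top-level struct column
    // "r" via `new_with_field`; otherwise they form the schema directly.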
    fn _parse_structs(
        row: &str,
        struct_mode: StructMode,
        fields: Fields,
        as_struct: bool,
    ) -> Result<RecordBatch, ArrowError> {
        let builder = if as_struct {
            ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
        } else {
            ReaderBuilder::new(Arc::new(Schema::new(fields)))
        };
        builder
            .with_struct_mode(struct_mode)
            .build(Cursor::new(row.as_bytes()))
            .unwrap()
            .next()
            .unwrap()
    }

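    // In ListOnly mode the number of values in a row must match the field
    // count exactly; both too few and too many fields are rejected.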
    #[test]
    fn test_struct_decoding_list_length() {
        use arrow_array::array;

        let row = "[1, 2]";

        let mut fields = vec![Field::new("a", DataType::Int32, true)];
        let too_few_fields = Fields::from(fields.clone());
        fields.push(Field::new("b", DataType::Int32, true));
        let correct_fields = Fields::from(fields.clone());
        fields.push(Field::new("c", DataType::Int32, true));
        let too_many_fields = Fields::from(fields.clone());

        let parse = |fields: Fields, as_struct: bool| {
            _parse_structs(row, StructMode::ListOnly, fields, as_struct)
        };

        let expected_row = StructArray::new(
            correct_fields.clone(),
            vec![
                Arc::new(array::Int32Array::from(vec![1])),
                Arc::new(array::Int32Array::from(vec![2])),
            ],
            None,
        );
        let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);

        assert_eq!(
            parse(too_few_fields.clone(), true).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        assert_eq!(
            parse(too_few_fields, false).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        assert_eq!(
            parse(correct_fields.clone(), true).unwrap(),
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![row_field])),
                vec![Arc::new(expected_row.clone())]
            )
            .unwrap()
        );
        assert_eq!(
            parse(correct_fields, false).unwrap(),
            RecordBatch::from(expected_row)
        );
        assert_eq!(
            parse(too_many_fields.clone(), true)
                .unwrap_err()
                .to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
        assert_eq!(
            parse(too_many_fields, false).unwrap_err().to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
    }

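    // The same nested struct decodes from object syntax under ObjectOnly and
    // from list syntax under ListOnly; the opposite pairing fails with an
    // error quoting the offending value.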
    #[test]
    fn test_struct_decoding() {
        use arrow_array::builder;

        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }

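    // An empty JSON list can never satisfy a non-empty schema: it decodes to
    // zero columns whether the struct appears at the top level or nested.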
    #[test]
    fn test_struct_decoding_empty_list() {
        let int_field = Field::new("a", DataType::Int32, true);
        let struct_field = Field::new(
            "r",
            DataType::Struct(Fields::from(vec![int_field.clone()])),
            true,
        );

        let parse = |row: &str, as_struct: bool, field: Field| {
            _parse_structs(
                row,
                StructMode::ListOnly,
                Fields::from(vec![field]),
                as_struct,
            )
        };

        assert_eq!(
            parse("[]", true, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: found 0 columns for 1 fields".to_owned()
        );
        assert_eq!(
            parse("[]", false, int_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: found 0 columns for 1 fields".to_owned()
        );
        assert_eq!(
            parse("[]", false, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: found 0 columns for 1 fields".to_owned()
        );
        assert_eq!(
            parse("[[]]", false, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
        );
    }

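    // Mistyped values inside ListOnly structs report the full path of nested
    // field names in the error, at both the top level and within a struct.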
    #[test]
    fn test_decode_list_struct_with_wrong_types() {
        let int_field = Field::new("a", DataType::Int32, true);
        let struct_field = Field::new(
            "r",
            DataType::Struct(Fields::from(vec![int_field.clone()])),
            true,
        );

        let parse = |row: &str, as_struct: bool, field: Field| {
            _parse_structs(
                row,
                StructMode::ListOnly,
                Fields::from(vec![field]),
                as_struct,
            )
        };

        assert_eq!(
            parse(r#"[["a"]]"#, false, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
        assert_eq!(
            parse(r#"[["a"]]"#, true, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
        assert_eq!(
            parse(r#"["a"]"#, true, int_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
        assert_eq!(
            parse(r#"["a"]"#, false, int_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
    }
}