1mod encoder;
108
109use std::{fmt::Debug, io::Write};
110
111use crate::StructMode;
112use arrow_array::*;
113use arrow_schema::*;
114
115use encoder::{make_encoder, EncoderOptions};
116
/// Controls the framing bytes written around a JSON stream and around each row.
///
/// All methods default to writing nothing, so implementations only override
/// the hooks they need (e.g. [`JsonArray`] brackets the stream, while
/// [`LineDelimited`] only terminates rows).
pub trait JsonFormat: Debug + Default {
 /// Called once before any rows are written (e.g. to emit an opening `[`).
 #[inline]
 fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
 Ok(())
 }

 /// Called before each row; `_is_first_row` lets separators (such as `,`)
 /// be skipped ahead of the very first row.
 #[inline]
 fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<(), ArrowError> {
 Ok(())
 }

 /// Called after each row (e.g. to emit a trailing newline).
 #[inline]
 fn end_row<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
 Ok(())
 }

 /// Called once after all rows have been written (e.g. to emit a closing `]`).
 fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<(), ArrowError> {
 Ok(())
 }
}
143
/// Produces newline-delimited JSON ("NDJSON"): one JSON object per line,
/// with no enclosing array and no separators between rows.
#[derive(Debug, Default)]
pub struct LineDelimited {}
155
156impl JsonFormat for LineDelimited {
157 fn end_row<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
158 writer.write_all(b"\n")?;
159 Ok(())
160 }
161}
162
/// Produces a single top-level JSON array containing one object per row,
/// with rows separated by commas.
#[derive(Debug, Default)]
pub struct JsonArray {}
172
173impl JsonFormat for JsonArray {
174 fn start_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
175 writer.write_all(b"[")?;
176 Ok(())
177 }
178
179 fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<(), ArrowError> {
180 if !is_first_row {
181 writer.write_all(b",")?;
182 }
183 Ok(())
184 }
185
186 fn end_stream<W: Write>(&self, writer: &mut W) -> Result<(), ArrowError> {
187 writer.write_all(b"]")?;
188 Ok(())
189 }
190}
191
/// A [`Writer`] emitting newline-delimited JSON: one object per line
pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>;
194
/// A [`Writer`] emitting a single top-level JSON array of objects
pub type ArrayWriter<W> = Writer<W, JsonArray>;
197
/// Builder for configuring and constructing a [`Writer`].
///
/// Wraps the [`EncoderOptions`] that are forwarded to the row encoder.
#[derive(Debug, Clone, Default)]
pub struct WriterBuilder(EncoderOptions);
201
202impl WriterBuilder {
203 pub fn new() -> Self {
223 Self::default()
224 }
225
226 pub fn explicit_nulls(&self) -> bool {
228 self.0.explicit_nulls
229 }
230
231 pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
254 self.0.explicit_nulls = explicit_nulls;
255 self
256 }
257
258 pub fn struct_mode(&self) -> StructMode {
260 self.0.struct_mode
261 }
262
263 pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self {
269 self.0.struct_mode = struct_mode;
270 self
271 }
272
273 pub fn build<W, F>(self, writer: W) -> Writer<W, F>
275 where
276 W: Write,
277 F: JsonFormat,
278 {
279 Writer {
280 writer,
281 started: false,
282 finished: false,
283 format: F::default(),
284 options: self.0,
285 }
286 }
287}
288
/// A JSON writer serializing [`RecordBatch`]es to `W`, with framing
/// (stream/row delimiters) controlled by the format `F`.
#[derive(Debug)]
pub struct Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Underlying byte sink rows are flushed to
    writer: W,

    /// Whether `start_stream` has already been emitted
    started: bool,

    /// Whether `end_stream` has already been emitted
    finished: bool,

    /// Determines the stream and row delimiters
    format: F,

    /// Options forwarded to the row encoder
    options: EncoderOptions,
}
320
321impl<W, F> Writer<W, F>
322where
323 W: Write,
324 F: JsonFormat,
325{
326 pub fn new(writer: W) -> Self {
328 Self {
329 writer,
330 started: false,
331 finished: false,
332 format: F::default(),
333 options: EncoderOptions::default(),
334 }
335 }
336
337 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
339 if batch.num_rows() == 0 {
340 return Ok(());
341 }
342
343 let mut buffer = Vec::with_capacity(16 * 1024);
346
347 let mut is_first_row = !self.started;
348 if !self.started {
349 self.format.start_stream(&mut buffer)?;
350 self.started = true;
351 }
352
353 let array = StructArray::from(batch.clone());
354 let mut encoder = make_encoder(&array, &self.options)?;
355
356 for idx in 0..batch.num_rows() {
357 self.format.start_row(&mut buffer, is_first_row)?;
358 is_first_row = false;
359
360 encoder.encode(idx, &mut buffer);
361 if buffer.len() > 8 * 1024 {
362 self.writer.write_all(&buffer)?;
363 buffer.clear();
364 }
365 self.format.end_row(&mut buffer)?;
366 }
367
368 if !buffer.is_empty() {
369 self.writer.write_all(&buffer)?;
370 }
371
372 Ok(())
373 }
374
375 pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
377 for b in batches {
378 self.write(b)?;
379 }
380 Ok(())
381 }
382
383 pub fn finish(&mut self) -> Result<(), ArrowError> {
387 if !self.started {
388 self.format.start_stream(&mut self.writer)?;
389 self.started = true;
390 }
391 if !self.finished {
392 self.format.end_stream(&mut self.writer)?;
393 self.finished = true;
394 }
395
396 Ok(())
397 }
398
399 pub fn into_inner(self) -> W {
401 self.writer
402 }
403}
404
/// Allows [`Writer`] to be used anywhere a `RecordBatchWriter` is expected.
impl<W, F> RecordBatchWriter for Writer<W, F>
where
    W: Write,
    F: JsonFormat,
{
    /// Delegates to the inherent `Writer::write` (inherent methods take
    /// precedence over trait methods, so this is not a recursive call)
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    /// Finalizes stream framing via `Writer::finish`, then drops the writer
    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}
418
419#[cfg(test)]
420mod tests {
421 use core::str;
422 use std::fs::{read_to_string, File};
423 use std::io::{BufReader, Seek};
424 use std::sync::Arc;
425
426 use serde_json::{json, Value};
427
428 use arrow_array::builder::*;
429 use arrow_array::types::*;
430 use arrow_buffer::{i256, Buffer, NullBuffer, OffsetBuffer, ToByteSlice};
431 use arrow_data::ArrayData;
432
433 use crate::reader::*;
434
435 use super::*;
436
437 fn assert_json_eq(input: &[u8], expected: &str) {
439 let expected: Vec<Option<Value>> = expected
440 .split('\n')
441 .map(|s| (!s.is_empty()).then(|| serde_json::from_str(s).unwrap()))
442 .collect();
443
444 let actual: Vec<Option<Value>> = input
445 .split(|b| *b == b'\n')
446 .map(|s| (!s.is_empty()).then(|| serde_json::from_slice(s).unwrap()))
447 .collect();
448
449 assert_eq!(expected, actual);
450 }
451
452 #[test]
453 fn write_simple_rows() {
454 let schema = Schema::new(vec![
455 Field::new("c1", DataType::Int32, true),
456 Field::new("c2", DataType::Utf8, true),
457 ]);
458
459 let a = Int32Array::from(vec![Some(1), Some(2), Some(3), None, Some(5)]);
460 let b = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]);
461
462 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
463
464 let mut buf = Vec::new();
465 {
466 let mut writer = LineDelimitedWriter::new(&mut buf);
467 writer.write_batches(&[&batch]).unwrap();
468 }
469
470 assert_json_eq(
471 &buf,
472 r#"{"c1":1,"c2":"a"}
473{"c1":2,"c2":"b"}
474{"c1":3,"c2":"c"}
475{"c2":"d"}
476{"c1":5}
477"#,
478 );
479 }
480
481 #[test]
482 fn write_large_utf8_and_utf8_view() {
483 let schema = Schema::new(vec![
484 Field::new("c1", DataType::Utf8, true),
485 Field::new("c2", DataType::LargeUtf8, true),
486 Field::new("c3", DataType::Utf8View, true),
487 ]);
488
489 let a = StringArray::from(vec![Some("a"), None, Some("c"), Some("d"), None]);
490 let b = LargeStringArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);
491 let c = StringViewArray::from(vec![Some("a"), Some("b"), None, Some("d"), None]);
492
493 let batch = RecordBatch::try_new(
494 Arc::new(schema),
495 vec![Arc::new(a), Arc::new(b), Arc::new(c)],
496 )
497 .unwrap();
498
499 let mut buf = Vec::new();
500 {
501 let mut writer = LineDelimitedWriter::new(&mut buf);
502 writer.write_batches(&[&batch]).unwrap();
503 }
504
505 assert_json_eq(
506 &buf,
507 r#"{"c1":"a","c2":"a","c3":"a"}
508{"c2":"b","c3":"b"}
509{"c1":"c"}
510{"c1":"d","c2":"d","c3":"d"}
511{}
512"#,
513 );
514 }
515
516 #[test]
517 fn write_dictionary() {
518 let schema = Schema::new(vec![
519 Field::new_dictionary("c1", DataType::Int32, DataType::Utf8, true),
520 Field::new_dictionary("c2", DataType::Int8, DataType::Utf8, true),
521 ]);
522
523 let a: DictionaryArray<Int32Type> = vec![
524 Some("cupcakes"),
525 Some("foo"),
526 Some("foo"),
527 None,
528 Some("cupcakes"),
529 ]
530 .into_iter()
531 .collect();
532 let b: DictionaryArray<Int8Type> =
533 vec![Some("sdsd"), Some("sdsd"), None, Some("sd"), Some("sdsd")]
534 .into_iter()
535 .collect();
536
537 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
538
539 let mut buf = Vec::new();
540 {
541 let mut writer = LineDelimitedWriter::new(&mut buf);
542 writer.write_batches(&[&batch]).unwrap();
543 }
544
545 assert_json_eq(
546 &buf,
547 r#"{"c1":"cupcakes","c2":"sdsd"}
548{"c1":"foo","c2":"sdsd"}
549{"c1":"foo"}
550{"c2":"sd"}
551{"c1":"cupcakes","c2":"sdsd"}
552"#,
553 );
554 }
555
556 #[test]
557 fn write_list_of_dictionary() {
558 let dict_field = Arc::new(Field::new_dictionary(
559 "item",
560 DataType::Int32,
561 DataType::Utf8,
562 true,
563 ));
564 let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);
565
566 let dict_array: DictionaryArray<Int32Type> =
567 vec![Some("a"), Some("b"), Some("c"), Some("a"), None, Some("c")]
568 .into_iter()
569 .collect();
570 let list_array = LargeListArray::try_new(
571 dict_field,
572 OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
573 Arc::new(dict_array),
574 Some(NullBuffer::from_iter([true, true, false, true])),
575 )
576 .unwrap();
577
578 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();
579
580 let mut buf = Vec::new();
581 {
582 let mut writer = LineDelimitedWriter::new(&mut buf);
583 writer.write_batches(&[&batch]).unwrap();
584 }
585
586 assert_json_eq(
587 &buf,
588 r#"{"l":["a","b","c"]}
589{"l":["a",null]}
590{}
591{"l":["c"]}
592"#,
593 );
594 }
595
596 #[test]
597 fn write_list_of_dictionary_large_values() {
598 let dict_field = Arc::new(Field::new_dictionary(
599 "item",
600 DataType::Int32,
601 DataType::LargeUtf8,
602 true,
603 ));
604 let schema = Schema::new(vec![Field::new_large_list("l", dict_field.clone(), true)]);
605
606 let keys = PrimitiveArray::<Int32Type>::from(vec![
607 Some(0),
608 Some(1),
609 Some(2),
610 Some(0),
611 None,
612 Some(2),
613 ]);
614 let values = LargeStringArray::from(vec!["a", "b", "c"]);
615 let dict_array = DictionaryArray::try_new(keys, Arc::new(values)).unwrap();
616
617 let list_array = LargeListArray::try_new(
618 dict_field,
619 OffsetBuffer::from_lengths([3_usize, 2, 0, 1]),
620 Arc::new(dict_array),
621 Some(NullBuffer::from_iter([true, true, false, true])),
622 )
623 .unwrap();
624
625 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();
626
627 let mut buf = Vec::new();
628 {
629 let mut writer = LineDelimitedWriter::new(&mut buf);
630 writer.write_batches(&[&batch]).unwrap();
631 }
632
633 assert_json_eq(
634 &buf,
635 r#"{"l":["a","b","c"]}
636{"l":["a",null]}
637{}
638{"l":["c"]}
639"#,
640 );
641 }
642
643 #[test]
644 fn write_timestamps() {
645 let ts_string = "2018-11-13T17:11:10.011375885995";
646 let ts_nanos = ts_string
647 .parse::<chrono::NaiveDateTime>()
648 .unwrap()
649 .and_utc()
650 .timestamp_nanos_opt()
651 .unwrap();
652 let ts_micros = ts_nanos / 1000;
653 let ts_millis = ts_micros / 1000;
654 let ts_secs = ts_millis / 1000;
655
656 let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
657 let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
658 let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
659 let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
660 let arr_names = StringArray::from(vec![Some("a"), Some("b")]);
661
662 let schema = Schema::new(vec![
663 Field::new("nanos", arr_nanos.data_type().clone(), true),
664 Field::new("micros", arr_micros.data_type().clone(), true),
665 Field::new("millis", arr_millis.data_type().clone(), true),
666 Field::new("secs", arr_secs.data_type().clone(), true),
667 Field::new("name", arr_names.data_type().clone(), true),
668 ]);
669 let schema = Arc::new(schema);
670
671 let batch = RecordBatch::try_new(
672 schema,
673 vec![
674 Arc::new(arr_nanos),
675 Arc::new(arr_micros),
676 Arc::new(arr_millis),
677 Arc::new(arr_secs),
678 Arc::new(arr_names),
679 ],
680 )
681 .unwrap();
682
683 let mut buf = Vec::new();
684 {
685 let mut writer = LineDelimitedWriter::new(&mut buf);
686 writer.write_batches(&[&batch]).unwrap();
687 }
688
689 assert_json_eq(
690 &buf,
691 r#"{"micros":"2018-11-13T17:11:10.011375","millis":"2018-11-13T17:11:10.011","name":"a","nanos":"2018-11-13T17:11:10.011375885","secs":"2018-11-13T17:11:10"}
692{"name":"b"}
693"#,
694 );
695 }
696
697 #[test]
698 fn write_timestamps_with_tz() {
699 let ts_string = "2018-11-13T17:11:10.011375885995";
700 let ts_nanos = ts_string
701 .parse::<chrono::NaiveDateTime>()
702 .unwrap()
703 .and_utc()
704 .timestamp_nanos_opt()
705 .unwrap();
706 let ts_micros = ts_nanos / 1000;
707 let ts_millis = ts_micros / 1000;
708 let ts_secs = ts_millis / 1000;
709
710 let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
711 let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
712 let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
713 let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
714 let arr_names = StringArray::from(vec![Some("a"), Some("b")]);
715
716 let tz = "+00:00";
717
718 let arr_nanos = arr_nanos.with_timezone(tz);
719 let arr_micros = arr_micros.with_timezone(tz);
720 let arr_millis = arr_millis.with_timezone(tz);
721 let arr_secs = arr_secs.with_timezone(tz);
722
723 let schema = Schema::new(vec![
724 Field::new("nanos", arr_nanos.data_type().clone(), true),
725 Field::new("micros", arr_micros.data_type().clone(), true),
726 Field::new("millis", arr_millis.data_type().clone(), true),
727 Field::new("secs", arr_secs.data_type().clone(), true),
728 Field::new("name", arr_names.data_type().clone(), true),
729 ]);
730 let schema = Arc::new(schema);
731
732 let batch = RecordBatch::try_new(
733 schema,
734 vec![
735 Arc::new(arr_nanos),
736 Arc::new(arr_micros),
737 Arc::new(arr_millis),
738 Arc::new(arr_secs),
739 Arc::new(arr_names),
740 ],
741 )
742 .unwrap();
743
744 let mut buf = Vec::new();
745 {
746 let mut writer = LineDelimitedWriter::new(&mut buf);
747 writer.write_batches(&[&batch]).unwrap();
748 }
749
750 assert_json_eq(
751 &buf,
752 r#"{"micros":"2018-11-13T17:11:10.011375Z","millis":"2018-11-13T17:11:10.011Z","name":"a","nanos":"2018-11-13T17:11:10.011375885Z","secs":"2018-11-13T17:11:10Z"}
753{"name":"b"}
754"#,
755 );
756 }
757
758 #[test]
759 fn write_dates() {
760 let ts_string = "2018-11-13T17:11:10.011375885995";
761 let ts_millis = ts_string
762 .parse::<chrono::NaiveDateTime>()
763 .unwrap()
764 .and_utc()
765 .timestamp_millis();
766
767 let arr_date32 = Date32Array::from(vec![
768 Some(i32::try_from(ts_millis / 1000 / (60 * 60 * 24)).unwrap()),
769 None,
770 ]);
771 let arr_date64 = Date64Array::from(vec![Some(ts_millis), None]);
772 let arr_names = StringArray::from(vec![Some("a"), Some("b")]);
773
774 let schema = Schema::new(vec![
775 Field::new("date32", arr_date32.data_type().clone(), true),
776 Field::new("date64", arr_date64.data_type().clone(), true),
777 Field::new("name", arr_names.data_type().clone(), false),
778 ]);
779 let schema = Arc::new(schema);
780
781 let batch = RecordBatch::try_new(
782 schema,
783 vec![
784 Arc::new(arr_date32),
785 Arc::new(arr_date64),
786 Arc::new(arr_names),
787 ],
788 )
789 .unwrap();
790
791 let mut buf = Vec::new();
792 {
793 let mut writer = LineDelimitedWriter::new(&mut buf);
794 writer.write_batches(&[&batch]).unwrap();
795 }
796
797 assert_json_eq(
798 &buf,
799 r#"{"date32":"2018-11-13","date64":"2018-11-13T17:11:10.011","name":"a"}
800{"name":"b"}
801"#,
802 );
803 }
804
805 #[test]
806 fn write_times() {
807 let arr_time32sec = Time32SecondArray::from(vec![Some(120), None]);
808 let arr_time32msec = Time32MillisecondArray::from(vec![Some(120), None]);
809 let arr_time64usec = Time64MicrosecondArray::from(vec![Some(120), None]);
810 let arr_time64nsec = Time64NanosecondArray::from(vec![Some(120), None]);
811 let arr_names = StringArray::from(vec![Some("a"), Some("b")]);
812
813 let schema = Schema::new(vec![
814 Field::new("time32sec", arr_time32sec.data_type().clone(), true),
815 Field::new("time32msec", arr_time32msec.data_type().clone(), true),
816 Field::new("time64usec", arr_time64usec.data_type().clone(), true),
817 Field::new("time64nsec", arr_time64nsec.data_type().clone(), true),
818 Field::new("name", arr_names.data_type().clone(), true),
819 ]);
820 let schema = Arc::new(schema);
821
822 let batch = RecordBatch::try_new(
823 schema,
824 vec![
825 Arc::new(arr_time32sec),
826 Arc::new(arr_time32msec),
827 Arc::new(arr_time64usec),
828 Arc::new(arr_time64nsec),
829 Arc::new(arr_names),
830 ],
831 )
832 .unwrap();
833
834 let mut buf = Vec::new();
835 {
836 let mut writer = LineDelimitedWriter::new(&mut buf);
837 writer.write_batches(&[&batch]).unwrap();
838 }
839
840 assert_json_eq(
841 &buf,
842 r#"{"time32sec":"00:02:00","time32msec":"00:00:00.120","time64usec":"00:00:00.000120","time64nsec":"00:00:00.000000120","name":"a"}
843{"name":"b"}
844"#,
845 );
846 }
847
848 #[test]
849 fn write_durations() {
850 let arr_durationsec = DurationSecondArray::from(vec![Some(120), None]);
851 let arr_durationmsec = DurationMillisecondArray::from(vec![Some(120), None]);
852 let arr_durationusec = DurationMicrosecondArray::from(vec![Some(120), None]);
853 let arr_durationnsec = DurationNanosecondArray::from(vec![Some(120), None]);
854 let arr_names = StringArray::from(vec![Some("a"), Some("b")]);
855
856 let schema = Schema::new(vec![
857 Field::new("duration_sec", arr_durationsec.data_type().clone(), true),
858 Field::new("duration_msec", arr_durationmsec.data_type().clone(), true),
859 Field::new("duration_usec", arr_durationusec.data_type().clone(), true),
860 Field::new("duration_nsec", arr_durationnsec.data_type().clone(), true),
861 Field::new("name", arr_names.data_type().clone(), true),
862 ]);
863 let schema = Arc::new(schema);
864
865 let batch = RecordBatch::try_new(
866 schema,
867 vec![
868 Arc::new(arr_durationsec),
869 Arc::new(arr_durationmsec),
870 Arc::new(arr_durationusec),
871 Arc::new(arr_durationnsec),
872 Arc::new(arr_names),
873 ],
874 )
875 .unwrap();
876
877 let mut buf = Vec::new();
878 {
879 let mut writer = LineDelimitedWriter::new(&mut buf);
880 writer.write_batches(&[&batch]).unwrap();
881 }
882
883 assert_json_eq(
884 &buf,
885 r#"{"duration_sec":"PT120S","duration_msec":"PT0.12S","duration_usec":"PT0.00012S","duration_nsec":"PT0.00000012S","name":"a"}
886{"name":"b"}
887"#,
888 );
889 }
890
891 #[test]
892 fn write_nested_structs() {
893 let schema = Schema::new(vec![
894 Field::new(
895 "c1",
896 DataType::Struct(Fields::from(vec![
897 Field::new("c11", DataType::Int32, true),
898 Field::new(
899 "c12",
900 DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
901 false,
902 ),
903 ])),
904 false,
905 ),
906 Field::new("c2", DataType::Utf8, false),
907 ]);
908
909 let c1 = StructArray::from(vec![
910 (
911 Arc::new(Field::new("c11", DataType::Int32, true)),
912 Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
913 ),
914 (
915 Arc::new(Field::new(
916 "c12",
917 DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
918 false,
919 )),
920 Arc::new(StructArray::from(vec![(
921 Arc::new(Field::new("c121", DataType::Utf8, false)),
922 Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
923 )])) as ArrayRef,
924 ),
925 ]);
926 let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
927
928 let batch =
929 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
930
931 let mut buf = Vec::new();
932 {
933 let mut writer = LineDelimitedWriter::new(&mut buf);
934 writer.write_batches(&[&batch]).unwrap();
935 }
936
937 assert_json_eq(
938 &buf,
939 r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"}
940{"c1":{"c12":{"c121":"f"}},"c2":"b"}
941{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"}
942"#,
943 );
944 }
945
946 #[test]
947 fn write_struct_with_list_field() {
948 let field_c1 = Field::new(
949 "c1",
950 DataType::List(Arc::new(Field::new("c_list", DataType::Utf8, false))),
951 false,
952 );
953 let field_c2 = Field::new("c2", DataType::Int32, false);
954 let schema = Schema::new(vec![field_c1.clone(), field_c2]);
955
956 let a_values = StringArray::from(vec!["a", "a1", "b", "c", "d", "e"]);
957 let a_value_offsets = Buffer::from([0, 2, 3, 4, 5, 6].to_byte_slice());
959 let a_list_data = ArrayData::builder(field_c1.data_type().clone())
960 .len(5)
961 .add_buffer(a_value_offsets)
962 .add_child_data(a_values.into_data())
963 .null_bit_buffer(Some(Buffer::from([0b00011111])))
964 .build()
965 .unwrap();
966 let a = ListArray::from(a_list_data);
967
968 let b = Int32Array::from(vec![1, 2, 3, 4, 5]);
969
970 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
971
972 let mut buf = Vec::new();
973 {
974 let mut writer = LineDelimitedWriter::new(&mut buf);
975 writer.write_batches(&[&batch]).unwrap();
976 }
977
978 assert_json_eq(
979 &buf,
980 r#"{"c1":["a","a1"],"c2":1}
981{"c1":["b"],"c2":2}
982{"c1":["c"],"c2":3}
983{"c1":["d"],"c2":4}
984{"c1":["e"],"c2":5}
985"#,
986 );
987 }
988
989 #[test]
990 fn write_nested_list() {
991 let list_inner_type = Field::new(
992 "a",
993 DataType::List(Arc::new(Field::new("b", DataType::Int32, false))),
994 false,
995 );
996 let field_c1 = Field::new(
997 "c1",
998 DataType::List(Arc::new(list_inner_type.clone())),
999 false,
1000 );
1001 let field_c2 = Field::new("c2", DataType::Utf8, true);
1002 let schema = Schema::new(vec![field_c1.clone(), field_c2]);
1003
1004 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1006
1007 let a_value_offsets = Buffer::from([0, 2, 3, 6].to_byte_slice());
1008 let a_list_data = ArrayData::builder(list_inner_type.data_type().clone())
1010 .len(3)
1011 .add_buffer(a_value_offsets)
1012 .null_bit_buffer(Some(Buffer::from([0b00000111])))
1013 .add_child_data(a_values.into_data())
1014 .build()
1015 .unwrap();
1016
1017 let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
1018 let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
1019 .len(3)
1020 .add_buffer(c1_value_offsets)
1021 .add_child_data(a_list_data)
1022 .build()
1023 .unwrap();
1024
1025 let c1 = ListArray::from(c1_list_data);
1026 let c2 = StringArray::from(vec![Some("foo"), Some("bar"), None]);
1027
1028 let batch =
1029 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
1030
1031 let mut buf = Vec::new();
1032 {
1033 let mut writer = LineDelimitedWriter::new(&mut buf);
1034 writer.write_batches(&[&batch]).unwrap();
1035 }
1036
1037 assert_json_eq(
1038 &buf,
1039 r#"{"c1":[[1,2],[3]],"c2":"foo"}
1040{"c1":[],"c2":"bar"}
1041{"c1":[[4,5,6]]}
1042"#,
1043 );
1044 }
1045
1046 #[test]
1047 fn write_list_of_struct() {
1048 let field_c1 = Field::new(
1049 "c1",
1050 DataType::List(Arc::new(Field::new(
1051 "s",
1052 DataType::Struct(Fields::from(vec![
1053 Field::new("c11", DataType::Int32, true),
1054 Field::new(
1055 "c12",
1056 DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
1057 false,
1058 ),
1059 ])),
1060 false,
1061 ))),
1062 true,
1063 );
1064 let field_c2 = Field::new("c2", DataType::Int32, false);
1065 let schema = Schema::new(vec![field_c1.clone(), field_c2]);
1066
1067 let struct_values = StructArray::from(vec![
1068 (
1069 Arc::new(Field::new("c11", DataType::Int32, true)),
1070 Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
1071 ),
1072 (
1073 Arc::new(Field::new(
1074 "c12",
1075 DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
1076 false,
1077 )),
1078 Arc::new(StructArray::from(vec![(
1079 Arc::new(Field::new("c121", DataType::Utf8, false)),
1080 Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
1081 )])) as ArrayRef,
1082 ),
1083 ]);
1084
1085 let c1_value_offsets = Buffer::from([0, 2, 2, 3].to_byte_slice());
1090 let c1_list_data = ArrayData::builder(field_c1.data_type().clone())
1091 .len(3)
1092 .add_buffer(c1_value_offsets)
1093 .add_child_data(struct_values.into_data())
1094 .null_bit_buffer(Some(Buffer::from([0b00000101])))
1095 .build()
1096 .unwrap();
1097 let c1 = ListArray::from(c1_list_data);
1098
1099 let c2 = Int32Array::from(vec![1, 2, 3]);
1100
1101 let batch =
1102 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
1103
1104 let mut buf = Vec::new();
1105 {
1106 let mut writer = LineDelimitedWriter::new(&mut buf);
1107 writer.write_batches(&[&batch]).unwrap();
1108 }
1109
1110 assert_json_eq(
1111 &buf,
1112 r#"{"c1":[{"c11":1,"c12":{"c121":"e"}},{"c12":{"c121":"f"}}],"c2":1}
1113{"c2":2}
1114{"c1":[{"c11":5,"c12":{"c121":"g"}}],"c2":3}
1115"#,
1116 );
1117 }
1118
1119 fn test_write_for_file(test_file: &str, remove_nulls: bool) {
1120 let file = File::open(test_file).unwrap();
1121 let mut reader = BufReader::new(file);
1122 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1123 reader.rewind().unwrap();
1124
1125 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
1126 let mut reader = builder.build(reader).unwrap();
1127 let batch = reader.next().unwrap().unwrap();
1128
1129 let mut buf = Vec::new();
1130 {
1131 if remove_nulls {
1132 let mut writer = LineDelimitedWriter::new(&mut buf);
1133 writer.write_batches(&[&batch]).unwrap();
1134 } else {
1135 let mut writer = WriterBuilder::new()
1136 .with_explicit_nulls(true)
1137 .build::<_, LineDelimited>(&mut buf);
1138 writer.write_batches(&[&batch]).unwrap();
1139 }
1140 }
1141
1142 let result = str::from_utf8(&buf).unwrap();
1143 let expected = read_to_string(test_file).unwrap();
1144 for (r, e) in result.lines().zip(expected.lines()) {
1145 let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1146 if remove_nulls {
1147 if let Value::Object(obj) = expected_json {
1149 expected_json =
1150 Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1151 }
1152 }
1153 assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1154 }
1155 }
1156
1157 #[test]
1158 fn write_basic_rows() {
1159 test_write_for_file("test/data/basic.json", true);
1160 }
1161
1162 #[test]
1163 fn write_arrays() {
1164 test_write_for_file("test/data/arrays.json", true);
1165 }
1166
1167 #[test]
1168 fn write_basic_nulls() {
1169 test_write_for_file("test/data/basic_nulls.json", true);
1170 }
1171
1172 #[test]
1173 fn write_nested_with_nulls() {
1174 test_write_for_file("test/data/nested_with_nulls.json", false);
1175 }
1176
1177 #[test]
1178 fn json_line_writer_empty() {
1179 let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
1180 writer.finish().unwrap();
1181 assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
1182 }
1183
1184 #[test]
1185 fn json_array_writer_empty() {
1186 let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
1187 writer.finish().unwrap();
1188 assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
1189 }
1190
1191 #[test]
1192 fn json_line_writer_empty_batch() {
1193 let mut writer = LineDelimitedWriter::new(vec![] as Vec<u8>);
1194
1195 let array = Int32Array::from(Vec::<i32>::new());
1196 let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
1197 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1198
1199 writer.write(&batch).unwrap();
1200 writer.finish().unwrap();
1201 assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "");
1202 }
1203
1204 #[test]
1205 fn json_array_writer_empty_batch() {
1206 let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
1207
1208 let array = Int32Array::from(Vec::<i32>::new());
1209 let schema = Schema::new(vec![Field::new("c", DataType::Int32, true)]);
1210 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1211
1212 writer.write(&batch).unwrap();
1213 writer.finish().unwrap();
1214 assert_eq!(str::from_utf8(&writer.into_inner()).unwrap(), "[]");
1215 }
1216
1217 #[test]
1218 fn json_struct_array_nulls() {
1219 let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1220 Some(vec![Some(1), Some(2)]),
1221 Some(vec![None]),
1222 Some(vec![]),
1223 Some(vec![Some(3), None]), Some(vec![Some(4), Some(5)]),
1225 None, None,
1227 ]);
1228
1229 let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
1230 let array = Arc::new(inner) as ArrayRef;
1231 let struct_array_a = StructArray::from((
1232 vec![(field.clone(), array.clone())],
1233 Buffer::from([0b01010111]),
1234 ));
1235 let struct_array_b = StructArray::from(vec![(field, array)]);
1236
1237 let schema = Schema::new(vec![
1238 Field::new_struct("a", struct_array_a.fields().clone(), true),
1239 Field::new_struct("b", struct_array_b.fields().clone(), true),
1240 ]);
1241
1242 let batch = RecordBatch::try_new(
1243 Arc::new(schema),
1244 vec![Arc::new(struct_array_a), Arc::new(struct_array_b)],
1245 )
1246 .unwrap();
1247
1248 let mut buf = Vec::new();
1249 {
1250 let mut writer = LineDelimitedWriter::new(&mut buf);
1251 writer.write_batches(&[&batch]).unwrap();
1252 }
1253
1254 assert_json_eq(
1255 &buf,
1256 r#"{"a":{"list":[1,2]},"b":{"list":[1,2]}}
1257{"a":{"list":[null]},"b":{"list":[null]}}
1258{"a":{"list":[]},"b":{"list":[]}}
1259{"b":{"list":[3,null]}}
1260{"a":{"list":[4,5]},"b":{"list":[4,5]}}
1261{"b":{}}
1262{"a":{},"b":{}}
1263"#,
1264 );
1265 }
1266
1267 #[test]
1268 fn json_writer_map() {
1269 let keys_array = super::StringArray::from(vec!["foo", "bar", "baz", "qux", "quux"]);
1270 let values_array = super::Int64Array::from(vec![10, 20, 30, 40, 50]);
1271
1272 let keys = Arc::new(Field::new("keys", DataType::Utf8, false));
1273 let values = Arc::new(Field::new("values", DataType::Int64, false));
1274 let entry_struct = StructArray::from(vec![
1275 (keys, Arc::new(keys_array) as ArrayRef),
1276 (values, Arc::new(values_array) as ArrayRef),
1277 ]);
1278
1279 let map_data_type = DataType::Map(
1280 Arc::new(Field::new(
1281 "entries",
1282 entry_struct.data_type().clone(),
1283 false,
1284 )),
1285 false,
1286 );
1287
1288 let entry_offsets = Buffer::from([0, 1, 1, 1, 4, 5, 5].to_byte_slice());
1290 let valid_buffer = Buffer::from([0b00111101]);
1291
1292 let map_data = ArrayData::builder(map_data_type.clone())
1293 .len(6)
1294 .null_bit_buffer(Some(valid_buffer))
1295 .add_buffer(entry_offsets)
1296 .add_child_data(entry_struct.into_data())
1297 .build()
1298 .unwrap();
1299
1300 let map = MapArray::from(map_data);
1301
1302 let map_field = Field::new("map", map_data_type, true);
1303 let schema = Arc::new(Schema::new(vec![map_field]));
1304
1305 let batch = RecordBatch::try_new(schema, vec![Arc::new(map)]).unwrap();
1306
1307 let mut buf = Vec::new();
1308 {
1309 let mut writer = LineDelimitedWriter::new(&mut buf);
1310 writer.write_batches(&[&batch]).unwrap();
1311 }
1312
1313 assert_json_eq(
1314 &buf,
1315 r#"{"map":{"foo":10}}
1316{}
1317{"map":{}}
1318{"map":{"bar":20,"baz":30,"qux":40}}
1319{"map":{"quux":50}}
1320{"map":{}}
1321"#,
1322 );
1323 }
1324
1325 #[test]
1326 fn test_write_single_batch() {
1327 let test_file = "test/data/basic.json";
1328 let file = File::open(test_file).unwrap();
1329 let mut reader = BufReader::new(file);
1330 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1331 reader.rewind().unwrap();
1332
1333 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(1024);
1334 let mut reader = builder.build(reader).unwrap();
1335 let batch = reader.next().unwrap().unwrap();
1336
1337 let mut buf = Vec::new();
1338 {
1339 let mut writer = LineDelimitedWriter::new(&mut buf);
1340 writer.write(&batch).unwrap();
1341 }
1342
1343 let result = str::from_utf8(&buf).unwrap();
1344 let expected = read_to_string(test_file).unwrap();
1345 for (r, e) in result.lines().zip(expected.lines()) {
1346 let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1347 if let Value::Object(obj) = expected_json {
1349 expected_json =
1350 Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1351 }
1352 assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1353 }
1354 }
1355
1356 #[test]
1357 fn test_write_multi_batches() {
1358 let test_file = "test/data/basic.json";
1359
1360 let schema = SchemaRef::new(Schema::new(vec![
1361 Field::new("a", DataType::Int64, true),
1362 Field::new("b", DataType::Float64, true),
1363 Field::new("c", DataType::Boolean, true),
1364 Field::new("d", DataType::Utf8, true),
1365 Field::new("e", DataType::Utf8, true),
1366 Field::new("f", DataType::Utf8, true),
1367 Field::new("g", DataType::Timestamp(TimeUnit::Millisecond, None), true),
1368 Field::new("h", DataType::Float16, true),
1369 ]));
1370
1371 let mut reader = ReaderBuilder::new(schema.clone())
1372 .build(BufReader::new(File::open(test_file).unwrap()))
1373 .unwrap();
1374 let batch = reader.next().unwrap().unwrap();
1375
1376 let batches = [&RecordBatch::new_empty(schema), &batch, &batch];
1378
1379 let mut buf = Vec::new();
1380 {
1381 let mut writer = LineDelimitedWriter::new(&mut buf);
1382 writer.write_batches(&batches).unwrap();
1383 }
1384
1385 let result = str::from_utf8(&buf).unwrap();
1386 let expected = read_to_string(test_file).unwrap();
1387 let expected = format!("{expected}\n{expected}");
1389 for (r, e) in result.lines().zip(expected.lines()) {
1390 let mut expected_json = serde_json::from_str::<Value>(e).unwrap();
1391 if let Value::Object(obj) = expected_json {
1393 expected_json =
1394 Value::Object(obj.into_iter().filter(|(_, v)| *v != Value::Null).collect());
1395 }
1396 assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,);
1397 }
1398 }
1399
    #[test]
    fn test_writer_explicit_nulls() -> Result<(), ArrowError> {
        // Verifies that `with_explicit_nulls(true)` writes a JSON `null` for every
        // null value — at the top level and inside nested structs, lists, dicts
        // and maps — instead of omitting the field.

        // List<Int32> column with 4 rows:
        //   [null, null, null], [1, 2, 3], null, [null, null, null]
        fn nested_list() -> (Arc<ListArray>, Arc<Field>) {
            let array = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![None, None, None]),
                Some(vec![Some(1), Some(2), Some(3)]),
                None,
                Some(vec![None, None, None]),
            ]));
            let field = Arc::new(Field::new("list", array.data_type().clone(), true));
            (array, field)
        }

        // Dictionary<Int32, Utf8> column: "cupcakes", null, "bear", "kuma"
        fn nested_dict() -> (Arc<DictionaryArray<Int32Type>>, Arc<Field>) {
            let array = Arc::new(DictionaryArray::from_iter(vec![
                Some("cupcakes"),
                None,
                Some("bear"),
                Some("kuma"),
            ]));
            let field = Arc::new(Field::new("dict", array.data_type().clone(), true));
            (array, field)
        }

        // Map<Utf8, Int64> column with 4 rows:
        //   {"foo": 10}, null, {} (valid but empty), {"bar": 20, "baz": 30, "qux": 40}
        fn nested_map() -> (Arc<MapArray>, Arc<Field>) {
            let string_builder = StringBuilder::new();
            let int_builder = Int64Builder::new();
            let mut builder = MapBuilder::new(None, string_builder, int_builder);

            // Row 0: {"foo": 10}
            builder.keys().append_value("foo");
            builder.values().append_value(10);
            builder.append(true).unwrap();

            // Row 1: null map
            builder.append(false).unwrap();

            // Row 2: {} — valid but no entries
            builder.append(true).unwrap();

            // Row 3: {"bar": 20, "baz": 30, "qux": 40}
            builder.keys().append_value("bar");
            builder.values().append_value(20);
            builder.keys().append_value("baz");
            builder.values().append_value(30);
            builder.keys().append_value("qux");
            builder.values().append_value(40);
            builder.append(true).unwrap();

            let array = Arc::new(builder.finish());
            let field = Arc::new(Field::new("map", array.data_type().clone(), true));
            (array, field)
        }

        // Top-level List<Struct<utf8, int32>> column. Offsets [0, 2, 2, 3, 3]
        // with validity bitmap 0b00000101 (rows 0 and 2 valid) yield:
        //   [struct 0, struct 1], null, [struct 2], null
        fn root_list() -> (Arc<ListArray>, Field) {
            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("a"), Some("b"), None, None])) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("int32", DataType::Int32, true)),
                    Arc::new(Int32Array::from(vec![Some(1), None, Some(5), None])) as ArrayRef,
                ),
            ]);

            let field = Field::new_list(
                "list",
                Field::new("struct", struct_array.data_type().clone(), true),
                true,
            );

            let entry_offsets = Buffer::from([0, 2, 2, 3, 3].to_byte_slice());
            let data = ArrayData::builder(field.data_type().clone())
                .len(4)
                .add_buffer(entry_offsets)
                .add_child_data(struct_array.into_data())
                .null_bit_buffer(Some([0b00000101].into()))
                .build()
                .unwrap();
            let array = Arc::new(ListArray::from(data));
            (array, field)
        }

        let (nested_list_array, nested_list_field) = nested_list();
        let (nested_dict_array, nested_dict_field) = nested_dict();
        let (nested_map_array, nested_map_field) = nested_map();
        let (root_list_array, root_list_field) = root_list();

        // Schema: a primitive date, a Null-typed column, a struct nesting the
        // list/dict/map columns above, and the top-level list of structs.
        let schema = Schema::new(vec![
            Field::new("date", DataType::Date32, true),
            Field::new("null", DataType::Null, true),
            Field::new_struct(
                "struct",
                vec![
                    Arc::new(Field::new("utf8", DataType::Utf8, true)),
                    nested_list_field.clone(),
                    nested_dict_field.clone(),
                    nested_map_field.clone(),
                ],
                true,
            ),
            root_list_field,
        ]);

        let arr_date32 = Date32Array::from(vec![Some(0), None, Some(1), None]);
        let arr_null = NullArray::new(4);
        let arr_struct = StructArray::from(vec![
            // "utf8" child column
            (
                Arc::new(Field::new("utf8", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec![Some("a"), None, None, Some("b")])) as ArrayRef,
            ),
            // "list" child column
            (nested_list_field, nested_list_array as ArrayRef),
            // "dict" child column
            (nested_dict_field, nested_dict_array as ArrayRef),
            // "map" child column
            (nested_map_field, nested_map_array as ArrayRef),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(arr_date32),
                Arc::new(arr_null),
                Arc::new(arr_struct),
                root_list_array,
            ],
        )?;

        let mut buf = Vec::new();
        {
            let mut writer = WriterBuilder::new()
                .with_explicit_nulls(true)
                .build::<_, JsonArray>(&mut buf);
            writer.write_batches(&[&batch])?;
            writer.finish()?;
        }

        // Every null — top-level or nested — must appear as an explicit JSON
        // null rather than being omitted from its containing object.
        let actual = serde_json::from_slice::<Vec<Value>>(&buf).unwrap();
        let expected = serde_json::from_value::<Vec<Value>>(json!([
            {
                "date": "1970-01-01",
                "list": [
                    {
                        "int32": 1,
                        "utf8": "a"
                    },
                    {
                        "int32": null,
                        "utf8": "b"
                    }
                ],
                "null": null,
                "struct": {
                    "dict": "cupcakes",
                    "list": [
                        null,
                        null,
                        null
                    ],
                    "map": {
                        "foo": 10
                    },
                    "utf8": "a"
                }
            },
            {
                "date": null,
                "list": null,
                "null": null,
                "struct": {
                    "dict": null,
                    "list": [
                        1,
                        2,
                        3
                    ],
                    "map": null,
                    "utf8": null
                }
            },
            {
                "date": "1970-01-02",
                "list": [
                    {
                        "int32": 5,
                        "utf8": null
                    }
                ],
                "null": null,
                "struct": {
                    "dict": "bear",
                    "list": null,
                    "map": {},
                    "utf8": null
                }
            },
            {
                "date": null,
                "list": null,
                "null": null,
                "struct": {
                    "dict": "kuma",
                    "list": [
                        null,
                        null,
                        null
                    ],
                    "map": {
                        "bar": 20,
                        "baz": 30,
                        "qux": 40
                    },
                    "utf8": "b"
                }
            }
        ]))
        .unwrap();

        assert_eq!(actual, expected);

        Ok(())
    }
1628
1629 fn binary_encoding_test<O: OffsetSizeTrait>() {
1630 let schema = SchemaRef::new(Schema::new(vec![Field::new(
1632 "bytes",
1633 GenericBinaryType::<O>::DATA_TYPE,
1634 true,
1635 )]));
1636
1637 let mut builder = GenericByteBuilder::<GenericBinaryType<O>>::new();
1639 let values = [Some(b"Ned Flanders"), None, Some(b"Troy McClure")];
1640 for value in values {
1641 match value {
1642 Some(v) => builder.append_value(v),
1643 None => builder.append_null(),
1644 }
1645 }
1646 let array = Arc::new(builder.finish()) as ArrayRef;
1647 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1648
1649 {
1651 let mut buf = Vec::new();
1652 let json_value: Value = {
1653 let mut writer = WriterBuilder::new()
1654 .with_explicit_nulls(true)
1655 .build::<_, JsonArray>(&mut buf);
1656 writer.write(&batch).unwrap();
1657 writer.close().unwrap();
1658 serde_json::from_slice(&buf).unwrap()
1659 };
1660
1661 assert_eq!(
1662 json!([
1663 {
1664 "bytes": "4e656420466c616e64657273"
1665 },
1666 {
1667 "bytes": null },
1669 {
1670 "bytes": "54726f79204d63436c757265"
1671 }
1672 ]),
1673 json_value,
1674 );
1675 }
1676
1677 {
1679 let mut buf = Vec::new();
1680 let json_value: Value = {
1681 let mut writer = ArrayWriter::new(&mut buf);
1684 writer.write(&batch).unwrap();
1685 writer.close().unwrap();
1686 serde_json::from_slice(&buf).unwrap()
1687 };
1688
1689 assert_eq!(
1690 json!([
1691 {
1692 "bytes": "4e656420466c616e64657273"
1693 },
1694 {}, {
1696 "bytes": "54726f79204d63436c757265"
1697 }
1698 ]),
1699 json_value
1700 );
1701 }
1702 }
1703
    #[test]
    fn test_writer_binary() {
        // Run the shared binary-encoding checks for both offset widths:
        // Binary (i32 offsets) and LargeBinary (i64 offsets).
        binary_encoding_test::<i32>();
        binary_encoding_test::<i64>();
    }
1711
1712 #[test]
1713 fn test_writer_fixed_size_binary() {
1714 let size = 11;
1716 let schema = SchemaRef::new(Schema::new(vec![Field::new(
1717 "bytes",
1718 DataType::FixedSizeBinary(size),
1719 true,
1720 )]));
1721
1722 let mut builder = FixedSizeBinaryBuilder::new(size);
1724 let values = [Some(b"hello world"), None, Some(b"summer rain")];
1725 for value in values {
1726 match value {
1727 Some(v) => builder.append_value(v).unwrap(),
1728 None => builder.append_null(),
1729 }
1730 }
1731 let array = Arc::new(builder.finish()) as ArrayRef;
1732 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1733
1734 {
1736 let mut buf = Vec::new();
1737 let json_value: Value = {
1738 let mut writer = WriterBuilder::new()
1739 .with_explicit_nulls(true)
1740 .build::<_, JsonArray>(&mut buf);
1741 writer.write(&batch).unwrap();
1742 writer.close().unwrap();
1743 serde_json::from_slice(&buf).unwrap()
1744 };
1745
1746 assert_eq!(
1747 json!([
1748 {
1749 "bytes": "68656c6c6f20776f726c64"
1750 },
1751 {
1752 "bytes": null },
1754 {
1755 "bytes": "73756d6d6572207261696e"
1756 }
1757 ]),
1758 json_value,
1759 );
1760 }
1761 {
1763 let mut buf = Vec::new();
1764 let json_value: Value = {
1765 let mut writer = ArrayWriter::new(&mut buf);
1768 writer.write(&batch).unwrap();
1769 writer.close().unwrap();
1770 serde_json::from_slice(&buf).unwrap()
1771 };
1772
1773 assert_eq!(
1774 json!([
1775 {
1776 "bytes": "68656c6c6f20776f726c64"
1777 },
1778 {}, {
1780 "bytes": "73756d6d6572207261696e"
1781 }
1782 ]),
1783 json_value,
1784 );
1785 }
1786 }
1787
1788 #[test]
1789 fn test_writer_fixed_size_list() {
1790 let size = 3;
1791 let field = FieldRef::new(Field::new_list_field(DataType::Int32, true));
1792 let schema = SchemaRef::new(Schema::new(vec![Field::new(
1793 "list",
1794 DataType::FixedSizeList(field, size),
1795 true,
1796 )]));
1797
1798 let values_builder = Int32Builder::new();
1799 let mut list_builder = FixedSizeListBuilder::new(values_builder, size);
1800 let lists = [
1801 Some([Some(1), Some(2), None]),
1802 Some([Some(3), None, Some(4)]),
1803 Some([None, Some(5), Some(6)]),
1804 None,
1805 ];
1806 for list in lists {
1807 match list {
1808 Some(l) => {
1809 for value in l {
1810 match value {
1811 Some(v) => list_builder.values().append_value(v),
1812 None => list_builder.values().append_null(),
1813 }
1814 }
1815 list_builder.append(true);
1816 }
1817 None => {
1818 for _ in 0..size {
1819 list_builder.values().append_null();
1820 }
1821 list_builder.append(false);
1822 }
1823 }
1824 }
1825 let array = Arc::new(list_builder.finish()) as ArrayRef;
1826 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1827
1828 {
1830 let json_value: Value = {
1831 let mut buf = Vec::new();
1832 let mut writer = WriterBuilder::new()
1833 .with_explicit_nulls(true)
1834 .build::<_, JsonArray>(&mut buf);
1835 writer.write(&batch).unwrap();
1836 writer.close().unwrap();
1837 serde_json::from_slice(&buf).unwrap()
1838 };
1839 assert_eq!(
1840 json!([
1841 {"list": [1, 2, null]},
1842 {"list": [3, null, 4]},
1843 {"list": [null, 5, 6]},
1844 {"list": null},
1845 ]),
1846 json_value
1847 );
1848 }
1849 {
1851 let json_value: Value = {
1852 let mut buf = Vec::new();
1853 let mut writer = ArrayWriter::new(&mut buf);
1854 writer.write(&batch).unwrap();
1855 writer.close().unwrap();
1856 serde_json::from_slice(&buf).unwrap()
1857 };
1858 assert_eq!(
1859 json!([
1860 {"list": [1, 2, null]},
1861 {"list": [3, null, 4]},
1862 {"list": [null, 5, 6]},
1863 {}, ]),
1865 json_value
1866 );
1867 }
1868 }
1869
1870 #[test]
1871 fn test_writer_null_dict() {
1872 let keys = Int32Array::from_iter(vec![Some(0), None, Some(1)]);
1873 let values = Arc::new(StringArray::from_iter(vec![Some("a"), None]));
1874 let dict = DictionaryArray::new(keys, values);
1875
1876 let schema = SchemaRef::new(Schema::new(vec![Field::new(
1877 "my_dict",
1878 DataType::Dictionary(DataType::Int32.into(), DataType::Utf8.into()),
1879 true,
1880 )]));
1881
1882 let array = Arc::new(dict) as ArrayRef;
1883 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1884
1885 let mut json = Vec::new();
1886 let write_builder = WriterBuilder::new().with_explicit_nulls(true);
1887 let mut writer = write_builder.build::<_, JsonArray>(&mut json);
1888 writer.write(&batch).unwrap();
1889 writer.close().unwrap();
1890
1891 let json_str = str::from_utf8(&json).unwrap();
1892 assert_eq!(
1893 json_str,
1894 r#"[{"my_dict":"a"},{"my_dict":null},{"my_dict":null}]"#
1895 )
1896 }
1897
1898 #[test]
1899 fn test_decimal128_encoder() {
1900 let array = Decimal128Array::from_iter_values([1234, 5678, 9012])
1901 .with_precision_and_scale(10, 2)
1902 .unwrap();
1903 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1904 let schema = Schema::new(vec![field]);
1905 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1906
1907 let mut buf = Vec::new();
1908 {
1909 let mut writer = LineDelimitedWriter::new(&mut buf);
1910 writer.write_batches(&[&batch]).unwrap();
1911 }
1912
1913 assert_json_eq(
1914 &buf,
1915 r#"{"decimal":12.34}
1916{"decimal":56.78}
1917{"decimal":90.12}
1918"#,
1919 );
1920 }
1921
1922 #[test]
1923 fn test_decimal256_encoder() {
1924 let array = Decimal256Array::from_iter_values([
1925 i256::from(123400),
1926 i256::from(567800),
1927 i256::from(901200),
1928 ])
1929 .with_precision_and_scale(10, 4)
1930 .unwrap();
1931 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1932 let schema = Schema::new(vec![field]);
1933 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1934
1935 let mut buf = Vec::new();
1936 {
1937 let mut writer = LineDelimitedWriter::new(&mut buf);
1938 writer.write_batches(&[&batch]).unwrap();
1939 }
1940
1941 assert_json_eq(
1942 &buf,
1943 r#"{"decimal":12.3400}
1944{"decimal":56.7800}
1945{"decimal":90.1200}
1946"#,
1947 );
1948 }
1949
1950 #[test]
1951 fn test_decimal_encoder_with_nulls() {
1952 let array = Decimal128Array::from_iter([Some(1234), None, Some(5678)])
1953 .with_precision_and_scale(10, 2)
1954 .unwrap();
1955 let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1956 let schema = Schema::new(vec![field]);
1957 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1958
1959 let mut buf = Vec::new();
1960 {
1961 let mut writer = LineDelimitedWriter::new(&mut buf);
1962 writer.write_batches(&[&batch]).unwrap();
1963 }
1964
1965 assert_json_eq(
1966 &buf,
1967 r#"{"decimal":12.34}
1968{}
1969{"decimal":56.78}
1970"#,
1971 );
1972 }
1973
1974 #[test]
1975 fn write_structs_as_list() {
1976 let schema = Schema::new(vec![
1977 Field::new(
1978 "c1",
1979 DataType::Struct(Fields::from(vec![
1980 Field::new("c11", DataType::Int32, true),
1981 Field::new(
1982 "c12",
1983 DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
1984 false,
1985 ),
1986 ])),
1987 false,
1988 ),
1989 Field::new("c2", DataType::Utf8, false),
1990 ]);
1991
1992 let c1 = StructArray::from(vec![
1993 (
1994 Arc::new(Field::new("c11", DataType::Int32, true)),
1995 Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef,
1996 ),
1997 (
1998 Arc::new(Field::new(
1999 "c12",
2000 DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()),
2001 false,
2002 )),
2003 Arc::new(StructArray::from(vec![(
2004 Arc::new(Field::new("c121", DataType::Utf8, false)),
2005 Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef,
2006 )])) as ArrayRef,
2007 ),
2008 ]);
2009 let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
2010
2011 let batch =
2012 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
2013
2014 let expected = r#"[[1,["e"]],"a"]
2015[[null,["f"]],"b"]
2016[[5,["g"]],"c"]
2017"#;
2018
2019 let mut buf = Vec::new();
2020 {
2021 let builder = WriterBuilder::new()
2022 .with_explicit_nulls(true)
2023 .with_struct_mode(StructMode::ListOnly);
2024 let mut writer = builder.build::<_, LineDelimited>(&mut buf);
2025 writer.write_batches(&[&batch]).unwrap();
2026 }
2027 assert_json_eq(&buf, expected);
2028
2029 let mut buf = Vec::new();
2030 {
2031 let builder = WriterBuilder::new()
2032 .with_explicit_nulls(false)
2033 .with_struct_mode(StructMode::ListOnly);
2034 let mut writer = builder.build::<_, LineDelimited>(&mut buf);
2035 writer.write_batches(&[&batch]).unwrap();
2036 }
2037 assert_json_eq(&buf, expected);
2038 }
2039}