1use std::cmp::min;
28use std::collections::HashMap;
29use std::io::{BufWriter, Write};
30use std::mem::size_of;
31use std::sync::Arc;
32
33use flatbuffers::FlatBufferBuilder;
34
35use arrow_array::builder::BufferBuilder;
36use arrow_array::cast::*;
37use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
38use arrow_array::*;
39use arrow_buffer::bit_util;
40use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
41use arrow_data::{layout, ArrayData, ArrayDataBuilder, BufferSpec};
42use arrow_schema::*;
43
44use crate::compression::CompressionCodec;
45use crate::convert::IpcSchemaEncoder;
46use crate::CONTINUATION_MARKER;
47
/// Options for how to write IPC messages (schemas, record batches and
/// dictionary batches).
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Alignment (in bytes) that message bodies are padded to. Restricted to
    /// 8, 16, 32 or 64 by [`IpcWriteOptions::try_new`].
    alignment: u8,
    /// Whether to write the legacy IPC message layout (4-byte length prefix
    /// without a continuation marker). Only permitted with metadata V4.
    write_legacy_ipc_format: bool,
    /// The IPC metadata version to write. Only V4 and V5 can be constructed.
    metadata_version: crate::MetadataVersion,
    /// Optional compression applied to message bodies; requires metadata V5
    /// (enforced by [`IpcWriteOptions::try_with_compression`]).
    batch_compression_type: Option<crate::CompressionType>,
    /// When `true`, dictionary IDs declared on `Field`s are reused instead of
    /// being assigned sequentially by the tracker.
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all fields related to it."
    )]
    preserve_dict_id: bool,
}
78
impl IpcWriteOptions {
    /// Configure the compression applied to message bodies written with these
    /// options.
    ///
    /// # Errors
    /// Returns an error when a compression type is requested but
    /// `metadata_version` is below V5, which does not support body compression.
    pub fn try_with_compression(
        mut self,
        batch_compression_type: Option<crate::CompressionType>,
    ) -> Result<Self, ArrowError> {
        self.batch_compression_type = batch_compression_type;

        if self.batch_compression_type.is_some()
            && self.metadata_version < crate::MetadataVersion::V5
        {
            return Err(ArrowError::InvalidArgumentError(
                "Compression only supported in metadata v5 and above".to_string(),
            ));
        }
        Ok(self)
    }

    /// Try to create [`IpcWriteOptions`] with the given alignment, legacy
    /// format flag and metadata version.
    ///
    /// # Errors
    /// * `alignment` is not one of 8, 16, 32 or 64
    /// * `metadata_version` is V3 or lower (writing is not supported)
    /// * the legacy IPC format is requested together with V5
    pub fn try_new(
        alignment: usize,
        write_legacy_ipc_format: bool,
        metadata_version: crate::MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let is_alignment_valid =
            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
        if !is_alignment_valid {
            return Err(ArrowError::InvalidArgumentError(
                "Alignment should be 8, 16, 32, or 64.".to_string(),
            ));
        }
        // All permitted values (8..=64) fit in a u8, so this cannot fail.
        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
        match metadata_version {
            crate::MetadataVersion::V1
            | crate::MetadataVersion::V2
            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
                "Writing IPC metadata version 3 and lower not supported".to_string(),
            )),
            #[allow(deprecated)]
            crate::MetadataVersion::V4 => Ok(Self {
                alignment,
                write_legacy_ipc_format,
                metadata_version,
                batch_compression_type: None,
                preserve_dict_id: false,
            }),
            crate::MetadataVersion::V5 => {
                if write_legacy_ipc_format {
                    Err(ArrowError::InvalidArgumentError(
                        "Legacy IPC format only supported on metadata version 4".to_string(),
                    ))
                } else {
                    #[allow(deprecated)]
                    Ok(Self {
                        alignment,
                        write_legacy_ipc_format,
                        metadata_version,
                        batch_compression_type: None,
                        preserve_dict_id: false,
                    })
                }
            }
            // `MetadataVersion` is a flatbuffers-generated open set of
            // constants, so a catch-all arm is required.
            z => Err(ArrowError::InvalidArgumentError(format!(
                "Unsupported crate::MetadataVersion {z:?}"
            ))),
        }
    }

    /// Return whether dictionary IDs from the schema's fields are preserved.
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it."
    )]
    pub fn preserve_dict_id(&self) -> bool {
        #[allow(deprecated)]
        self.preserve_dict_id
    }

    /// Set whether dictionary IDs from the schema's fields should be preserved
    /// rather than assigned sequentially.
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it."
    )]
    #[allow(deprecated)]
    pub fn with_preserve_dict_id(mut self, preserve_dict_id: bool) -> Self {
        self.preserve_dict_id = preserve_dict_id;
        self
    }
}
177
178impl Default for IpcWriteOptions {
179 fn default() -> Self {
180 #[allow(deprecated)]
181 Self {
182 alignment: 64,
183 write_legacy_ipc_format: false,
184 metadata_version: crate::MetadataVersion::V5,
185 batch_compression_type: None,
186 preserve_dict_id: false,
187 }
188 }
189}
190
/// Stateless encoder that turns schemas, record batches and dictionaries into
/// IPC messages ([`EncodedData`]); see the methods on its `impl` block.
#[derive(Debug, Default)]
pub struct IpcDataGenerator {}
224
impl IpcDataGenerator {
    /// Encode `schema` as an IPC `Schema` message, registering any dictionary
    /// fields with `dictionary_tracker` so later batches use consistent IDs.
    ///
    /// Schema messages carry no body, so `arrow_data` is always empty.
    pub fn schema_to_bytes_with_dictionary_tracker(
        &self,
        schema: &Schema,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            let fb = IpcSchemaEncoder::new()
                .with_dictionary_tracker(dictionary_tracker)
                .schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        // Schema messages have no body.
        message.add_bodyLength(0);
        message.add_header(schema);
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    /// Encode `schema` as an IPC `Schema` message without dictionary tracking.
    #[deprecated(
        since = "54.0.0",
        note = "Use `schema_to_bytes_with_dictionary_tracker` instead. This function signature of `schema_to_bytes_with_dictionary_tracker` in the next release."
    )]
    pub fn schema_to_bytes(&self, schema: &Schema, write_options: &IpcWriteOptions) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            #[allow(deprecated)]
            let fb = crate::convert::schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        // Schema messages have no body.
        message.add_bodyLength(0);
        message.add_header(schema);
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    /// Recurse into `column`'s children looking for nested dictionary arrays
    /// and encode them via [`Self::encode_dictionaries`].
    ///
    /// Handles every nested type that can contain a dictionary; leaf types
    /// fall through to the no-op arm.
    fn _encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Struct(fields) => {
                let s = as_struct_array(column);
                // Visit each struct child with its matching field.
                for (field, column) in fields.iter().zip(s.columns()) {
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            DataType::RunEndEncoded(_, values) => {
                let data = column.to_data();
                if data.child_data().len() != 2 {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "The run encoded array should have exactly two child arrays. Found {}",
                        data.child_data().len()
                    )));
                }
                // Child 0 is run ends (no dictionaries possible); child 1 is
                // the values array, which may contain dictionaries.
                let values_array = make_array(data.child_data()[1].clone());
                self.encode_dictionaries(
                    values,
                    &values_array,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::List(field) => {
                let list = as_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::LargeList(field) => {
                let list = as_large_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::FixedSizeList(field, _) => {
                let list = column
                    .as_any()
                    .downcast_ref::<FixedSizeListArray>()
                    .expect("Unable to downcast to fixed size list array");
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Map(field, _) => {
                let map_array = as_map_array(column);

                // A map's entries field is a two-field struct: keys, values.
                let (keys, values) = match field.data_type() {
                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
                };

                // Encode any dictionaries in the keys.
                self.encode_dictionaries(
                    keys,
                    map_array.keys(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;

                // Encode any dictionaries in the values.
                self.encode_dictionaries(
                    values,
                    map_array.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Union(fields, _) => {
                let union = as_union_array(column);
                for (type_id, field) in fields.iter() {
                    let column = union.child(type_id);
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            // Leaf / non-nested types: nothing to do.
            _ => (),
        }

        Ok(())
    }

    /// Encode `column` if it is a dictionary array (recursing into its values
    /// first, so nested dictionaries are emitted before their parent), and
    /// otherwise delegate to [`Self::_encode_dictionaries`] for nested types.
    ///
    /// Dictionary IDs come from `dict_id_seq`, falling back to the field's
    /// (deprecated) declared dict ID.
    fn encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        field: &Field,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id_seq: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Dictionary(_key_type, _value_type) => {
                let dict_data = column.to_data();
                let dict_values = &dict_data.child_data()[0];

                let values = make_array(dict_data.child_data()[0].clone());

                // Emit any dictionaries nested inside the values first.
                self._encode_dictionaries(
                    &values,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id_seq,
                )?;

                // Prefer the tracker-assigned sequential ID; fall back to the
                // (deprecated) ID declared on the field itself.
                #[allow(deprecated)]
                let dict_id = dict_id_seq
                    .next()
                    .or_else(|| field.dict_id())
                    .ok_or_else(|| {
                        ArrowError::IpcError(format!("no dict id for field {}", field.name()))
                    })?;

                // `insert` returns whether this dictionary is new (or changed)
                // and therefore needs to be written.
                let emit = dictionary_tracker.insert(dict_id, column)?;

                if emit {
                    encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                        dict_id,
                        dict_values,
                        write_options,
                    )?);
                }
            }
            _ => self._encode_dictionaries(
                column,
                encoded_dictionaries,
                dictionary_tracker,
                write_options,
                dict_id_seq,
            )?,
        }

        Ok(())
    }

    /// Encode `batch` into its dictionary messages plus the record batch
    /// message, returning `(dictionaries, record_batch)`.
    pub fn encoded_batch(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        let schema = batch.schema();
        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());

        // Sequence of IDs already assigned by the tracker (e.g. while encoding
        // the schema); consumed in the same order here.
        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();

        for (i, field) in schema.fields().iter().enumerate() {
            let column = batch.column(i);
            self.encode_dictionaries(
                field,
                column,
                &mut encoded_dictionaries,
                dictionary_tracker,
                write_options,
                &mut dict_id,
            )?;
        }

        let encoded_message = self.record_batch_to_bytes(batch, write_options)?;
        Ok((encoded_dictionaries, encoded_message))
    }

    /// Write a `RecordBatch` into an IPC `RecordBatch` message: flatbuffer
    /// metadata (nodes, buffers, optional compression) plus the padded body.
    fn record_batch_to_bytes(
        &self,
        batch: &RecordBatch,
        write_options: &IpcWriteOptions,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];
        // Running byte offset of the next buffer within the message body.
        let mut offset = 0;

        let batch_compression_type = write_options.batch_compression_type;

        // Flatbuffer descriptor recording which codec compressed the buffers.
        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> =
            batch_compression_type.map(TryInto::try_into).transpose()?;

        let mut variadic_buffer_counts = vec![];

        for array in batch.columns() {
            let array_data = array.to_data();
            offset = write_array_data(
                &array_data,
                &mut buffers,
                &mut arrow_data,
                &mut nodes,
                offset,
                array.len(),
                array.null_count(),
                compression_codec,
                write_options,
            )?;

            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
        }
        // Pad the body so the message ends on the configured alignment.
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(batch.num_rows() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }

            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            let b = batch_builder.finish();
            b.as_union_value()
        };
        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::RecordBatch);
        message.add_bodyLength(arrow_data.len() as i64);
        message.add_header(root);
        let root = message.finish();
        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }

    /// Write dictionary values into an IPC `DictionaryBatch` message carrying
    /// `dict_id`, wrapping an inner record batch of the values.
    fn dictionary_batch_to_bytes(
        &self,
        dict_id: i64,
        array_data: &ArrayData,
        write_options: &IpcWriteOptions,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];

        let batch_compression_type = write_options.batch_compression_type;

        // Flatbuffer descriptor recording which codec compressed the buffers.
        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> = batch_compression_type
            .map(|batch_compression_type| batch_compression_type.try_into())
            .transpose()?;

        write_array_data(
            array_data,
            &mut buffers,
            &mut arrow_data,
            &mut nodes,
            0,
            array_data.len(),
            array_data.null_count(),
            compression_codec,
            write_options,
        )?;

        let mut variadic_buffer_counts = vec![];
        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);

        // Pad the body so the message ends on the configured alignment.
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        // Inner record batch holding the dictionary values.
        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(array_data.len() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }
            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            batch_builder.finish()
        };

        // Wrap it in a DictionaryBatch carrying the dictionary ID.
        let root = {
            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
            batch_builder.add_id(dict_id);
            batch_builder.add_data(root);
            batch_builder.finish().as_union_value()
        };

        let root = {
            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
            message_builder.add_version(write_options.metadata_version);
            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
            message_builder.add_bodyLength(arrow_data.len() as i64);
            message_builder.add_header(root);
            message_builder.finish()
        };

        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }
}
690
691fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
692 match array.data_type() {
693 DataType::BinaryView | DataType::Utf8View => {
694 counts.push(array.buffers().len() as i64 - 1);
697 }
698 DataType::Dictionary(_, _) => {
699 }
702 _ => {
703 for child in array.child_data() {
704 append_variadic_buffer_counts(counts, child)
705 }
706 }
707 }
708}
709
710pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
711 match arr.data_type() {
712 DataType::RunEndEncoded(k, _) => match k.data_type() {
713 DataType::Int16 => {
714 Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
715 }
716 DataType::Int32 => {
717 Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
718 }
719 DataType::Int64 => {
720 Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
721 }
722 d => unreachable!("Unexpected data type {d}"),
723 },
724 d => Err(ArrowError::InvalidArgumentError(format!(
725 "The given array is not a run array. Data type of given array: {d}"
726 ))),
727 }
728}
729
/// Rewrite a sliced [`RunArray`] into an equivalent array with zero offset,
/// trimming run ends and values to just the visible window and rebasing the
/// run-end values so they start from the array's logical start.
fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    // Fast path: not sliced at all — return unchanged.
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // Physical index of the run containing the first visible element.
    let start_physical_index = run_ends.get_start_physical_index();

    // Physical index of the run containing the last visible element.
    let end_physical_index = run_ends.get_end_physical_index();

    let physical_length = end_physical_index - start_physical_index + 1;

    // Shift every retained run end left by the logical slice offset.
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    // The final run is clamped to the sliced array's logical length.
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    // SAFETY: buffer holds exactly `physical_length` monotonically increasing
    // native run ends derived from an already-valid run array.
    let new_run_ends = unsafe {
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    // Keep only the values for the retained physical runs.
    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    // SAFETY: children were derived from a valid run array and kept in sync
    // (same physical length, run ends rebased against the same offset).
    let array_data = unsafe {
        builder.build_unchecked()
    };
    Ok(array_data.into())
}
781
/// Tracks which dictionaries have already been written during an IPC session,
/// so unchanged dictionaries are not re-emitted and (for the file format)
/// replacements can be rejected.
#[derive(Debug)]
pub struct DictionaryTracker {
    /// Dictionary data already written, keyed by dictionary ID.
    written: HashMap<i64, ArrayData>,
    /// Sequentially assigned dictionary IDs, in assignment order.
    dict_ids: Vec<i64>,
    /// When `true`, replacing an already-written dictionary is an error
    /// (required by the file format, which indexes dictionaries once).
    error_on_replacement: bool,
    /// When `true`, IDs come from the schema's fields instead of `dict_ids`.
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all fields related to it."
    )]
    preserve_dict_id: bool,
}
798
impl DictionaryTracker {
    /// Create a tracker. `error_on_replacement` controls whether writing a
    /// different dictionary under an existing ID is an error (file format)
    /// or allowed (stream format).
    pub fn new(error_on_replacement: bool) -> Self {
        #[allow(deprecated)]
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
            preserve_dict_id: false,
        }
    }

    /// Create a tracker that optionally reuses the dictionary IDs declared on
    /// schema fields rather than assigning them sequentially.
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it."
    )]
    pub fn new_with_preserve_dict_id(error_on_replacement: bool, preserve_dict_id: bool) -> Self {
        #[allow(deprecated)]
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
            preserve_dict_id,
        }
    }

    /// Record and return the dictionary ID for `field`: the field's own ID if
    /// `preserve_dict_id` is set (panics if the field has none), otherwise the
    /// next sequential ID (previous max + 1, starting at 0).
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it."
    )]
    pub fn set_dict_id(&mut self, field: &Field) -> i64 {
        #[allow(deprecated)]
        let next = if self.preserve_dict_id {
            #[allow(deprecated)]
            field.dict_id().expect("no dict_id in field")
        } else {
            self.dict_ids
                .last()
                .copied()
                .map(|i| i + 1)
                .unwrap_or_default()
        };

        self.dict_ids.push(next);
        next
    }

    /// All dictionary IDs assigned so far, in assignment order.
    pub fn dict_id(&mut self) -> &[i64] {
        &self.dict_ids
    }

    /// Register the dictionary of `column` under `dict_id`, returning whether
    /// the caller needs to emit a dictionary batch for it.
    ///
    /// Returns `Ok(false)` when the same dictionary (same buffers, or equal
    /// contents when replacement is forbidden) was already written; returns an
    /// error when a *different* dictionary would replace an existing one and
    /// `error_on_replacement` is set.
    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
        let dict_data = column.to_data();
        let dict_values = &dict_data.child_data()[0];

        if let Some(last) = self.written.get(&dict_id) {
            // Cheap pointer comparison first: identical buffers mean the
            // exact same dictionary was already written.
            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
                return Ok(false);
            }
            if self.error_on_replacement {
                // Deep equality also counts as "already written".
                if last.child_data()[0] == *dict_values {
                    return Ok(false);
                }
                return Err(ArrowError::InvalidArgumentError(
                    "Dictionary replacement detected when writing IPC file format. \
                    Arrow IPC files only support a single dictionary for a given field \
                    across all batches."
                        .to_string(),
                ));
            }
        }

        self.written.insert(dict_id, dict_data);
        Ok(true)
    }
}
911
/// Writer for the Arrow IPC *file* format: magic bytes, a schema message, a
/// sequence of dictionary/record-batch messages, and a footer indexing every
/// block.
pub struct FileWriter<W> {
    /// The underlying sink being written to.
    writer: W,
    /// Options controlling alignment, version and compression.
    write_options: IpcWriteOptions,
    /// The schema written in the header; all batches must match it.
    schema: SchemaRef,
    /// Byte offset in the file where the next block will start.
    block_offsets: usize,
    /// Dictionary blocks recorded for the footer.
    dictionary_blocks: Vec<crate::Block>,
    /// Record batch blocks recorded for the footer.
    record_blocks: Vec<crate::Block>,
    /// Whether the footer has been written and the writer closed.
    finished: bool,
    /// Tracks dictionaries so unchanged ones are written only once.
    dictionary_tracker: DictionaryTracker,
    /// User-supplied key/value metadata to embed in the footer.
    custom_metadata: HashMap<String, String>,
    /// Encoder for schema/dictionary/record-batch messages.
    data_gen: IpcDataGenerator,
}
956
957impl<W: Write> FileWriter<BufWriter<W>> {
958 pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
962 Self::try_new(BufWriter::new(writer), schema)
963 }
964}
965
impl<W: Write> FileWriter<W> {
    /// Create a file writer with default [`IpcWriteOptions`], writing the
    /// magic bytes and schema message immediately.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Create a file writer with explicit options; writes the leading magic
    /// bytes, alignment padding and the schema message.
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        // Pad the magic so the first message starts aligned.
        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
        let header_size = super::ARROW_MAGIC.len() + pad_len;
        writer.write_all(&super::ARROW_MAGIC)?;
        writer.write_all(&PADDING[..pad_len])?;
        #[allow(deprecated)]
        let preserve_dict_id = write_options.preserve_dict_id;
        // File format forbids dictionary replacement, hence `true`.
        #[allow(deprecated)]
        let mut dictionary_tracker =
            DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            schema: Arc::new(schema.clone()),
            // First block starts right after header + schema message.
            block_offsets: meta + data + header_size,
            dictionary_blocks: vec![],
            record_blocks: vec![],
            finished: false,
            dictionary_tracker,
            custom_metadata: HashMap::new(),
            data_gen,
        })
    }

    /// Add a key/value metadata pair to be written into the file footer.
    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.custom_metadata.insert(key.into(), value.into());
    }

    /// Write a record batch (and any new dictionaries it requires) to the
    /// file, recording each message's block for the footer.
    ///
    /// # Errors
    /// Returns an error if the writer has already been finished.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to file writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self.data_gen.encoded_batch(
            batch,
            &mut self.dictionary_tracker,
            &self.write_options,
        )?;

        // Dictionaries must precede the batch that references them.
        for encoded_dictionary in encoded_dictionaries {
            let (meta, data) =
                write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;

            let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
            self.dictionary_blocks.push(block);
            self.block_offsets += meta + data;
        }

        let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
        let block = crate::Block::new(
            self.block_offsets as i64,
            meta as i32, data as i64,
        );
        self.record_blocks.push(block);
        self.block_offsets += meta + data;
        Ok(())
    }

    /// Write the end-of-stream marker and the footer, then mark the writer
    /// as finished. Subsequent writes/finishes will error.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to file writer as it is closed".to_string(),
            ));
        }

        // Zero-length continuation signals end of the message stream.
        write_continuation(&mut self.writer, &self.write_options, 0)?;

        let mut fbb = FlatBufferBuilder::new();
        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
        let record_batches = fbb.create_vector(&self.record_blocks);
        #[allow(deprecated)]
        let preserve_dict_id = self.write_options.preserve_dict_id;
        // A fresh tracker is used only to re-encode the schema for the footer.
        #[allow(deprecated)]
        let mut dictionary_tracker =
            DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
        let schema = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut dictionary_tracker)
            .schema_to_fb_offset(&mut fbb, &self.schema);
        let fb_custom_metadata = (!self.custom_metadata.is_empty())
            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

        let root = {
            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
            footer_builder.add_version(self.write_options.metadata_version);
            footer_builder.add_schema(schema);
            footer_builder.add_dictionaries(dictionaries);
            footer_builder.add_recordBatches(record_batches);
            if let Some(fb_custom_metadata) = fb_custom_metadata {
                footer_builder.add_custom_metadata(fb_custom_metadata);
            }
            footer_builder.finish()
        };
        fbb.finish(root, None);
        let footer_data = fbb.finished_data();
        self.writer.write_all(footer_data)?;
        // Trailer: footer length (little-endian i32) followed by the magic.
        self.writer
            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
        self.writer.write_all(&super::ARROW_MAGIC)?;
        self.writer.flush()?;
        self.finished = true;

        Ok(())
    }

    /// The schema this writer was created with.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Immutable reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Mutable reference to the underlying writer; writing to it directly
    /// will likely corrupt the IPC file.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Finish the file (if not already finished) and return the underlying
    /// writer.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            self.finish()?;
        }
        Ok(self.writer)
    }
}
1152
1153impl<W: Write> RecordBatchWriter for FileWriter<W> {
1154 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1155 self.write(batch)
1156 }
1157
1158 fn close(mut self) -> Result<(), ArrowError> {
1159 self.finish()
1160 }
1161}
1162
/// Writer for the Arrow IPC *stream* format: schema message, then dictionary
/// and record batch messages, terminated by an end-of-stream marker (no
/// footer).
pub struct StreamWriter<W> {
    /// The underlying sink being written to.
    writer: W,
    /// Options controlling alignment, version and compression.
    write_options: IpcWriteOptions,
    /// Whether the end-of-stream marker has been written.
    finished: bool,
    /// Tracks dictionaries so unchanged ones are written only once.
    dictionary_tracker: DictionaryTracker,
    /// Encoder for schema/dictionary/record-batch messages.
    data_gen: IpcDataGenerator,
}
1198
1199impl<W: Write> StreamWriter<BufWriter<W>> {
1200 pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1204 Self::try_new(BufWriter::new(writer), schema)
1205 }
1206}
1207
1208impl<W: Write> StreamWriter<W> {
1209 pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1217 let write_options = IpcWriteOptions::default();
1218 Self::try_new_with_options(writer, schema, write_options)
1219 }
1220
1221 pub fn try_new_with_options(
1227 mut writer: W,
1228 schema: &Schema,
1229 write_options: IpcWriteOptions,
1230 ) -> Result<Self, ArrowError> {
1231 let data_gen = IpcDataGenerator::default();
1232 #[allow(deprecated)]
1233 let preserve_dict_id = write_options.preserve_dict_id;
1234 #[allow(deprecated)]
1235 let mut dictionary_tracker =
1236 DictionaryTracker::new_with_preserve_dict_id(false, preserve_dict_id);
1237
1238 let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1240 schema,
1241 &mut dictionary_tracker,
1242 &write_options,
1243 );
1244 write_message(&mut writer, encoded_message, &write_options)?;
1245 Ok(Self {
1246 writer,
1247 write_options,
1248 finished: false,
1249 dictionary_tracker,
1250 data_gen,
1251 })
1252 }
1253
1254 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1256 if self.finished {
1257 return Err(ArrowError::IpcError(
1258 "Cannot write record batch to stream writer as it is closed".to_string(),
1259 ));
1260 }
1261
1262 let (encoded_dictionaries, encoded_message) = self
1263 .data_gen
1264 .encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)
1265 .expect("StreamWriter is configured to not error on dictionary replacement");
1266
1267 for encoded_dictionary in encoded_dictionaries {
1268 write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
1269 }
1270
1271 write_message(&mut self.writer, encoded_message, &self.write_options)?;
1272 Ok(())
1273 }
1274
1275 pub fn finish(&mut self) -> Result<(), ArrowError> {
1277 if self.finished {
1278 return Err(ArrowError::IpcError(
1279 "Cannot write footer to stream writer as it is closed".to_string(),
1280 ));
1281 }
1282
1283 write_continuation(&mut self.writer, &self.write_options, 0)?;
1284
1285 self.finished = true;
1286
1287 Ok(())
1288 }
1289
1290 pub fn get_ref(&self) -> &W {
1292 &self.writer
1293 }
1294
1295 pub fn get_mut(&mut self) -> &mut W {
1299 &mut self.writer
1300 }
1301
1302 pub fn flush(&mut self) -> Result<(), ArrowError> {
1306 self.writer.flush()?;
1307 Ok(())
1308 }
1309
1310 pub fn into_inner(mut self) -> Result<W, ArrowError> {
1348 if !self.finished {
1349 self.finish()?;
1351 }
1352 Ok(self.writer)
1353 }
1354}
1355
1356impl<W: Write> RecordBatchWriter for StreamWriter<W> {
1357 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1358 self.write(batch)
1359 }
1360
1361 fn close(mut self) -> Result<(), ArrowError> {
1362 self.finish()
1363 }
1364}
1365
/// An encoded IPC message: the flatbuffer metadata plus the (possibly empty)
/// message body.
pub struct EncodedData {
    /// The serialized flatbuffer `Message` bytes.
    pub ipc_message: Vec<u8>,
    /// The message body (buffer data); empty for schema messages.
    pub arrow_data: Vec<u8>,
}
/// Write an encoded message (prefix, flatbuffer metadata, padding, then body)
/// to `writer`, returning `(metadata_len, body_len)` as written.
///
/// # Errors
/// Returns an error if the body is not already padded to the configured
/// alignment, or on any I/O failure.
pub fn write_message<W: Write>(
    mut writer: W,
    encoded: EncodedData,
    write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
    let arrow_data_len = encoded.arrow_data.len();
    // The body must have been padded by the encoder already.
    if arrow_data_len % usize::from(write_options.alignment) != 0 {
        return Err(ArrowError::MemoryError(
            "Arrow data not aligned".to_string(),
        ));
    }

    // Alignment mask; alignment is guaranteed to be a power of two.
    let a = usize::from(write_options.alignment - 1);
    let buffer = encoded.ipc_message;
    let flatbuf_size = buffer.len();
    // Legacy layout: 4-byte length only; modern: continuation marker + length.
    let prefix_size = if write_options.write_legacy_ipc_format {
        4
    } else {
        8
    };
    // Round prefix + metadata up to the next aligned boundary.
    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
    let padding_bytes = aligned_size - flatbuf_size - prefix_size;

    // The declared metadata length excludes the prefix itself.
    write_continuation(
        &mut writer,
        write_options,
        (aligned_size - prefix_size) as i32,
    )?;

    // Write the flatbuffer metadata, then pad up to the aligned boundary.
    if flatbuf_size > 0 {
        writer.write_all(&buffer)?;
    }
    writer.write_all(&PADDING[..padding_bytes])?;

    // Write the message body, if any.
    let body_len = if arrow_data_len > 0 {
        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
    } else {
        0
    };

    Ok((aligned_size, body_len))
}
1419
1420fn write_body_buffers<W: Write>(
1421 mut writer: W,
1422 data: &[u8],
1423 alignment: u8,
1424) -> Result<usize, ArrowError> {
1425 let len = data.len();
1426 let pad_len = pad_to_alignment(alignment, len);
1427 let total_len = len + pad_len;
1428
1429 writer.write_all(data)?;
1431 if pad_len > 0 {
1432 writer.write_all(&PADDING[..pad_len])?;
1433 }
1434
1435 writer.flush()?;
1436 Ok(total_len)
1437}
1438
/// Write the message prefix (continuation marker and/or length) according to
/// the metadata version, returning the nominal prefix size in bytes.
///
/// NOTE(review): `written` defaults to 8 but is set to 4 on the branch that
/// writes marker + length (8 bytes), while the legacy branch (4 bytes) returns
/// 8 — this looks inverted. Callers currently ignore the return value, so no
/// behavior change is made here; confirm before relying on it.
fn write_continuation<W: Write>(
    mut writer: W,
    write_options: &IpcWriteOptions,
    total_len: i32,
) -> Result<usize, ArrowError> {
    let mut written = 8;

    // The metadata version decides whether a continuation marker is written.
    match write_options.metadata_version {
        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
            // `IpcWriteOptions::try_new` rejects these versions.
            unreachable!("Options with the metadata version cannot be created")
        }
        crate::MetadataVersion::V4 => {
            if !write_options.write_legacy_ipc_format {
                // Modern layout: marker then length.
                writer.write_all(&CONTINUATION_MARKER)?;
                written = 4;
            }
            writer.write_all(&total_len.to_le_bytes()[..])?;
        }
        crate::MetadataVersion::V5 => {
            // V5 always writes marker + length.
            writer.write_all(&CONTINUATION_MARKER)?;
            writer.write_all(&total_len.to_le_bytes()[..])?;
        }
        // `MetadataVersion` is an open set of flatbuffers constants.
        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
    };

    writer.flush()?;

    Ok(written)
}
1473
1474fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1478 if write_options.metadata_version < crate::MetadataVersion::V5 {
1479 !matches!(data_type, DataType::Null)
1480 } else {
1481 !matches!(
1482 data_type,
1483 DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1484 )
1485 }
1486}
1487
1488#[inline]
1490fn buffer_need_truncate(
1491 array_offset: usize,
1492 buffer: &Buffer,
1493 spec: &BufferSpec,
1494 min_length: usize,
1495) -> bool {
1496 spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1497}
1498
1499#[inline]
1501fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1502 match spec {
1503 BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1504 _ => 0,
1505 }
1506}
1507
/// Rebase the offsets buffer of a (possibly sliced) variable-length array so
/// it starts at zero.
///
/// Returns `(offsets, start_offset, len)` where `start_offset` and `len`
/// identify the region of the values/child data still referenced by the
/// rebased offsets.
fn reencode_offsets<O: OffsetSizeTrait>(
    offsets: &Buffer,
    data: &ArrayData,
) -> (Buffer, usize, usize) {
    let offsets_slice: &[O] = offsets.typed_data::<O>();
    // `len + 1` offsets delimit `len` elements.
    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];

    let start_offset = offset_slice.first().unwrap();
    let end_offset = offset_slice.last().unwrap();

    let offsets = match start_offset.as_usize() {
        0 => {
            // Already zero-based: reslice the existing buffer without copying.
            let size = size_of::<O>();
            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
        }
        // Otherwise materialize a new buffer with every offset shifted down
        // so the first offset becomes zero.
        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
    };

    let start_offset = start_offset.as_usize();
    let end_offset = end_offset.as_usize();

    (offsets, start_offset, end_offset - start_offset)
}
1533
/// Returns the rebased offsets buffer and the matching slice of the values
/// buffer for a byte array (`Binary`/`Utf8` flavors) with offset type `O`,
/// keeping only the data the (possibly sliced) array actually references.
fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
    if data.is_empty() {
        // Empty array: two empty buffers suffice.
        return (MutableBuffer::new(0).into(), MutableBuffer::new(0).into());
    }

    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
    (offsets, values)
}
1548
/// Returns the rebased offsets buffer and the matching slice of the child
/// data for a list-like array with offset type `O`, keeping only the child
/// rows the (possibly sliced) array actually references.
fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
    if data.is_empty() {
        // Empty list: empty offsets and a zero-length slice of the child.
        return (
            MutableBuffer::new(0).into(),
            data.child_data()[0].slice(0, 0),
        );
    }

    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
    let child_data = data.child_data()[0].slice(original_start_offset, len);
    (offsets, child_data)
}
1563
/// Recursively write `array_data` into the message body.
///
/// Appends this array's field node to `nodes`, writes its validity bitmap
/// (when the type has one) and data buffers into `arrow_data` (recording
/// their positions in `buffers`), then recurses into child arrays. `offset`
/// is the running byte offset within the body; the updated offset after all
/// buffers for this subtree is returned.
#[allow(clippy::too_many_arguments)]
fn write_array_data(
    array_data: &ArrayData,
    buffers: &mut Vec<crate::Buffer>,
    arrow_data: &mut Vec<u8>,
    nodes: &mut Vec<crate::FieldNode>,
    offset: i64,
    num_rows: usize,
    null_count: usize,
    compression_codec: Option<CompressionCodec>,
    write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
    let mut offset = offset;
    if !matches!(array_data.data_type(), DataType::Null) {
        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
    } else {
        // NullArray's null count is always equal to its length.
        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
    }
    if has_validity_bitmap(array_data.data_type(), write_options) {
        let null_buffer = match array_data.nulls() {
            None => {
                // No nulls: write an all-set bitmap of the required size.
                let num_bytes = bit_util::ceil(num_rows, 8);
                let buffer = MutableBuffer::new(num_bytes);
                let buffer = buffer.with_bitset(num_bytes, true);
                buffer.into()
            }
            Some(buffer) => buffer.inner().sliced(),
        };

        offset = write_buffer(
            null_buffer.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    }

    let data_type = array_data.data_type();
    if matches!(data_type, DataType::Binary | DataType::Utf8) {
        // Variable-length byte arrays: truncated/rebased offsets + values.
        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
        // View arrays: write the views and data buffers as-is.
        for buffer in array_data.buffers() {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
        // Same as Binary/Utf8 but with 64-bit offsets.
        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if DataType::is_numeric(data_type)
        || DataType::is_temporal(data_type)
        || matches!(
            array_data.data_type(),
            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
        )
    {
        // Fixed-width single-buffer types (for dictionaries: the keys buffer).
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let layout = layout(data_type);
        let spec = &layout.buffers[0];

        let byte_width = get_buffer_element_width(spec);
        let min_length = array_data.len() * byte_width;
        // Truncate to the slice the array references, to avoid writing
        // bytes outside the logical array.
        let buffer_slice = if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
            let byte_offset = array_data.offset() * byte_width;
            let buffer_length = min(min_length, buffer.len() - byte_offset);
            &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
        } else {
            buffer.as_slice()
        };
        offset = write_buffer(
            buffer_slice,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    } else if matches!(data_type, DataType::Boolean) {
        // Booleans are bit-packed, so slicing works at the bit level.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
        offset = write_buffer(
            &buffer,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    } else if matches!(
        data_type,
        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
    ) {
        assert_eq!(array_data.buffers().len(), 1);
        assert_eq!(array_data.child_data().len(), 1);

        // Rebase offsets and slice the child to just the referenced rows.
        let (offsets, sliced_child_data) = match data_type {
            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };
        offset = write_buffer(
            offsets.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
        offset = write_array_data(
            &sliced_child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            sliced_child_data.len(),
            sliced_child_data.null_count(),
            compression_codec,
            write_options,
        )?;
        // Child already written above; skip the generic recursion below.
        return Ok(offset);
    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
        assert_eq!(array_data.child_data().len(), 1);
        let fixed_size = *fixed_size as usize;

        // Each parent row maps to `fixed_size` consecutive child rows.
        let child_offset = array_data.offset() * fixed_size;
        let child_length = array_data.len() * fixed_size;
        let child_data = array_data.child_data()[0].slice(child_offset, child_length);

        offset = write_array_data(
            &child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            child_data.len(),
            child_data.null_count(),
            compression_codec,
            write_options,
        )?;
        // Child already written above; skip the generic recursion below.
        return Ok(offset);
    } else {
        // Fallback: write every buffer verbatim.
        for buffer in array_data.buffers() {
            offset = write_buffer(
                buffer,
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    }

    // Recurse into children for the remaining nested types.
    match array_data.data_type() {
        // Dictionary values are written separately as dictionary batches.
        DataType::Dictionary(_, _) => {}
        DataType::RunEndEncoded(_, _) => {
            // Run arrays must be unsliced before their children are written.
            let arr = unslice_run_array(array_data.clone())?;
            for data_ref in arr.child_data() {
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    write_options,
                )?;
            }
        }
        _ => {
            for data_ref in array_data.child_data() {
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    write_options,
                )?;
            }
        }
    }
    Ok(offset)
}
1804
1805fn write_buffer(
1818 buffer: &[u8], buffers: &mut Vec<crate::Buffer>, arrow_data: &mut Vec<u8>, offset: i64, compression_codec: Option<CompressionCodec>,
1823 alignment: u8,
1824) -> Result<i64, ArrowError> {
1825 let len: i64 = match compression_codec {
1826 Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?,
1827 None => {
1828 arrow_data.extend_from_slice(buffer);
1829 buffer.len()
1830 }
1831 }
1832 .try_into()
1833 .map_err(|e| {
1834 ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
1835 })?;
1836
1837 buffers.push(crate::Buffer::new(offset, len));
1839 let pad_len = pad_to_alignment(alignment, len as usize);
1841 arrow_data.extend_from_slice(&PADDING[..pad_len]);
1842
1843 Ok(offset + len + (pad_len as i64))
1844}
1845
1846const PADDING: [u8; 64] = [0; 64];
1847
/// Calculate the number of zero bytes needed to round `len` up to the next
/// multiple of `alignment`.
///
/// `alignment` must be a power of two (the writer only permits 8/16/32/64);
/// the bit-mask arithmetic below is only correct in that case, so this is
/// enforced with a debug assertion.
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    debug_assert!(
        alignment.is_power_of_two(),
        "alignment must be a power of two"
    );
    let a = usize::from(alignment - 1);
    ((len + a) & !a) - len
}
1854
1855#[cfg(test)]
1856mod tests {
1857 use std::io::Cursor;
1858 use std::io::Seek;
1859
1860 use arrow_array::builder::FixedSizeListBuilder;
1861 use arrow_array::builder::Float32Builder;
1862 use arrow_array::builder::Int64Builder;
1863 use arrow_array::builder::MapBuilder;
1864 use arrow_array::builder::UnionBuilder;
1865 use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
1866 use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
1867 use arrow_array::types::*;
1868 use arrow_buffer::ScalarBuffer;
1869
1870 use crate::convert::fb_to_schema;
1871 use crate::reader::*;
1872 use crate::root_as_footer;
1873 use crate::MetadataVersion;
1874
1875 use super::*;
1876
    /// Test helper: serialize a single batch into the IPC *file* format.
    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
        writer.write(rb).unwrap();
        writer.finish().unwrap();
        writer.into_inner().unwrap()
    }
1883
    /// Test helper: read the first batch back from IPC file bytes.
    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
        reader.next().unwrap().unwrap()
    }
1888
    /// Test helper: serialize a single batch into the IPC *stream* format
    /// with a fixed 8-byte alignment (so output sizes are comparable).
    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
        const IPC_ALIGNMENT: usize = 8;

        let mut stream_writer = StreamWriter::try_new_with_options(
            vec![],
            record.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        stream_writer.write(record).unwrap();
        stream_writer.finish().unwrap();
        stream_writer.into_inner().unwrap()
    }
1905
    /// Test helper: read the first batch back from IPC stream bytes.
    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
        stream_reader.next().unwrap().unwrap()
    }
1910
    /// An empty (zero-row) batch must survive an LZ4-compressed file round trip.
    #[test]
    #[cfg(feature = "lz4")]
    fn test_write_empty_record_batch_lz4_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();

        let mut file = tempfile::tempfile().unwrap();

        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }
1951
    /// A small Int32 batch must survive an LZ4-compressed file round trip.
    #[test]
    #[cfg(feature = "lz4")]
    fn test_write_file_with_lz4_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }
1991
    /// A small Int32 batch must survive a ZSTD-compressed file round trip.
    #[test]
    #[cfg(feature = "zstd")]
    fn test_write_file_with_zstd_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::ZSTD))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }
2030
    /// Basic uncompressed file round trip of a nullable UInt32 column.
    #[test]
    fn test_write_file() {
        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
        let values: Vec<Option<u32>> = vec![
            Some(999),
            None,
            Some(235),
            Some(123),
            None,
            None,
            None,
            None,
            None,
        ];
        let array1 = UInt32Array::from(values);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
                .unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();

        {
            let mut reader = FileReader::try_new(file, None).unwrap();
            while let Some(Ok(read_batch)) = reader.next() {
                read_batch
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }
2073
    /// Test helper: round-trip a batch mixing Null columns with regular
    /// columns through a file written with the given `options`.
    fn write_null_file(options: IpcWriteOptions) {
        let schema = Schema::new(vec![
            Field::new("nulls", DataType::Null, true),
            Field::new("int32s", DataType::Int32, false),
            Field::new("nulls2", DataType::Null, true),
            Field::new("f64s", DataType::Float64, false),
        ]);
        let array1 = NullArray::new(32);
        let array2 = Int32Array::from(vec![1; 32]);
        let array3 = NullArray::new(32);
        let array4 = Float64Array::from(vec![f64::NAN; 32]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(array1) as ArrayRef,
                Arc::new(array2) as ArrayRef,
                Arc::new(array3) as ArrayRef,
                Arc::new(array4) as ArrayRef,
            ],
        )
        .unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }

        file.rewind().unwrap();

        {
            let reader = FileReader::try_new(file, None).unwrap();
            reader.for_each(|maybe_batch| {
                maybe_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            });
        }
    }
    /// Null columns under V4 metadata, legacy and modern framing, both alignments.
    #[test]
    fn test_write_null_file_v4() {
        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
    }
2128
    /// Null columns under V5 metadata at both supported alignments.
    #[test]
    fn test_write_null_file_v5() {
        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
    }
2134
    /// A dictionary nested inside a union must be registered with the
    /// `DictionaryTracker` when the batch is encoded.
    #[test]
    fn track_union_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        // Dict id 2 is assigned explicitly via the deprecated API.
        #[allow(deprecated)]
        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 2, false);
        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();

        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();

        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            union.data_type().clone(),
            false,
        )]));

        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

        let gen = IpcDataGenerator {};
        #[allow(deprecated)]
        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
            .unwrap();

        assert!(dict_tracker.written.contains_key(&2));
    }
2169
    /// A dictionary nested inside a struct must be registered with the
    /// `DictionaryTracker` when the batch is encoded.
    #[test]
    fn track_struct_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        // Dict id 2 is assigned explicitly via the deprecated API.
        #[allow(deprecated)]
        let dctfield = Arc::new(Field::new_dict(
            "dict",
            array.data_type().clone(),
            false,
            2,
            false,
        ));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        let gen = IpcDataGenerator {};
        #[allow(deprecated)]
        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
            .unwrap();

        assert!(dict_tracker.written.contains_key(&2));
    }
2205
    /// Test helper: round-trip a sparse union column through a file written
    /// with the given `options`.
    fn write_union_file(options: IpcWriteOptions) {
        let schema = Schema::new(vec![Field::new_union(
            "union",
            vec![0, 1],
            vec![
                Field::new("a", DataType::Int32, false),
                Field::new("c", DataType::Float64, false),
            ],
            UnionMode::Sparse,
        )]);
        let mut builder = UnionBuilder::with_capacity_sparse(5);
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append_null::<Float64Type>("c").unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        let union = builder.build().unwrap();

        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
                .unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();

        {
            let reader = FileReader::try_new(file, None).unwrap();
            reader.for_each(|maybe_batch| {
                maybe_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            });
        }
    }
2253
    /// Sparse union round trip under both V4 and V5 metadata.
    #[test]
    fn test_write_union_file_v4_v5() {
        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
    }
2259
    /// BinaryView/Utf8View columns (including a value long enough to be
    /// stored out-of-line) round-trip through a file, with and without a
    /// column projection.
    #[test]
    fn test_write_view_types() {
        const LONG_TEST_STRING: &str =
            "This is a long string to make sure binary view array handles it";
        let schema = Schema::new(vec![
            Field::new("field1", DataType::BinaryView, true),
            Field::new("field2", DataType::Utf8View, true),
        ]);
        let values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            Some(b"bar"),
            Some(LONG_TEST_STRING.as_bytes()),
        ];
        let binary_array = BinaryViewArray::from_iter(values);
        let utf8_array =
            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
        let record_batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(binary_array), Arc::new(utf8_array)],
        )
        .unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let mut reader = FileReader::try_new(&file, None).unwrap();
            let read_batch = reader.next().unwrap().unwrap();
            read_batch
                .columns()
                .iter()
                .zip(record_batch.columns())
                .for_each(|(a, b)| {
                    assert_eq!(a, b);
                });
        }
        file.rewind().unwrap();
        {
            // Projected read: only the first column should come back.
            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
            let read_batch = reader.next().unwrap().unwrap();
            assert_eq!(read_batch.num_columns(), 1);
            let read_array = read_batch.column(0);
            let write_array = record_batch.column(0);
            assert_eq!(read_array, write_array);
        }
    }
2310
    /// A slice of a large batch must serialize to the same size as an
    /// equivalent small batch (i.e. the writer truncates sliced buffers).
    #[test]
    fn truncate_ipc_record_batch() {
        fn create_batch(rows: usize) -> RecordBatch {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Utf8, false),
            ]);

            let a = Int32Array::from_iter_values(0..rows as i32);
            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
        }

        let big_record_batch = create_batch(65536);

        let length = 5;
        let small_record_batch = create_batch(length);

        let offset = 2;
        let record_batch_slice = big_record_batch.slice(offset, length);
        assert!(
            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
        );
        assert_eq!(
            serialize_stream(&small_record_batch).len(),
            serialize_stream(&record_batch_slice).len()
        );

        assert_eq!(
            deserialize_stream(serialize_stream(&record_batch_slice)),
            record_batch_slice
        );
    }
2345
    /// Slicing with nulls present: the truncated serialization must preserve
    /// validity bitmaps across the round trip.
    #[test]
    fn truncate_ipc_record_batch_with_nulls() {
        fn create_batch() -> RecordBatch {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, true),
                Field::new("b", DataType::Utf8, true),
            ]);

            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        assert!(deserialized_batch.column(0).is_null(0));
        assert!(deserialized_batch.column(0).is_valid(1));
        assert!(deserialized_batch.column(1).is_valid(0));
        assert!(deserialized_batch.column(1).is_valid(1));

        assert_eq!(record_batch_slice, deserialized_batch);
    }
2375
    /// Slicing a dictionary column: truncated keys (and nulls) must survive
    /// the round trip.
    #[test]
    fn truncate_ipc_dictionary_array() {
        fn create_batch() -> RecordBatch {
            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
                .into_iter()
                .collect();
            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

            let array = DictionaryArray::new(keys, Arc::new(values));

            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        assert!(deserialized_batch.column(0).is_valid(0));
        assert!(deserialized_batch.column(0).is_null(1));

        assert_eq!(record_batch_slice, deserialized_batch);
    }
2404
    /// Slicing a struct column: child arrays must be truncated consistently
    /// with the parent and survive the round trip.
    #[test]
    fn truncate_ipc_struct_array() {
        fn create_batch() -> RecordBatch {
            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
                .into_iter()
                .collect();
            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("s", DataType::Utf8, true)),
                    Arc::new(strings) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(ints) as ArrayRef,
                ),
            ]);

            let schema = Schema::new(vec![Field::new(
                "struct_array",
                struct_array.data_type().clone(),
                true,
            )]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        let structs = deserialized_batch
            .column(0)
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();

        assert!(structs.column(0).is_null(0));
        assert!(structs.column(0).is_valid(1));
        assert!(structs.column(1).is_valid(0));
        assert!(structs.column(1).is_null(1));
        assert_eq!(record_batch_slice, deserialized_batch);
    }
2453
    /// Slicing a string column whose values are all empty (all offsets equal)
    /// must still truncate and round-trip correctly.
    #[test]
    fn truncate_ipc_string_array_with_all_empty_string() {
        fn create_batch() -> RecordBatch {
            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(0, 1);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );
        assert_eq!(record_batch_slice, deserialized_batch);
    }
2471
    /// Writing a sliced primitive array through the stream writer must emit
    /// only the sliced values, not the full backing buffer.
    #[test]
    fn test_stream_writer_writes_array_slice() {
        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
        assert_eq!(
            vec![Some(1), Some(2), Some(3)],
            array.iter().collect::<Vec<_>>()
        );

        let sliced = array.slice(1, 2);
        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
            vec![Arc::new(sliced)],
        )
        .expect("new batch");

        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
        writer.write(&batch).expect("write");
        let outbuf = writer.into_inner().expect("inner");

        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
        let read_batch = reader.next().unwrap().expect("read batch");

        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
        assert_eq!(
            vec![Some(2), Some(3)],
            read_array.iter().collect::<Vec<_>>()
        );
    }
2502
    /// Round-trips a large sliced UInt32 array with alternating nulls.
    #[test]
    fn test_large_slice_uint32() {
        ensure_roundtrip(Arc::new(UInt32Array::from_iter((0..8000).map(|i| {
            if i % 2 == 0 {
                Some(i)
            } else {
                None
            }
        }))));
    }
2513
    /// Round-trips a large sliced Utf8 array with alternating nulls.
    #[test]
    fn test_large_slice_string() {
        let strings: Vec<_> = (0..8000)
            .map(|i| {
                if i % 2 == 0 {
                    Some(format!("value{}", i))
                } else {
                    None
                }
            })
            .collect();

        ensure_roundtrip(Arc::new(StringArray::from(strings)));
    }
2528
2529 #[test]
2530 fn test_large_slice_string_list() {
2531 let mut ls = ListBuilder::new(StringBuilder::new());
2532
2533 let mut s = String::new();
2534 for row_number in 0..8000 {
2535 if row_number % 2 == 0 {
2536 for list_element in 0..1000 {
2537 s.clear();
2538 use std::fmt::Write;
2539 write!(&mut s, "value{row_number}-{list_element}").unwrap();
2540 ls.values().append_value(&s);
2541 }
2542 ls.append(true)
2543 } else {
2544 ls.append(false); }
2546 }
2547
2548 ensure_roundtrip(Arc::new(ls.finish()));
2549 }
2550
2551 #[test]
2552 fn test_large_slice_string_list_of_lists() {
2553 let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
2557
2558 for _ in 0..4000 {
2559 ls.values().append(true);
2560 ls.append(true)
2561 }
2562
2563 let mut s = String::new();
2564 for row_number in 0..4000 {
2565 if row_number % 2 == 0 {
2566 for list_element in 0..1000 {
2567 s.clear();
2568 use std::fmt::Write;
2569 write!(&mut s, "value{row_number}-{list_element}").unwrap();
2570 ls.values().values().append_value(&s);
2571 }
2572 ls.values().append(true);
2573 ls.append(true)
2574 } else {
2575 ls.append(false); }
2577 }
2578
2579 ensure_roundtrip(Arc::new(ls.finish()));
2580 }
2581
2582 fn ensure_roundtrip(array: ArrayRef) {
2584 let num_rows = array.len();
2585 let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
2586 let sliced_batch = orig_batch.slice(1, num_rows - 1);
2588
2589 let schema = orig_batch.schema();
2590 let stream_data = {
2591 let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
2592 writer.write(&sliced_batch).unwrap();
2593 writer.into_inner().unwrap()
2594 };
2595 let read_batch = {
2596 let projection = None;
2597 let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
2598 reader
2599 .next()
2600 .expect("expect no errors reading batch")
2601 .expect("expect batch")
2602 };
2603 assert_eq!(sliced_batch, read_batch);
2604
2605 let file_data = {
2606 let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
2607 writer.write(&sliced_batch).unwrap();
2608 writer.into_inner().unwrap().into_inner().unwrap()
2609 };
2610 let read_batch = {
2611 let projection = None;
2612 let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
2613 reader
2614 .next()
2615 .expect("expect no errors reading batch")
2616 .expect("expect batch")
2617 };
2618 assert_eq!(sliced_batch, read_batch);
2619
2620 }
2622
2623 #[test]
2624 fn encode_bools_slice() {
2625 assert_bool_roundtrip([true, false], 1, 1);
2627
2628 assert_bool_roundtrip(
2630 [
2631 true, false, true, true, false, false, true, true, true, false, false, false, true,
2632 true, true, true, false, false, false, false, true, true, true, true, true, false,
2633 false, false, false, false,
2634 ],
2635 13,
2636 17,
2637 );
2638
2639 assert_bool_roundtrip(
2641 [
2642 true, false, true, true, false, false, true, true, true, false, false, false,
2643 ],
2644 8,
2645 2,
2646 );
2647
2648 assert_bool_roundtrip(
2650 [
2651 true, false, true, true, false, false, true, true, true, false, false, false, true,
2652 true, true, true, true, false, false, false, false, false,
2653 ],
2654 8,
2655 8,
2656 );
2657 }
2658
2659 fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
2660 let val_bool_field = Field::new("val", DataType::Boolean, false);
2661
2662 let schema = Arc::new(Schema::new(vec![val_bool_field]));
2663
2664 let bools = BooleanArray::from(bools.to_vec());
2665
2666 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
2667 let batch = batch.slice(offset, length);
2668
2669 let data = serialize_stream(&batch);
2670 let batch2 = deserialize_stream(data);
2671 assert_eq!(batch, batch2);
2672 }
2673
    #[test]
    fn test_run_array_unslice() {
        // Build an 80-element logical array: each step takes the next value
        // from `vals` (cycled) and repeats it `repeats[ix % 4]` times.
        // 32 iterations = 8 full cycles of `repeats` (3+4+1+2 = 10) = 80.
        let total_len = 80;
        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
        let repeats: Vec<usize> = vec![3, 4, 1, 2];
        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
        for ix in 0_usize..32 {
            let repeat: usize = repeats[ix % repeats.len()];
            let val: Option<i32> = vals[ix % vals.len()];
            input_array.resize(input_array.len() + repeat, val);
        }

        // Run-end encode the logical values (Int16 run ends, Int32 values).
        let mut builder =
            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
        builder.extend(input_array.iter().copied());
        let run_array = builder.finish();

        // Exercise every possible slice length from both ends of the array.
        for slice_len in 1..=total_len {
            // Prefix slice: offset 0, varying length.
            let sliced_run_array: RunArray<Int16Type> =
                run_array.slice(0, slice_len).into_data().into();

            // into_zero_offset_run_array should rebase the run ends so the
            // decoded logical values match the corresponding input prefix.
            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);

            // Suffix slice: same check with a non-zero starting offset.
            let sliced_run_array: RunArray<Int16Type> = run_array
                .slice(total_len - slice_len, slice_len)
                .into_data()
                .into();

            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array
                .iter()
                .skip(total_len - slice_len)
                .copied()
                .collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);
        }
    }
2727
2728 fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
2729 let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
2730
2731 for i in 0..100_000 {
2732 for value in [i, i, i] {
2733 ls.values().append_value(value);
2734 }
2735 ls.append(true)
2736 }
2737
2738 ls.finish()
2739 }
2740
2741 fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
2742 let mut ls =
2743 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
2744
2745 for _i in 0..10_000 {
2746 for j in 0..10 {
2747 for value in [j, j, j, j] {
2748 ls.values().values().append_value(value);
2749 }
2750 ls.values().append(true)
2751 }
2752 ls.append(true);
2753 }
2754
2755 ls.finish()
2756 }
2757
2758 fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
2759 let mut ls =
2760 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
2761
2762 for _i in 0..999 {
2763 ls.values().append(true);
2764 ls.append(true);
2765 }
2766
2767 for j in 0..10 {
2768 for value in [j, j, j, j] {
2769 ls.values().values().append_value(value);
2770 }
2771 ls.values().append(true)
2772 }
2773 ls.append(true);
2774
2775 for i in 0..9_000 {
2776 for j in 0..10 {
2777 for value in [i + j, i + j, i + j, i + j] {
2778 ls.values().values().append_value(value);
2779 }
2780 ls.values().append(true)
2781 }
2782 ls.append(true);
2783 }
2784
2785 ls.finish()
2786 }
2787
2788 fn generate_map_array_data() -> MapArray {
2789 let keys_builder = UInt32Builder::new();
2790 let values_builder = UInt32Builder::new();
2791
2792 let mut builder = MapBuilder::new(None, keys_builder, values_builder);
2793
2794 for i in 0..100_000 {
2795 for _j in 0..3 {
2796 builder.keys().append_value(i);
2797 builder.values().append_value(i * 2);
2798 }
2799 builder.append(true).unwrap();
2800 }
2801
2802 builder.finish()
2803 }
2804
2805 #[test]
2806 fn reencode_offsets_when_first_offset_is_not_zero() {
2807 let original_list = generate_list_data::<i32>();
2808 let original_data = original_list.into_data();
2809 let slice_data = original_data.slice(75, 7);
2810 let (new_offsets, original_start, length) =
2811 reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
2812 assert_eq!(
2813 vec![0, 3, 6, 9, 12, 15, 18, 21],
2814 new_offsets.typed_data::<i32>()
2815 );
2816 assert_eq!(225, original_start);
2817 assert_eq!(21, length);
2818 }
2819
2820 #[test]
2821 fn reencode_offsets_when_first_offset_is_zero() {
2822 let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
2823 ls.append(true);
2825 ls.values().append_value(35);
2826 ls.values().append_value(42);
2827 ls.append(true);
2828 let original_list = ls.finish();
2829 let original_data = original_list.into_data();
2830
2831 let slice_data = original_data.slice(1, 1);
2832 let (new_offsets, original_start, length) =
2833 reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
2834 assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
2835 assert_eq!(0, original_start);
2836 assert_eq!(2, length);
2837 }
2838
2839 fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
2842 let in_sliced = in_batch.slice(999, 1);
2844
2845 let bytes_batch = serialize_file(&in_batch);
2846 let bytes_sliced = serialize_file(&in_sliced);
2847
2848 assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
2850
2851 let out_batch = deserialize_file(bytes_batch);
2853 assert_eq!(in_batch, out_batch);
2854
2855 let out_sliced = deserialize_file(bytes_sliced);
2856 assert_eq!(in_sliced, out_sliced);
2857 }
2858
2859 #[test]
2860 fn encode_lists() {
2861 let val_inner = Field::new_list_field(DataType::UInt32, true);
2862 let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
2863 let schema = Arc::new(Schema::new(vec![val_list_field]));
2864
2865 let values = Arc::new(generate_list_data::<i32>());
2866
2867 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2868 roundtrip_ensure_sliced_smaller(in_batch, 1000);
2869 }
2870
2871 #[test]
2872 fn encode_empty_list() {
2873 let val_inner = Field::new_list_field(DataType::UInt32, true);
2874 let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
2875 let schema = Arc::new(Schema::new(vec![val_list_field]));
2876
2877 let values = Arc::new(generate_list_data::<i32>());
2878
2879 let in_batch = RecordBatch::try_new(schema, vec![values])
2880 .unwrap()
2881 .slice(999, 0);
2882 let out_batch = deserialize_file(serialize_file(&in_batch));
2883 assert_eq!(in_batch, out_batch);
2884 }
2885
2886 #[test]
2887 fn encode_large_lists() {
2888 let val_inner = Field::new_list_field(DataType::UInt32, true);
2889 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
2890 let schema = Arc::new(Schema::new(vec![val_list_field]));
2891
2892 let values = Arc::new(generate_list_data::<i64>());
2893
2894 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2897 roundtrip_ensure_sliced_smaller(in_batch, 1000);
2898 }
2899
2900 #[test]
2901 fn encode_nested_lists() {
2902 let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
2903 let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
2904 let list_field = Field::new("val", DataType::List(inner_list_field), true);
2905 let schema = Arc::new(Schema::new(vec![list_field]));
2906
2907 let values = Arc::new(generate_nested_list_data::<i32>());
2908
2909 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2910 roundtrip_ensure_sliced_smaller(in_batch, 1000);
2911 }
2912
2913 #[test]
2914 fn encode_nested_lists_starting_at_zero() {
2915 let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
2916 let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
2917 let list_field = Field::new("val", DataType::List(inner_list_field), true);
2918 let schema = Arc::new(Schema::new(vec![list_field]));
2919
2920 let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
2921
2922 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2923 roundtrip_ensure_sliced_smaller(in_batch, 1);
2924 }
2925
2926 #[test]
2927 fn encode_map_array() {
2928 let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
2929 let values = Arc::new(Field::new("values", DataType::UInt32, true));
2930 let map_field = Field::new_map("map", "entries", keys, values, false, true);
2931 let schema = Arc::new(Schema::new(vec![map_field]));
2932
2933 let values = Arc::new(generate_map_array_data());
2934
2935 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2936 roundtrip_ensure_sliced_smaller(in_batch, 1000);
2937 }
2938
    #[test]
    fn test_decimal128_alignment16_is_sufficient() {
        const IPC_ALIGNMENT: usize = 16;

        // With 16-byte IPC alignment, a decoder configured to require
        // alignment must succeed for any column/row combination of
        // Decimal128 data (see the companion alignment8 test for the
        // failing case).
        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
            // Vary the row count with the column count so buffer sizes differ.
            let num_rows = (num_cols * 7 + 11) % 100;
            let mut fields = Vec::new();
            let mut arrays = Vec::new();
            for i in 0..num_cols {
                let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
                fields.push(field);
                arrays.push(Arc::new(array) as Arc<dyn Array>);
            }
            let schema = Schema::new(fields);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

            let mut writer = FileWriter::try_new_with_options(
                Vec::new(),
                batch.schema_ref(),
                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
            )
            .unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();

            let out: Vec<u8> = writer.into_inner().unwrap();

            // Locate and parse the footer from the tail of the file
            // (the last 10 bytes hold the footer length and file magic).
            let buffer = Buffer::from_vec(out);
            let trailer_start = buffer.len() - 10;
            let footer_len =
                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
            let footer =
                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

            let schema = fb_to_schema(footer.schema().unwrap());

            // Strict decoding: reject any misaligned buffer.
            let decoder =
                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

            let batches = footer.recordBatches().unwrap();

            // Read the single record batch back out of the raw buffer.
            let block = batches.get(0);
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);

            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();

            assert_eq!(batch, batch2);
        }
    }
2997
    #[test]
    fn test_decimal128_alignment8_is_unaligned() {
        // Writing with only 8-byte alignment leaves Decimal128 buffers
        // misaligned for a decoder that requires 16-byte alignment; this
        // test pins the exact error produced.
        const IPC_ALIGNMENT: usize = 8;

        let num_cols = 2;
        let num_rows = 1;

        let mut fields = Vec::new();
        let mut arrays = Vec::new();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
            fields.push(field);
            arrays.push(Arc::new(array) as Arc<dyn Array>);
        }
        let schema = Schema::new(fields);
        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

        let mut writer = FileWriter::try_new_with_options(
            Vec::new(),
            batch.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();

        let out: Vec<u8> = writer.into_inner().unwrap();

        // Locate and parse the footer from the tail of the file
        // (the last 10 bytes hold the footer length and file magic).
        let buffer = Buffer::from_vec(out);
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

        let schema = fb_to_schema(footer.schema().unwrap());

        // Strict decoding: reject any misaligned buffer.
        let decoder =
            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

        let batches = footer.recordBatches().unwrap();

        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);

        let result = decoder.read_record_batch(block, &data);

        // The decoder must fail with a misalignment error, not succeed or
        // silently copy the buffers.
        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
             offset from expected alignment of 16 by 8"
        );
    }
3054
    #[test]
    fn test_flush() {
        // Verifies that flush() pushes buffered bytes through the inner
        // BufWriter for both the stream and the file writer, and accounts
        // for every byte written before and after finishing.
        let num_cols = 2;
        let mut fields = Vec::new();
        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
            fields.push(field);
        }
        let schema = Schema::new(fields);
        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
        let mut stream_writer =
            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
                .unwrap();
        let mut file_writer =
            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();

        // Byte counts observed in the inner Vec before and after flushing.
        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
        stream_writer.flush().unwrap();
        file_writer.flush().unwrap();
        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
        // into_inner() finishes the stream, appending 8 trailing bytes
        // (end-of-stream marker) that flush alone had not produced.
        let expected_stream_flushed_bytes = stream_out.len() - 8;
        // The file writer had additionally written an 8-byte file preamble
        // up front, which the flush pushed through to the inner Vec.
        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;

        assert!(
            stream_bytes_written_on_new < stream_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert!(
            file_bytes_written_on_new < file_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
    }
3101
3102 #[test]
3103 fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
3104 let l1_type =
3105 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
3106 let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));
3107
3108 let l0_builder = Float32Builder::new();
3109 let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
3110 "item",
3111 DataType::Float32,
3112 false,
3113 )));
3114 let mut l2_builder =
3115 ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));
3116
3117 for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
3118 l2_builder.values().values().append_value(point[0]);
3119 l2_builder.values().values().append_value(point[1]);
3120 l2_builder.values().values().append_value(point[2]);
3121
3122 l2_builder.values().append(true);
3123 }
3124 l2_builder.append(true);
3125
3126 let point = [10., 11., 12.];
3127 l2_builder.values().values().append_value(point[0]);
3128 l2_builder.values().values().append_value(point[1]);
3129 l2_builder.values().values().append_value(point[2]);
3130
3131 l2_builder.values().append(true);
3132 l2_builder.append(true);
3133
3134 let array = Arc::new(l2_builder.finish()) as ArrayRef;
3135
3136 let schema = Arc::new(Schema::new_with_metadata(
3137 vec![Field::new("points", l2_type, false)],
3138 HashMap::default(),
3139 ));
3140
3141 test_slices(&array, &schema, 0, 1)?;
3144 test_slices(&array, &schema, 0, 2)?;
3145 test_slices(&array, &schema, 1, 1)?;
3146
3147 Ok(())
3148 }
3149
3150 #[test]
3151 fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
3152 let l0_builder = Float32Builder::new();
3153 let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
3154 let mut l2_builder = ListBuilder::new(l1_builder);
3155
3156 for point in [
3157 [Some(1.0), Some(2.0), None],
3158 [Some(4.0), Some(5.0), Some(6.0)],
3159 [None, Some(8.0), Some(9.0)],
3160 ] {
3161 for p in point {
3162 match p {
3163 Some(p) => l2_builder.values().values().append_value(p),
3164 None => l2_builder.values().values().append_null(),
3165 }
3166 }
3167
3168 l2_builder.values().append(true);
3169 }
3170 l2_builder.append(true);
3171
3172 let point = [Some(10.), None, None];
3173 for p in point {
3174 match p {
3175 Some(p) => l2_builder.values().values().append_value(p),
3176 None => l2_builder.values().values().append_null(),
3177 }
3178 }
3179
3180 l2_builder.values().append(true);
3181 l2_builder.append(true);
3182
3183 let array = Arc::new(l2_builder.finish()) as ArrayRef;
3184
3185 let schema = Arc::new(Schema::new_with_metadata(
3186 vec![Field::new(
3187 "points",
3188 DataType::List(Arc::new(Field::new(
3189 "item",
3190 DataType::FixedSizeList(
3191 Arc::new(Field::new("item", DataType::Float32, true)),
3192 3,
3193 ),
3194 true,
3195 ))),
3196 true,
3197 )],
3198 HashMap::default(),
3199 ));
3200
3201 test_slices(&array, &schema, 0, 1)?;
3204 test_slices(&array, &schema, 0, 2)?;
3205 test_slices(&array, &schema, 1, 1)?;
3206
3207 Ok(())
3208 }
3209
3210 fn test_slices(
3211 parent_array: &ArrayRef,
3212 schema: &SchemaRef,
3213 offset: usize,
3214 length: usize,
3215 ) -> Result<(), ArrowError> {
3216 let subarray = parent_array.slice(offset, length);
3217 let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
3218
3219 let mut bytes = Vec::new();
3220 let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
3221 writer.write(&original_batch)?;
3222 writer.finish()?;
3223
3224 let mut cursor = std::io::Cursor::new(bytes);
3225 let mut reader = StreamReader::try_new(&mut cursor, None)?;
3226 let returned_batch = reader.next().unwrap()?;
3227
3228 assert_eq!(original_batch, returned_batch);
3229
3230 Ok(())
3231 }
3232
3233 #[test]
3234 fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
3235 let int_builder = Int64Builder::new();
3236 let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
3237 .with_field(Arc::new(Field::new("item", DataType::Int64, false)));
3238
3239 for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
3240 fixed_list_builder.values().append_value(point[0]);
3241 fixed_list_builder.values().append_value(point[1]);
3242 fixed_list_builder.values().append_value(point[2]);
3243
3244 fixed_list_builder.append(true);
3245 }
3246
3247 let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
3248
3249 let schema = Arc::new(Schema::new_with_metadata(
3250 vec![Field::new(
3251 "points",
3252 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
3253 false,
3254 )],
3255 HashMap::default(),
3256 ));
3257
3258 test_slices(&array, &schema, 0, 4)?;
3261 test_slices(&array, &schema, 0, 2)?;
3262 test_slices(&array, &schema, 1, 3)?;
3263 test_slices(&array, &schema, 2, 1)?;
3264
3265 Ok(())
3266 }
3267
3268 #[test]
3269 fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
3270 let int_builder = Int64Builder::new();
3271 let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);
3272
3273 for point in [
3274 [Some(1), Some(2), None],
3275 [Some(4), Some(5), Some(6)],
3276 [None, Some(8), Some(9)],
3277 [Some(10), None, None],
3278 ] {
3279 for p in point {
3280 match p {
3281 Some(p) => fixed_list_builder.values().append_value(p),
3282 None => fixed_list_builder.values().append_null(),
3283 }
3284 }
3285
3286 fixed_list_builder.append(true);
3287 }
3288
3289 let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
3290
3291 let schema = Arc::new(Schema::new_with_metadata(
3292 vec![Field::new(
3293 "points",
3294 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
3295 true,
3296 )],
3297 HashMap::default(),
3298 ));
3299
3300 test_slices(&array, &schema, 0, 4)?;
3303 test_slices(&array, &schema, 0, 2)?;
3304 test_slices(&array, &schema, 1, 3)?;
3305 test_slices(&array, &schema, 2, 1)?;
3306
3307 Ok(())
3308 }
3309}