mod stream;

pub use stream::*;

use flatbuffers::{VectorIter, VerifierOptions};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;

use arrow_array::*;
use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, ScalarBuffer};
use arrow_data::ArrayData;
use arrow_schema::*;

use crate::compression::CompressionCodec;
use crate::{Block, FieldNode, Message, MetadataVersion, CONTINUATION_MARKER};
use DataType::*;

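/// Read a buffer based on its offset and length, decompressing the bytes
/// with `compression_codec` when one is provided and the buffer is non-empty.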
fn read_buffer(
    buf: &crate::Buffer,
    a_data: &Buffer,
    compression_codec: Option<CompressionCodec>,
) -> Result<Buffer, ArrowError> {
    let start_offset = buf.offset() as usize;
    let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
    // corner case: empty buffers must not be passed to the decompressor
    match (buf_data.is_empty(), compression_codec) {
        (true, _) | (_, None) => Ok(buf_data),
        (false, Some(decompressor)) => decompressor.decompress_to_buffer(&buf_data),
    }
}

impl RecordBatchDecoder<'_> {
    /// Coordinates reading arrays based on data types.
    ///
    /// `variadic_counts` encodes the number of variadic data buffers for view
    /// types (e.g. `Utf8View`, `BinaryView`); one count is popped from the
    /// front of the queue for each view-type column encountered.
    fn create_array(
        &mut self,
        field: &Field,
        variadic_counts: &mut VecDeque<i64>,
    ) -> Result<ArrayRef, ArrowError> {
        let data_type = field.data_type();
        match data_type {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                let field_node = self.next_node(field)?;
                let buffers = [
                    self.next_buffer()?,
                    self.next_buffer()?,
                    self.next_buffer()?,
                ];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            BinaryView | Utf8View => {
                let count = variadic_counts
                    .pop_front()
                    .ok_or(ArrowError::IpcError(format!(
                        "Missing variadic count for {data_type} column"
                    )))?;
                let count = count + 2; // plus the views and null buffers
                let buffers = (0..count)
                    .map(|_| self.next_buffer())
                    .collect::<Result<Vec<_>, _>>()?;
                let field_node = self.next_node(field)?;
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            FixedSizeBinary(_) => {
                let field_node = self.next_node(field)?;
                let buffers = [self.next_buffer()?, self.next_buffer()?];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
                let list_node = self.next_node(field)?;
                let list_buffers = [self.next_buffer()?, self.next_buffer()?];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_array(list_node, data_type, &list_buffers, values)
            }
            FixedSizeList(ref list_field, _) => {
                let list_node = self.next_node(field)?;
                let list_buffers = [self.next_buffer()?];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_array(list_node, data_type, &list_buffers, values)
            }
            Struct(struct_fields) => {
                let struct_node = self.next_node(field)?;
                let null_buffer = self.next_buffer()?;

                // read the arrays for each field
                let mut struct_arrays = vec![];
                for struct_field in struct_fields {
                    let child = self.create_array(struct_field, variadic_counts)?;
                    struct_arrays.push(child);
                }
                let null_count = struct_node.null_count() as usize;
                let struct_array = if struct_arrays.is_empty() {
                    // the row count cannot be inferred from zero child arrays,
                    // so build an empty-fields struct array directly
                    let len = struct_node.length() as usize;
                    StructArray::new_empty_fields(
                        len,
                        (null_count > 0).then(|| BooleanBuffer::new(null_buffer, 0, len).into()),
                    )
                } else if null_count > 0 {
                    // create struct array from fields, arrays and null data
                    let len = struct_node.length() as usize;
                    let nulls = BooleanBuffer::new(null_buffer, 0, len).into();
                    StructArray::try_new(struct_fields.clone(), struct_arrays, Some(nulls))?
                } else {
                    StructArray::try_new(struct_fields.clone(), struct_arrays, None)?
                };
                Ok(Arc::new(struct_array))
            }
            RunEndEncoded(run_ends_field, values_field) => {
                let run_node = self.next_node(field)?;
                let run_ends = self.create_array(run_ends_field, variadic_counts)?;
                let values = self.create_array(values_field, variadic_counts)?;

                let run_array_length = run_node.length() as usize;
                let array_data = ArrayData::builder(data_type.clone())
                    .len(run_array_length)
                    .offset(0)
                    .add_child_data(run_ends.into_data())
                    .add_child_data(values.into_data())
                    .align_buffers(!self.require_alignment)
                    .build()?;

                Ok(make_array(array_data))
            }
            Dictionary(_, _) => {
                let index_node = self.next_node(field)?;
                let index_buffers = [self.next_buffer()?, self.next_buffer()?];

                #[allow(deprecated)]
                let dict_id = field.dict_id().ok_or_else(|| {
                    ArrowError::ParseError(format!("Field {field} does not have dict id"))
                })?;

                let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
                    ArrowError::ParseError(format!(
                        "Cannot find a dictionary batch with dict id: {dict_id}"
                    ))
                })?;

                self.create_dictionary_array(
                    index_node,
                    data_type,
                    &index_buffers,
                    value_array.clone(),
                )
            }
            Union(fields, mode) => {
                let union_node = self.next_node(field)?;
                let len = union_node.length() as usize;

                // In V4, union types have a validity bitmap; in V5 and later
                // they do not, so skip that buffer for older versions
                if self.version < MetadataVersion::V5 {
                    self.next_buffer()?;
                }

                let type_ids: ScalarBuffer<i8> =
                    self.next_buffer()?.slice_with_length(0, len).into();

                let value_offsets = match mode {
                    UnionMode::Dense => {
                        let offsets: ScalarBuffer<i32> =
                            self.next_buffer()?.slice_with_length(0, len * 4).into();
                        Some(offsets)
                    }
                    UnionMode::Sparse => None,
                };

                let mut children = Vec::with_capacity(fields.len());

                for (_id, field) in fields.iter() {
                    let child = self.create_array(field, variadic_counts)?;
                    children.push(child);
                }

                let array = UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?;
                Ok(Arc::new(array))
            }
            Null => {
                let node = self.next_node(field)?;
                let length = node.length();
                let null_count = node.null_count();

                if length != null_count {
                    return Err(ArrowError::SchemaError(format!(
                        "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
                    )));
                }

                let array_data = ArrayData::builder(data_type.clone())
                    .len(length as usize)
                    .offset(0)
                    .align_buffers(!self.require_alignment)
                    .build()?;

                // no buffers are read for a NullArray
                Ok(Arc::new(NullArray::from(array_data)))
            }
            _ => {
                let field_node = self.next_node(field)?;
                let buffers = [self.next_buffer()?, self.next_buffer()?];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
        }
    }

    /// Reads the correct number of buffers based on data type and null count,
    /// and creates a primitive array ref.
    fn create_primitive_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
    ) -> Result<ArrayRef, ArrowError> {
        let length = field_node.length() as usize;
        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let builder = match data_type {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                // read 3 buffers: null buffer (optional), offsets buffer and data buffer
                ArrayData::builder(data_type.clone())
                    .len(length)
                    .buffers(buffers[1..3].to_vec())
                    .null_bit_buffer(null_buffer)
            }
            BinaryView | Utf8View => ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..].to_vec())
                .null_bit_buffer(null_buffer),
            _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
                // read 2 buffers: null buffer (optional) and data buffer
                ArrayData::builder(data_type.clone())
                    .len(length)
                    .add_buffer(buffers[1].clone())
                    .null_bit_buffer(null_buffer)
            }
            t => unreachable!("Data type {:?} either unsupported or not primitive", t),
        };

        let array_data = builder.align_buffers(!self.require_alignment).build()?;

        Ok(make_array(array_data))
    }

    /// Reads the correct number of buffers based on list type and null count,
    /// and creates a list array ref.
    fn create_list_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        child_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let length = field_node.length() as usize;
        let child_data = child_array.into_data();
        let builder = match data_type {
            List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
                .len(length)
                .add_buffer(buffers[1].clone())
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer),

            FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
                .len(length)
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer),

            _ => unreachable!("Cannot create list or map array from {:?}", data_type),
        };

        let array_data = builder.align_buffers(!self.require_alignment).build()?;

        Ok(make_array(array_data))
    }

    /// Reads the correct number of buffers based on data type and null count,
    /// and creates a dictionary array ref.
    fn create_dictionary_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        value_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        if let Dictionary(_, _) = *data_type {
            let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
            let array_data = ArrayData::builder(data_type.clone())
                .len(field_node.length() as usize)
                .add_buffer(buffers[1].clone())
                .add_child_data(value_array.into_data())
                .null_bit_buffer(null_buffer)
                .align_buffers(!self.require_alignment)
                .build()?;

            Ok(make_array(array_data))
        } else {
            unreachable!("Cannot create dictionary array from {:?}", data_type)
        }
    }
}

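/// State for decoding arrays from an encoded IPC `RecordBatch` message.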
struct RecordBatchDecoder<'a> {
    /// The flatbuffers encoded record batch
    batch: crate::RecordBatch<'a>,
    /// The output schema
    schema: SchemaRef,
    /// Decoded dictionaries, indexed by dictionary id
    dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
    /// Optional compression codec
    compression: Option<CompressionCodec>,
    /// The format metadata version
    version: MetadataVersion,
    /// The raw data buffer
    data: &'a Buffer,
    /// The field nodes comprising this record batch
    nodes: VectorIter<'a, FieldNode>,
    /// The buffers comprising this record batch
    buffers: VectorIter<'a, crate::Buffer>,
    /// Optional projection: the indices of the columns to read
    projection: Option<&'a [usize]>,
    /// Whether buffers are required to already be aligned
    require_alignment: bool,
}

impl<'a> RecordBatchDecoder<'a> {
    /// Create a decoder for the given IPC `RecordBatch` message, output
    /// schema, and previously decoded dictionaries.
    fn try_new(
        buf: &'a Buffer,
        batch: crate::RecordBatch<'a>,
        schema: SchemaRef,
        dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
        metadata: &'a MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let buffers = batch.buffers().ok_or_else(|| {
            ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
        })?;
        let field_nodes = batch.nodes().ok_or_else(|| {
            ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
        })?;

        let batch_compression = batch.compression();
        let compression = batch_compression
            .map(|batch_compression| batch_compression.codec().try_into())
            .transpose()?;

        Ok(Self {
            batch,
            schema,
            dictionaries_by_id,
            compression,
            version: *metadata,
            data: buf,
            nodes: field_nodes.iter(),
            buffers: buffers.iter(),
            projection: None,
            require_alignment: false,
        })
    }

    /// Set the projection (default: None)
    ///
    /// If set, only the given column indices will be read.
    pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
        self.projection = projection;
        self
    }

    /// Set `require_alignment` (default: false)
    ///
    /// If true, buffers must already be properly aligned for their types or
    /// decoding fails; if false, misaligned buffers are copied into new,
    /// aligned allocations.
    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
        self.require_alignment = require_alignment;
        self
    }

    /// Read the record batch, consuming the decoder
    fn read_record_batch(mut self) -> Result<RecordBatch, ArrowError> {
        let mut variadic_counts: VecDeque<i64> = self
            .batch
            .variadicBufferCounts()
            .into_iter()
            .flatten()
            .collect();

        let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));

        let schema = Arc::clone(&self.schema);
        if let Some(projection) = self.projection {
            let mut arrays = vec![];
            for (idx, field) in schema.fields().iter().enumerate() {
                // create the array if it is projected, otherwise skip its nodes and buffers
                if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
                    let child = self.create_array(field, &mut variadic_counts)?;
                    arrays.push((proj_idx, child));
                } else {
                    self.skip_field(field, &mut variadic_counts)?;
                }
            }
            assert!(variadic_counts.is_empty());
            arrays.sort_by_key(|t| t.0);
            RecordBatch::try_new_with_options(
                Arc::new(schema.project(projection)?),
                arrays.into_iter().map(|t| t.1).collect(),
                &options,
            )
        } else {
            let mut children = vec![];
            for field in schema.fields() {
                let child = self.create_array(field, &mut variadic_counts)?;
                children.push(child);
            }
            assert!(variadic_counts.is_empty());
            RecordBatch::try_new_with_options(schema, children, &options)
        }
    }

    fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
        read_buffer(self.buffers.next().unwrap(), self.data, self.compression)
    }

    fn skip_buffer(&mut self) {
        self.buffers.next().unwrap();
    }

    fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
        self.nodes.next().ok_or_else(|| {
            ArrowError::SchemaError(format!(
                "Invalid data for schema. {} refers to node not found in schema",
                field
            ))
        })
    }

    fn skip_field(
        &mut self,
        field: &Field,
        variadic_count: &mut VecDeque<i64>,
    ) -> Result<(), ArrowError> {
        self.next_node(field)?;

        match field.data_type() {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                for _ in 0..3 {
                    self.skip_buffer()
                }
            }
            Utf8View | BinaryView => {
                let count = variadic_count
                    .pop_front()
                    .ok_or(ArrowError::IpcError(format!(
                        "Missing variadic count for {} column",
                        field.data_type()
                    )))?;
                let count = count + 2; // plus the views and null buffers
                for _i in 0..count {
                    self.skip_buffer()
                }
            }
            FixedSizeBinary(_) => {
                self.skip_buffer();
                self.skip_buffer();
            }
            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                self.skip_buffer();
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            FixedSizeList(list_field, _) => {
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            Struct(struct_fields) => {
                self.skip_buffer();

                // skip all the fields that make up the struct
                for struct_field in struct_fields {
                    self.skip_field(struct_field, variadic_count)?
                }
            }
            RunEndEncoded(run_ends_field, values_field) => {
                self.skip_field(run_ends_field, variadic_count)?;
                self.skip_field(values_field, variadic_count)?;
            }
            Dictionary(_, _) => {
                self.skip_buffer(); // null buffer
                self.skip_buffer(); // keys buffer
            }
            Union(fields, mode) => {
                self.skip_buffer();

                match mode {
                    UnionMode::Dense => self.skip_buffer(),
                    UnionMode::Sparse => {}
                };

                for (_, field) in fields.iter() {
                    self.skip_field(field, variadic_count)?
                }
            }
            Null => {}
            _ => {
                self.skip_buffer();
                self.skip_buffer();
            }
        };
        Ok(())
    }
}

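/// Creates a record batch from binary data using the `crate::RecordBatch` indexes and the `Schema`.
///
/// Buffers that are not properly aligned for their types are copied into
/// new, aligned allocations rather than rejected.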
pub fn read_record_batch(
    buf: &Buffer,
    batch: crate::RecordBatch,
    schema: SchemaRef,
    dictionaries_by_id: &HashMap<i64, ArrayRef>,
    projection: Option<&[usize]>,
    metadata: &MetadataVersion,
) -> Result<RecordBatch, ArrowError> {
    RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
        .with_projection(projection)
        .with_require_alignment(false)
        .read_record_batch()
}

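/// Read the dictionary from the buffer and provided metadata,
/// updating the `dictionaries_by_id` with the resulting dictionary.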
pub fn read_dictionary(
    buf: &Buffer,
    batch: crate::DictionaryBatch,
    schema: &Schema,
    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
    metadata: &MetadataVersion,
) -> Result<(), ArrowError> {
    read_dictionary_impl(buf, batch, schema, dictionaries_by_id, metadata, false)
}

fn read_dictionary_impl(
    buf: &Buffer,
    batch: crate::DictionaryBatch,
    schema: &Schema,
    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
    metadata: &MetadataVersion,
    require_alignment: bool,
) -> Result<(), ArrowError> {
    if batch.isDelta() {
        return Err(ArrowError::InvalidArgumentError(
            "delta dictionary batches not supported".to_string(),
        ));
    }

    let id = batch.id();
    #[allow(deprecated)]
    let fields_using_this_dictionary = schema.fields_with_dict_id(id);
    let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
    })?;

    // The dictionary batch does not contain the schema of the dictionary
    // values array, so retrieve the value type from the schema and read the
    // batch through a single-column dummy schema.
    let dictionary_values: ArrayRef = match first_field.data_type() {
        DataType::Dictionary(_, ref value_type) => {
            let value = value_type.as_ref().clone();
            let schema = Schema::new(vec![Field::new("", value, true)]);
            // read a single column as a record batch
            let record_batch = RecordBatchDecoder::try_new(
                buf,
                batch.data().unwrap(),
                Arc::new(schema),
                dictionaries_by_id,
                metadata,
            )?
            .with_require_alignment(require_alignment)
            .read_record_batch()?;

            Some(record_batch.column(0).clone())
        }
        _ => None,
    }
    .ok_or_else(|| {
        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
    })?;

    // register the dictionary so subsequent record batches can reference it by id
    dictionaries_by_id.insert(id, dictionary_values.clone());

    Ok(())
}

/// Read a block of data (message metadata plus body) from the reader at the
/// location indicated by `block`.
fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
    reader.seek(SeekFrom::Start(block.offset() as u64))?;
    let body_len = block.bodyLength().to_usize().unwrap();
    let metadata_len = block.metaDataLength().to_usize().unwrap();
    let total_len = body_len.checked_add(metadata_len).unwrap();

    let mut buf = MutableBuffer::from_len_zeroed(total_len);
    reader.read_exact(&mut buf)?;
    Ok(buf.into())
}

/// Parse an encapsulated IPC message, skipping the continuation marker
/// (if present) and the 4-byte length prefix.
fn parse_message(buf: &[u8]) -> Result<Message, ArrowError> {
    let buf = match buf[..4] == CONTINUATION_MARKER {
        true => &buf[8..],
        false => &buf[4..],
    };
    crate::root_as_message(buf)
        .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
}

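/// Read the footer length from the last 10 bytes of an Arrow file.
///
/// The expected layout is a 4-byte little-endian length followed by the
/// 6-byte magic `ARROW1`. A rough sketch of reading it from a seekable
/// source (assuming `reader` implements `Read + Seek`):
///
/// ```ignore
/// let mut buf = [0; 10];
/// reader.seek(SeekFrom::End(-10))?;
/// reader.read_exact(&mut buf)?;
/// let footer_len = read_footer_length(buf)?;
/// ```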
pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
    if buf[4..] != super::ARROW_MAGIC {
        return Err(ArrowError::ParseError(
            "Arrow file does not contain correct footer".to_string(),
        ));
    }

    // read the footer length from the first four bytes
    let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
    footer_len
        .try_into()
        .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: {footer_len}")))
}

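/// A low-level, push-based interface for reading an IPC file.
///
/// For a higher-level interface see [`FileReader`]. A rough usage sketch,
/// adapted from this module's tests (`buffer` is assumed to hold a complete
/// IPC file):
///
/// ```ignore
/// let trailer_start = buffer.len() - 10;
/// let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap())?;
/// let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
/// let schema = fb_to_schema(footer.schema().unwrap());
/// let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
/// // feed the decoder all dictionary blocks, then the record batch blocks
/// for block in footer.dictionaries().iter().flatten() {
///     let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
///     let data = buffer.slice_with_length(block.offset() as _, block_len);
///     decoder.read_dictionary(block, &data)?;
/// }
/// for block in footer.recordBatches().iter().flatten() {
///     let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
///     let data = buffer.slice_with_length(block.offset() as _, block_len);
///     if let Some(batch) = decoder.read_record_batch(block, &data)? {
///         println!("decoded {} rows", batch.num_rows());
///     }
/// }
/// ```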
#[derive(Debug)]
pub struct FileDecoder {
    /// The schema of the file
    schema: SchemaRef,
    /// Decoded dictionaries, indexed by dictionary id
    dictionaries: HashMap<i64, ArrayRef>,
    /// The metadata version of the file
    version: MetadataVersion,
    /// Optional projection: the indices of the columns to read
    projection: Option<Vec<usize>>,
    /// Whether buffers must already be aligned
    require_alignment: bool,
}

impl FileDecoder {
    /// Create a new [`FileDecoder`] with the given schema and version
    pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
        Self {
            schema,
            version,
            dictionaries: Default::default(),
            projection: None,
            require_alignment: false,
        }
    }

    /// Specify a projection: only the given column indices will be read
    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
        self.projection = Some(projection);
        self
    }

    /// Specify whether buffers must already be properly aligned for their
    /// types (default: false)
    ///
    /// If false, misaligned buffers are copied into new, aligned allocations;
    /// if true, decoding fails on misaligned data instead of copying.
    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
        self.require_alignment = require_alignment;
        self
    }

    fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message<'a>, ArrowError> {
        let message = parse_message(buf)?;

        // some old test data has no metadata version set, so account for that
        if self.version != MetadataVersion::V1 && message.version() != self.version {
            return Err(ArrowError::IpcError(
                "Could not read IPC message as metadata versions mismatch".to_string(),
            ));
        }
        Ok(message)
    }

    /// Read the dictionary with the given block and data buffer
    pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> {
        let message = self.read_message(buf)?;
        match message.header_type() {
            crate::MessageHeader::DictionaryBatch => {
                let batch = message.header_as_dictionary_batch().unwrap();
                read_dictionary_impl(
                    &buf.slice(block.metaDataLength() as _),
                    batch,
                    &self.schema,
                    &mut self.dictionaries,
                    &message.version(),
                    self.require_alignment,
                )
            }
            t => Err(ArrowError::ParseError(format!(
                "Expecting DictionaryBatch in dictionary blocks, found {t:?}."
            ))),
        }
    }

    /// Read the record batch with the given block and data buffer
    pub fn read_record_batch(
        &self,
        block: &Block,
        buf: &Buffer,
    ) -> Result<Option<RecordBatch>, ArrowError> {
        let message = self.read_message(buf)?;
        match message.header_type() {
            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
                "Not expecting a schema when messages are read".to_string(),
            )),
            crate::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
                })?;
                // the message body starts after the block's metadata
                RecordBatchDecoder::try_new(
                    &buf.slice(block.metaDataLength() as _),
                    batch,
                    self.schema.clone(),
                    &self.dictionaries,
                    &message.version(),
                )?
                .with_projection(self.projection.as_deref())
                .with_require_alignment(self.require_alignment)
                .read_record_batch()
                .map(Some)
            }
            crate::MessageHeader::NONE => Ok(None),
            t => Err(ArrowError::InvalidArgumentError(format!(
                "Reading types other than record batches not yet supported, unable to read {t:?}"
            ))),
        }
    }
}

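/// Build an Arrow [`FileReader`] with custom options, such as a projection or
/// limits for the flatbuffers verifier used on the footer.
///
/// A rough usage sketch (assuming `data.arrow` is a valid IPC file):
///
/// ```ignore
/// let file = std::fs::File::open("data.arrow")?;
/// let reader = FileReaderBuilder::new()
///     .with_projection(vec![0, 2])
///     .build(file)?;
/// ```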
#[derive(Debug)]
pub struct FileReaderBuilder {
    /// Optional projection: zero-based indices of the columns to load
    projection: Option<Vec<usize>>,
    /// Passed through to the flatbuffers verifier used on the footer
    max_footer_fb_tables: usize,
    /// Passed through to the flatbuffers verifier used on the footer
    max_footer_fb_depth: usize,
}

impl Default for FileReaderBuilder {
    fn default() -> Self {
        let verifier_options = VerifierOptions::default();
        Self {
            max_footer_fb_tables: verifier_options.max_tables,
            max_footer_fb_depth: verifier_options.max_depth,
            projection: None,
        }
    }
}

impl FileReaderBuilder {
    /// Options for creating a new [`FileReader`]
    pub fn new() -> Self {
        Self::default()
    }

    /// Optional projection: zero-based indices of the columns to load
    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
        self.projection = Some(projection);
        self
    }

    /// Flatbuffers option for parsing the footer: the maximum number of
    /// tables permitted when verifying the footer, guarding against
    /// malformed files that could otherwise exhaust resources
    pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
        self.max_footer_fb_tables = max_footer_fb_tables;
        self
    }

    /// Flatbuffers option for parsing the footer: the maximum depth of
    /// nested tables permitted when verifying the footer
    pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
        self.max_footer_fb_depth = max_footer_fb_depth;
        self
    }

    /// Build a [`FileReader`] with the given reader
    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
        // read the footer length and magic from the last 10 bytes
        let mut buffer = [0; 10];
        reader.seek(SeekFrom::End(-10))?;
        reader.read_exact(&mut buffer)?;

        let footer_len = read_footer_length(buffer)?;

        // read the footer
        let mut footer_data = vec![0; footer_len];
        reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
        reader.read_exact(&mut footer_data)?;

        let verifier_options = VerifierOptions {
            max_tables: self.max_footer_fb_tables,
            max_depth: self.max_footer_fb_depth,
            ..Default::default()
        };
        let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
            |err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
        )?;

        let blocks = footer.recordBatches().ok_or_else(|| {
            ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
        })?;

        let total_blocks = blocks.len();

        let ipc_schema = footer.schema().unwrap();
        if !ipc_schema.endianness().equals_to_target_endianness() {
            return Err(ArrowError::IpcError(
                "the endianness of the source system does not match the endianness of the target system.".to_owned()
            ));
        }

        let schema = crate::convert::fb_to_schema(ipc_schema);

        let mut custom_metadata = HashMap::new();
        if let Some(fb_custom_metadata) = footer.custom_metadata() {
            for kv in fb_custom_metadata.into_iter() {
                custom_metadata.insert(
                    kv.key().unwrap().to_string(),
                    kv.value().unwrap().to_string(),
                );
            }
        }

        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
        if let Some(projection) = self.projection {
            decoder = decoder.with_projection(projection)
        }

        // read any dictionaries eagerly so they are available when reading batches
        if let Some(dictionaries) = footer.dictionaries() {
            for block in dictionaries {
                let buf = read_block(&mut reader, block)?;
                decoder.read_dictionary(block, &buf)?;
            }
        }

        Ok(FileReader {
            reader,
            blocks: blocks.iter().copied().collect(),
            current_block: 0,
            total_blocks,
            decoder,
            custom_metadata,
        })
    }
}

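/// Arrow File Reader.
///
/// Reads dictionaries eagerly on construction and then yields
/// [`RecordBatch`]es via [`Iterator`]. A rough usage sketch (assuming
/// `data.arrow` is a valid IPC file):
///
/// ```ignore
/// let file = std::fs::File::open("data.arrow")?;
/// let reader = FileReader::try_new(file, None)?;
/// for batch in reader {
///     println!("read {} rows", batch?.num_rows());
/// }
/// ```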
pub struct FileReader<R> {
    /// File reader that supports reading and seeking
    reader: R,

    /// The decoder
    decoder: FileDecoder,

    /// The blocks in the file
    ///
    /// A block indicates the regions in the file to read to get data
    blocks: Vec<Block>,

    /// A counter to keep track of the current block that should be read
    current_block: usize,

    /// The total number of blocks, which may contain record batches and other items
    total_blocks: usize,

    /// User defined metadata
    custom_metadata: HashMap<String, String>,
}

impl<R> fmt::Debug for FileReader<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("FileReader<R>")
            .field("decoder", &self.decoder)
            .field("blocks", &self.blocks)
            .field("current_block", &self.current_block)
            .field("total_blocks", &self.total_blocks)
            .finish_non_exhaustive()
    }
}

impl<R: Read + Seek> FileReader<BufReader<R>> {
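    /// Try to create a new file reader with the reader wrapped in a [`BufReader`].
    ///
    /// See [`FileReader::try_new`] for an unbuffered version.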
    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
        Self::try_new(BufReader::new(reader), projection)
    }
}

impl<R: Read + Seek> FileReader<R> {
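    /// Try to create a new file reader, eagerly reading and validating the
    /// footer and any dictionaries.
    ///
    /// There is no internal buffering; wrap the reader in a [`BufReader`] or
    /// use [`FileReader::try_new_buffered`] if buffered reads are desired.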
    pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
        let builder = FileReaderBuilder {
            projection,
            ..Default::default()
        };
        builder.build(reader)
    }

    /// Return user defined customized metadata
    pub fn custom_metadata(&self) -> &HashMap<String, String> {
        &self.custom_metadata
    }

    /// Return the number of batches in the file
    pub fn num_batches(&self) -> usize {
        self.total_blocks
    }

    /// Return the schema of the file
    pub fn schema(&self) -> SchemaRef {
        self.decoder.schema.clone()
    }

    /// Read a specific record batch
    ///
    /// Sets the current block to the index, allowing random reads
    pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
        if index >= self.total_blocks {
            Err(ArrowError::InvalidArgumentError(format!(
                "Cannot set batch to index {} from {} total batches",
                index, self.total_blocks
            )))
        } else {
            self.current_block = index;
            Ok(())
        }
    }

    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        let block = &self.blocks[self.current_block];
        self.current_block += 1;

        // read the block that makes up the record batch into a buffer
        let buffer = read_block(&mut self.reader, block)?;
        self.decoder.read_record_batch(block, &buffer)
    }

    /// Gets a reference to the underlying reader
    ///
    /// It is inadvisable to directly read from the underlying reader
    pub fn get_ref(&self) -> &R {
        &self.reader
    }

    /// Gets a mutable reference to the underlying reader
    ///
    /// It is inadvisable to directly read from the underlying reader
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.reader
    }
}

impl<R: Read + Seek> Iterator for FileReader<R> {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.current_block < self.total_blocks {
            self.maybe_next().transpose()
        } else {
            None
        }
    }
}

impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
    fn schema(&self) -> SchemaRef {
        self.schema()
    }
}

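/// Arrow Stream Reader.
///
/// Reads [`RecordBatch`]es from an IPC stream, decoding dictionary messages
/// on the fly as they are interleaved with record batches. A rough usage
/// sketch (assuming `data.arrows` holds a valid IPC stream):
///
/// ```ignore
/// let stream = std::fs::File::open("data.arrows")?;
/// let reader = StreamReader::try_new(stream, None)?;
/// for batch in reader {
///     println!("read {} rows", batch?.num_rows());
/// }
/// ```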
pub struct StreamReader<R> {
    /// Stream reader
    reader: R,

    /// The schema that is read from the stream's first message
    schema: SchemaRef,

    /// Optional dictionaries for each schema field.
    ///
    /// Dictionaries may be appended to in the streaming format.
    dictionaries_by_id: HashMap<i64, ArrayRef>,

    /// An indicator of whether the stream is complete.
    ///
    /// This value is set to `true` the first time the reader's `next()` returns `None`.
    finished: bool,

    /// Optional projection
    projection: Option<(Vec<usize>, Schema)>,
}

impl<R> fmt::Debug for StreamReader<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
        f.debug_struct("StreamReader<R>")
            .field("reader", &"R")
            .field("schema", &self.schema)
            .field("dictionaries_by_id", &self.dictionaries_by_id)
            .field("finished", &self.finished)
            .field("projection", &self.projection)
            .finish()
    }
}

impl<R: Read> StreamReader<BufReader<R>> {
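    /// Try to create a new stream reader with the reader wrapped in a [`BufReader`].
    ///
    /// See [`StreamReader::try_new`] for an unbuffered version.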
    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
        Self::try_new(BufReader::new(reader), projection)
    }
}

impl<R: Read> StreamReader<R> {
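    /// Try to create a new stream reader.
    ///
    /// The schema message is read eagerly from the start of the stream; use
    /// [`StreamReader::is_finished`] to check whether the stream has ended.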
    pub fn try_new(
        mut reader: R,
        projection: Option<Vec<usize>>,
    ) -> Result<StreamReader<R>, ArrowError> {
        // determine metadata length
        let mut meta_size: [u8; 4] = [0; 4];
        reader.read_exact(&mut meta_size)?;
        let meta_len = {
            // If a continuation marker is encountered, skip over it and read
            // the size from the next four bytes.
            if meta_size == CONTINUATION_MARKER {
                reader.read_exact(&mut meta_size)?;
            }
            i32::from_le_bytes(meta_size)
        };

        let mut meta_buffer = vec![0; meta_len as usize];
        reader.read_exact(&mut meta_buffer)?;

        let message = crate::root_as_message(meta_buffer.as_slice()).map_err(|err| {
            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
        })?;
        // the first message must be a schema
        let ipc_schema: crate::Schema = message.header_as_schema().ok_or_else(|| {
            ArrowError::ParseError("Unable to read IPC message as schema".to_string())
        })?;
        let schema = crate::convert::fb_to_schema(ipc_schema);

        // dictionaries are decoded as they appear in the stream
        let dictionaries_by_id = HashMap::new();

        let projection = match projection {
            Some(projection_indices) => {
                let schema = schema.project(&projection_indices)?;
                Some((projection_indices, schema))
            }
            _ => None,
        };
        Ok(Self {
            reader,
            schema: Arc::new(schema),
            finished: false,
            dictionaries_by_id,
            projection,
        })
    }

    /// Deprecated, use [`StreamReader::try_new`] instead
    #[deprecated(since = "53.0.0", note = "use `try_new` instead")]
    pub fn try_new_unbuffered(
        reader: R,
        projection: Option<Vec<usize>>,
    ) -> Result<Self, ArrowError> {
        Self::try_new(reader, projection)
    }

    /// Return the schema of the stream
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Check if the stream is finished
    pub fn is_finished(&self) -> bool {
        self.finished
    }

    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        if self.finished {
            return Ok(None);
        }
        // determine metadata length
        let mut meta_size: [u8; 4] = [0; 4];

        match self.reader.read_exact(&mut meta_size) {
            Ok(()) => (),
            Err(e) => {
                return if e.kind() == std::io::ErrorKind::UnexpectedEof {
                    // an EOF without the end-of-stream marker is also treated
                    // as the end of the stream
                    self.finished = true;
                    Ok(None)
                } else {
                    Err(ArrowError::from(e))
                };
            }
        }

        let meta_len = {
            // If a continuation marker is encountered, skip over it and read
            // the size from the next four bytes.
            if meta_size == CONTINUATION_MARKER {
                self.reader.read_exact(&mut meta_size)?;
            }
            i32::from_le_bytes(meta_size)
        };

        if meta_len == 0 {
            // the stream has ended, mark the reader as finished
            self.finished = true;
            return Ok(None);
        }

        let mut meta_buffer = vec![0; meta_len as usize];
        self.reader.read_exact(&mut meta_buffer)?;

        let message = crate::root_as_message(&meta_buffer).map_err(|err| {
            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
        })?;

        match message.header_type() {
            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
                "Not expecting a schema when messages are read".to_string(),
            )),
            crate::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
                })?;
                // read the block that makes up the record batch into a buffer
                let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
                self.reader.read_exact(&mut buf)?;

                RecordBatchDecoder::try_new(
                    &buf.into(),
                    batch,
                    self.schema(),
                    &self.dictionaries_by_id,
                    &message.version(),
                )?
                .with_projection(self.projection.as_ref().map(|x| x.0.as_ref()))
                .with_require_alignment(false)
                .read_record_batch()
                .map(Some)
            }
            crate::MessageHeader::DictionaryBatch => {
                let batch = message.header_as_dictionary_batch().ok_or_else(|| {
                    ArrowError::IpcError(
                        "Unable to read IPC message as dictionary batch".to_string(),
                    )
                })?;
                // read the block that makes up the dictionary batch into a buffer
                let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
                self.reader.read_exact(&mut buf)?;

                read_dictionary_impl(
                    &buf.into(),
                    batch,
                    &self.schema,
                    &mut self.dictionaries_by_id,
                    &message.version(),
                    false,
                )?;

                // read the next message until we encounter a record batch
                self.maybe_next()
            }
            crate::MessageHeader::NONE => Ok(None),
            t => Err(ArrowError::InvalidArgumentError(format!(
                "Reading types other than record batches not yet supported, unable to read {t:?}"
            ))),
        }
    }

    /// Gets a reference to the underlying reader
    ///
    /// It is inadvisable to directly read from the underlying reader
    pub fn get_ref(&self) -> &R {
        &self.reader
    }

    /// Gets a mutable reference to the underlying reader
    ///
    /// It is inadvisable to directly read from the underlying reader
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.reader
    }
}

impl<R: Read> Iterator for StreamReader<R> {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.maybe_next().transpose()
    }
}

impl<R: Read> RecordBatchReader for StreamReader<R> {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

#[cfg(test)]
mod tests {
    use crate::writer::{unslice_run_array, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};

    use super::*;

    use crate::convert::fb_to_schema;
    use crate::{root_as_footer, root_as_message};
    use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
    use arrow_array::types::*;
    use arrow_buffer::{NullBuffer, OffsetBuffer};
    use arrow_data::ArrayDataBuilder;

    fn create_test_projection_schema() -> Schema {
        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));

        let fixed_size_list_data_type =
            DataType::FixedSizeList(Arc::new(Field::new_list_field(DataType::Int32, false)), 3);

        let union_fields = UnionFields::new(
            vec![0, 1],
            vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Float64, false),
            ],
        );

        let union_data_type = DataType::Union(union_fields, UnionMode::Dense);

        let struct_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new_list("list", Field::new_list_field(DataType::Int8, true), false),
        ]);
        let struct_data_type = DataType::Struct(struct_fields);

        let run_encoded_data_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int16, false)),
            Arc::new(Field::new("values", DataType::Int32, true)),
        );

        Schema::new(vec![
            Field::new("f0", DataType::UInt32, false),
            Field::new("f1", DataType::Utf8, false),
            Field::new("f2", DataType::Boolean, false),
            Field::new("f3", union_data_type, true),
            Field::new("f4", DataType::Null, true),
            Field::new("f5", DataType::Float64, true),
            Field::new("f6", list_data_type, false),
            Field::new("f7", DataType::FixedSizeBinary(3), true),
            Field::new("f8", fixed_size_list_data_type, false),
            Field::new("f9", struct_data_type, false),
            Field::new("f10", run_encoded_data_type, false),
            Field::new("f11", DataType::Boolean, false),
            Field::new_dictionary("f12", DataType::Int8, DataType::Utf8, false),
            Field::new("f13", DataType::Utf8, false),
        ])
    }

    fn create_test_projection_batch_data(schema: &Schema) -> RecordBatch {
        let array0 = UInt32Array::from(vec![1, 2, 3]);
        let array1 = StringArray::from(vec!["foo", "bar", "baz"]);
        let array2 = BooleanArray::from(vec![true, false, true]);

        let mut union_builder = UnionBuilder::new_dense();
        union_builder.append::<Int32Type>("a", 1).unwrap();
        union_builder.append::<Float64Type>("b", 10.1).unwrap();
        union_builder.append_null::<Float64Type>("b").unwrap();
        let array3 = union_builder.build().unwrap();

        let array4 = NullArray::new(3);
        let array5 = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
        let array6_values = vec![
            Some(vec![Some(10), Some(10), Some(10)]),
            Some(vec![Some(20), Some(20), Some(20)]),
            Some(vec![Some(30), Some(30)]),
        ];
        let array6 = ListArray::from_iter_primitive::<Int32Type, _, _>(array6_values);
        let array7_values = vec![vec![11, 12, 13], vec![22, 23, 24], vec![33, 34, 35]];
        let array7 = FixedSizeBinaryArray::try_from_iter(array7_values.into_iter()).unwrap();

        let array8_values = ArrayData::builder(DataType::Int32)
            .len(9)
            .add_buffer(Buffer::from_slice_ref([40, 41, 42, 43, 44, 45, 46, 47, 48]))
            .build()
            .unwrap();
        let array8_data = ArrayData::builder(schema.field(8).data_type().clone())
            .len(3)
            .add_child_data(array8_values)
            .build()
            .unwrap();
        let array8 = FixedSizeListArray::from(array8_data);

        let array9_id: ArrayRef = Arc::new(Int32Array::from(vec![1001, 1002, 1003]));
        let array9_list: ArrayRef =
            Arc::new(ListArray::from_iter_primitive::<Int8Type, _, _>(vec![
                Some(vec![Some(-10)]),
                Some(vec![Some(-20), Some(-20), Some(-20)]),
                Some(vec![Some(-30)]),
            ]));
        let array9 = ArrayDataBuilder::new(schema.field(9).data_type().clone())
            .add_child_data(array9_id.into_data())
            .add_child_data(array9_list.into_data())
            .len(3)
            .build()
            .unwrap();
        let array9: ArrayRef = Arc::new(StructArray::from(array9));

        let array10_input = vec![Some(1_i32), None, None];
        let mut array10_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        array10_builder.extend(array10_input);
        let array10 = array10_builder.finish();

        let array11 = BooleanArray::from(vec![false, false, true]);

        let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
        let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
        let array12 = DictionaryArray::new(array12_keys, Arc::new(array12_values));

        let array13 = StringArray::from(vec!["a", "bb", "ccc"]);

        RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(array0),
                Arc::new(array1),
                Arc::new(array2),
                Arc::new(array3),
                Arc::new(array4),
                Arc::new(array5),
                Arc::new(array6),
                Arc::new(array7),
                Arc::new(array8),
                Arc::new(array9),
                Arc::new(array10),
                Arc::new(array11),
                Arc::new(array12),
                Arc::new(array13),
            ],
        )
        .unwrap()
    }
    #[test]
    fn test_projection_array_values() {
        // define schema
        let schema = create_test_projection_schema();

        // create record batch with test data
        let batch = create_test_projection_batch_data(&schema);

        // write record batch in IPC format
        let mut buf = Vec::new();
        {
            let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }

        // read record batch with each single-column projection
        for index in 0..12 {
            let projection = vec![index];
            let reader = FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection));
            let read_batch = reader.unwrap().next().unwrap().unwrap();
            let projected_column = read_batch.column(0);
            let expected_column = batch.column(index);

            // check the projected column equals the expected column
            assert_eq!(projected_column.as_ref(), expected_column.as_ref());
        }

        {
            // read record batch with reversed projection
            let reader =
                FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(vec![3, 2, 1]));
            let read_batch = reader.unwrap().next().unwrap().unwrap();
            let expected_batch = batch.project(&[3, 2, 1]).unwrap();
            assert_eq!(read_batch, expected_batch);
        }
    }

    #[test]
    fn test_arrow_single_float_row() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Float32, false),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Int32, false),
            Field::new("d", DataType::Int32, false),
        ]);
        let arrays = vec![
            Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
            Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
            Arc::new(Int32Array::from(vec![2])) as ArrayRef,
            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
        ];
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
        // write the batch to a stream in a temporary file
        let mut file = tempfile::tempfile().unwrap();
        let mut stream_writer = crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
        stream_writer.write(&batch).unwrap();
        stream_writer.finish().unwrap();

        drop(stream_writer);

        file.rewind().unwrap();

        // read stream back
        let reader = StreamReader::try_new(&mut file, None).unwrap();

        reader.for_each(|batch| {
            let batch = batch.unwrap();
            assert!(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap()
                    .value(0)
                    != 0.0
            );
            assert!(
                batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap()
                    .value(0)
                    != 0.0
            );
        });

        file.rewind().unwrap();

        // read stream back with projection
        let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();

        reader.for_each(|batch| {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().fields().len(), 2);
            assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32);
            assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32);
        });
    }

    /// Write the record batch to an in-memory buffer in IPC File format
    fn write_ipc(rb: &RecordBatch) -> Vec<u8> {
        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
        writer.write(rb).unwrap();
        writer.finish().unwrap();
        buf
    }

    /// Read a single record batch from an IPC File format buffer
    fn read_ipc(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None)?;
        reader.next().unwrap()
    }

    fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
        let buf = write_ipc(rb);
        read_ipc(&buf).unwrap()
    }

    /// Read a single record batch from an IPC File format buffer using [`FileDecoder`]
    fn read_ipc_with_decoder(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
        let buffer = Buffer::from_vec(buf);
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap())?;
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start])
            .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid footer: {e}")))?;

        let schema = fb_to_schema(footer.schema().unwrap());

        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
        // read dictionaries
        for block in footer.dictionaries().iter().flatten() {
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);
            decoder.read_dictionary(block, &data)?
        }

        // read the record batch
        let batches = footer.recordBatches().unwrap();
        assert_eq!(batches.len(), 1); // only wrote a single batch

        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);
        Ok(decoder.read_record_batch(block, &data)?.unwrap())
    }

    /// Write the record batch to an in-memory buffer in IPC Stream format
    fn write_stream(rb: &RecordBatch) -> Vec<u8> {
        let mut buf = Vec::new();
        let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
        writer.write(rb).unwrap();
        writer.finish().unwrap();
        buf
    }

    /// Read a single record batch from an IPC Stream format buffer
    fn read_stream(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
        let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None)?;
        reader.next().unwrap()
    }

    fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
        let buf = write_stream(rb);
        read_stream(&buf).unwrap()
    }

    #[test]
    fn test_roundtrip_with_custom_metadata() {
        let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
        let mut test_metadata = HashMap::new();
        test_metadata.insert("abc".to_string(), "abc".to_string());
        test_metadata.insert("def".to_string(), "def".to_string());
        for (k, v) in &test_metadata {
            writer.write_metadata(k, v);
        }
        writer.finish().unwrap();
        drop(writer);

        let reader = crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
        assert_eq!(reader.custom_metadata(), &test_metadata);
    }

    #[test]
    fn test_roundtrip_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        assert_eq!(batch, roundtrip_ipc(&batch));
    }

    #[test]
    fn test_roundtrip_nested_dict_no_preserve_dict_id() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new_with_options(
            &mut buf,
            batch.schema_ref(),
            #[allow(deprecated)]
            IpcWriteOptions::default().with_preserve_dict_id(false),
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();

        assert_eq!(batch, reader.next().unwrap().unwrap());
    }

    fn check_union_with_builder(mut builder: UnionBuilder) {
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        builder.append::<Int64Type>("d", 11).unwrap();
        let union = builder.build().unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            union.data_type().clone(),
            false,
        )]));

        let union_array = Arc::new(union) as ArrayRef;

        let rb = RecordBatch::try_new(schema, vec![union_array]).unwrap();
        let rb2 = roundtrip_ipc(&rb);
        assert_eq!(rb.schema(), rb2.schema());
        assert_eq!(rb.num_columns(), rb2.num_columns());
        assert_eq!(rb.num_rows(), rb2.num_rows());
        let union1 = rb.column(0);
        let union2 = rb2.column(0);

        assert_eq!(union1, union2);
    }

    #[test]
    fn test_roundtrip_dense_union() {
        check_union_with_builder(UnionBuilder::new_dense());
    }

    #[test]
    fn test_roundtrip_sparse_union() {
        check_union_with_builder(UnionBuilder::new_sparse());
    }

    #[test]
    fn test_roundtrip_struct_empty_fields() {
        let nulls = NullBuffer::from(&[true, true, false]);
        let rb = RecordBatch::try_from_iter([(
            "",
            Arc::new(StructArray::new_empty_fields(nulls.len(), Some(nulls))) as _,
        )])
        .unwrap();
        let rb2 = roundtrip_ipc(&rb);
        assert_eq!(rb, rb2);
    }

    #[test]
    fn test_roundtrip_stream_run_array_sliced() {
        let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"]
            .into_iter()
            .collect();
        let run_array_1_sliced = run_array_1.slice(2, 5);

        let run_array_2_input = vec![Some(1_i32), None, None, Some(2), Some(2)];
        let mut run_array_2_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        run_array_2_builder.extend(run_array_2_input);
        let run_array_2 = run_array_2_builder.finish();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "run_array_1_sliced",
                run_array_1_sliced.data_type().clone(),
                false,
            ),
            Field::new("run_array_2", run_array_2.data_type().clone(), false),
        ]));
        let input_batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(run_array_1_sliced.clone()), Arc::new(run_array_2)],
        )
        .unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);

        assert_eq!(input_batch.column(1), output_batch.column(1));

        // partial comparison is not supported for run arrays, so unslice the
        // input run array before comparing it with the round-tripped output
        let run_array_1_unsliced = unslice_run_array(run_array_1_sliced.into_data()).unwrap();
        assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
    }

    #[test]
    fn test_roundtrip_stream_nested_dict() {
        let xs = vec!["AA", "BB", "AA", "CC", "BB"];
        let dict = Arc::new(
            xs.clone()
                .into_iter()
                .collect::<DictionaryArray<Int8Type>>(),
        );
        let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("f2.1", DataType::Utf8, false)),
                string_array,
            ),
            (
                Arc::new(Field::new("f2.2_struct", dict.data_type().clone(), false)),
                dict.clone() as ArrayRef,
            ),
        ]);
        let schema = Arc::new(Schema::new(vec![
            Field::new("f1_string", DataType::Utf8, false),
            Field::new("f2_struct", struct_array.data_type().clone(), false),
        ]));
        let input_batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(xs.clone())),
                Arc::new(struct_array),
            ],
        )
        .unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }

    #[test]
    fn test_roundtrip_stream_nested_dict_of_map_of_dict() {
        let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
        let values = Arc::new(values) as ArrayRef;
        let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
        let value_dict_array = DictionaryArray::new(value_dict_keys, values.clone());

        let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
        let key_dict_array = DictionaryArray::new(key_dict_keys, values);

        #[allow(deprecated)]
        let keys_field = Arc::new(Field::new_dict(
            "keys",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        ));
        #[allow(deprecated)]
        let values_field = Arc::new(Field::new_dict(
            "values",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            2,
            false,
        ));
        let entry_struct = StructArray::from(vec![
            (keys_field, make_array(key_dict_array.into_data())),
            (values_field, make_array(value_dict_array.into_data())),
        ]);
        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );

        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
        let map_data = ArrayData::builder(map_data_type)
            .len(3)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }

    fn test_roundtrip_stream_dict_of_list_of_dict_impl<
        OffsetSize: OffsetSizeTrait,
        U: ArrowNativeType,
    >(
        list_data_type: DataType,
        offsets: &[U; 5],
    ) {
        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.to_data();

        let value_offsets = Buffer::from_slice_ref(offsets);

        let list_data = ArrayData::builder(list_data_type)
            .len(4)
            .add_buffer(value_offsets)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_array = GenericListArray::<OffsetSize>::from(list_data);

        let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }

    #[test]
    fn test_roundtrip_stream_dict_of_list_of_dict() {
        // list of dictionary (with i32 offsets)
        #[allow(deprecated)]
        let list_data_type = DataType::List(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        )));
        let offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
        test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(list_data_type, offsets);

        // large list of dictionary (with i64 offsets)
        #[allow(deprecated)]
        let list_data_type = DataType::LargeList(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        )));
        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
        test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(list_data_type, offsets);
    }

    #[test]
    fn test_roundtrip_stream_dict_of_fixed_size_list_of_dict() {
        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3, 1, 2]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.into_data();

        #[allow(deprecated)]
        let list_data_type = DataType::FixedSizeList(
            Arc::new(Field::new_dict(
                "item",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                true,
                1,
                false,
            )),
            3,
        );
        let list_data = ArrayData::builder(list_data_type)
            .len(3)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_array = FixedSizeListArray::from(list_data);

        let keys_for_dict = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }

    const LONG_TEST_STRING: &str =
        "This is a long string to make sure binary view array handles it";

    #[test]
    fn test_roundtrip_view_types() {
        let schema = Schema::new(vec![
            Field::new("field_1", DataType::BinaryView, true),
            Field::new("field_2", DataType::Utf8, true),
            Field::new("field_3", DataType::Utf8View, true),
        ]);
        let bin_values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"bar"),
            Some(LONG_TEST_STRING.as_bytes()),
        ];
        let utf8_values: Vec<Option<&str>> =
            vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
        let bin_view_array = BinaryViewArray::from_iter(bin_values);
        let utf8_array = StringArray::from_iter(utf8_values.iter());
        let utf8_view_array = StringViewArray::from_iter(utf8_values);
        let record_batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(bin_view_array),
                Arc::new(utf8_array),
                Arc::new(utf8_view_array),
            ],
        )
        .unwrap();

        assert_eq!(record_batch, roundtrip_ipc(&record_batch));
        assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));

        let sliced_batch = record_batch.slice(1, 2);
        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
    }

    #[test]
    fn test_roundtrip_view_types_nested_dict() {
        let bin_values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"bar"),
            Some(LONG_TEST_STRING.as_bytes()),
            Some(b"field"),
        ];
        let utf8_values: Vec<Option<&str>> = vec![
            Some("foo"),
            None,
            Some("bar"),
            Some(LONG_TEST_STRING),
            Some("field"),
        ];
        let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
        let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));

        let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
        let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
        #[allow(deprecated)]
        let keys_field = Arc::new(Field::new_dict(
            "keys",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
            true,
            1,
            false,
        ));

        let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
        let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
        #[allow(deprecated)]
        let values_field = Arc::new(Field::new_dict(
            "values",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
            true,
            2,
            false,
        ));
        let entry_struct = StructArray::from(vec![
            (keys_field, make_array(key_dict_array.into_data())),
            (values_field, make_array(value_dict_array.into_data())),
        ]);

        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );
        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
        let map_data = ArrayData::builder(map_data_type)
            .len(3)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        assert_eq!(batch, roundtrip_ipc(&batch));
        assert_eq!(batch, roundtrip_ipc_stream(&batch));

        let sliced_batch = batch.slice(1, 2);
        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
    }
2272
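    // A schema with zero columns still carries a row count via RecordBatchOptions,
    // which must survive the stream round-trip.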
    #[test]
    fn test_no_columns_batch() {
        let schema = Arc::new(Schema::empty());
        let options = RecordBatchOptions::new()
            .with_match_field_names(true)
            .with_row_count(Some(10));
        let input_batch = RecordBatch::try_new_with_options(schema, vec![], &options).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }

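    // Decodes a message whose body is deliberately misaligned; with
    // require_alignment disabled the decoder is expected to cope by realigning
    // the buffers itself.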
    #[test]
    fn test_unaligned() {
        let batch = RecordBatch::try_from_iter(vec![(
            "i32",
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
        )])
        .unwrap();

        let gen = IpcDataGenerator {};
        #[allow(deprecated)]
        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
        let (_, encoded) = gen
            .encoded_batch(&batch, &mut dict_tracker, &Default::default())
            .unwrap();

        let message = root_as_message(&encoded.ipc_message).unwrap();

        // Copy the body behind a one-byte prefix, then slice the prefix off so the
        // resulting data is guaranteed to be misaligned.
        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
        buffer.push(0_u8);
        buffer.extend_from_slice(&encoded.arrow_data);
        let b = Buffer::from(buffer).slice(1);
        assert_ne!(b.as_ptr().align_offset(8), 0);

        let ipc_batch = message.header_as_record_batch().unwrap();
        let roundtrip = RecordBatchDecoder::try_new(
            &b,
            ipc_batch,
            batch.schema(),
            &Default::default(),
            &message.version(),
        )
        .unwrap()
        .with_require_alignment(false)
        .read_record_batch()
        .unwrap();
        assert_eq!(batch, roundtrip);
    }

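    // Same misaligned body as above, but with require_alignment enabled the decoder
    // must reject it with a descriptive error instead of copying.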
    #[test]
    fn test_unaligned_throws_error_with_require_alignment() {
        let batch = RecordBatch::try_from_iter(vec![(
            "i32",
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
        )])
        .unwrap();

        let gen = IpcDataGenerator {};
        #[allow(deprecated)]
        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
        let (_, encoded) = gen
            .encoded_batch(&batch, &mut dict_tracker, &Default::default())
            .unwrap();

        let message = root_as_message(&encoded.ipc_message).unwrap();

        // Copy the body behind a one-byte prefix and slice it off to force misalignment.
        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
        buffer.push(0_u8);
        buffer.extend_from_slice(&encoded.arrow_data);
        let b = Buffer::from(buffer).slice(1);
        assert_ne!(b.as_ptr().align_offset(8), 0);

        let ipc_batch = message.header_as_record_batch().unwrap();
        let result = RecordBatchDecoder::try_new(
            &b,
            ipc_batch,
            batch.schema(),
            &Default::default(),
            &message.version(),
        )
        .unwrap()
        .with_require_alignment(true)
        .read_record_batch();

        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Int32, \
             offset from expected alignment of 4 by 1"
        );
    }

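    // 600_000 boolean columns produce more flatbuffer tables in the file footer than
    // the default verifier limit allows, so reading requires raising
    // max_footer_fb_tables.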
    #[test]
    fn test_file_with_massive_column_count() {
        let limit = 600_000;

        let fields = (0..limit)
            .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
            .collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::new_empty(schema);

        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        let mut reader = FileReaderBuilder::new()
            .with_max_footer_fb_tables(1_500_000)
            .build(std::io::Cursor::new(buf))
            .unwrap();
        let roundtrip_batch = reader.next().unwrap().unwrap();

        assert_eq!(batch, roundtrip_batch);
    }

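    // 61 levels of struct nesting exceed the default flatbuffer verification depth,
    // so reading requires raising max_footer_fb_depth.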
    #[test]
    fn test_file_with_deeply_nested_columns() {
        let limit = 61;

        let fields = (0..limit).fold(
            vec![Field::new("leaf", DataType::Boolean, false)],
            |field, index| vec![Field::new_struct(format!("{index}"), field, false)],
        );
        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::new_empty(schema);

        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        let mut reader = FileReaderBuilder::new()
            .with_max_footer_fb_depth(65)
            .build(std::io::Cursor::new(buf))
            .unwrap();
        let roundtrip_batch = reader.next().unwrap().unwrap();

        assert_eq!(batch, roundtrip_batch);
    }

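    // A StructArray whose children disagree on length is invalid; constructing it
    // unchecked and writing it out must surface a validation error on read.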
    #[test]
    fn test_invalid_struct_array_ipc_read_errors() {
        let a_field = Field::new("a", DataType::Int32, false);
        let b_field = Field::new("b", DataType::Int32, false);

        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "s",
            vec![a_field.clone(), b_field.clone()],
            false,
        )]));

        let a_array_data = ArrayData::builder(a_field.data_type().clone())
            .len(4)
            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
            .build()
            .unwrap();
        let b_array_data = ArrayData::builder(b_field.data_type().clone())
            .len(3)
            .add_buffer(Buffer::from_slice_ref([5, 6, 7]))
            .build()
            .unwrap();

        let struct_data_type = schema.field(0).data_type();

        // Invalid: the struct claims a length of 4, but child "b" has only 3 values.
        let invalid_struct_arr = unsafe {
            make_array(
                ArrayData::builder(struct_data_type.clone())
                    .len(4)
                    .add_child_data(a_array_data)
                    .add_child_data(b_array_data)
                    .build_unchecked(),
            )
        };
        expect_ipc_validation_error(
            Arc::new(invalid_struct_arr),
            "Invalid argument error: Incorrect array length for StructArray field \"b\", expected 4 got 3",
        );
    }

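    // Both fields below are declared with dictionary ID 0; with preserve_dict_id
    // disabled the writer assigns fresh IDs, so the shared ID must not break the
    // stream round-trip.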
    #[test]
    fn test_same_dict_id_without_preserve() {
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(
                ["a", "b"]
                    .iter()
                    .map(|name| {
                        #[allow(deprecated)]
                        Field::new_dict(
                            name.to_string(),
                            DataType::Dictionary(
                                Box::new(DataType::Int32),
                                Box::new(DataType::Utf8),
                            ),
                            true,
                            0,
                            false,
                        )
                    })
                    .collect::<Vec<Field>>(),
            )),
            vec![
                Arc::new(
                    vec![Some("c"), Some("d")]
                        .into_iter()
                        .collect::<DictionaryArray<Int32Type>>(),
                ) as ArrayRef,
                Arc::new(
                    vec![Some("e"), Some("f")]
                        .into_iter()
                        .collect::<DictionaryArray<Int32Type>>(),
                ) as ArrayRef,
            ],
        )
        .expect("Failed to create RecordBatch");

        let mut buf = vec![];
        {
            let mut writer = crate::writer::StreamWriter::try_new_with_options(
                &mut buf,
                batch.schema().as_ref(),
                #[allow(deprecated)]
                crate::writer::IpcWriteOptions::default().with_preserve_dict_id(false),
            )
            .expect("Failed to create StreamWriter");
            writer.write(&batch).expect("Failed to write RecordBatch");
            writer.finish().expect("Failed to finish StreamWriter");
        }

        StreamReader::try_new(std::io::Cursor::new(buf), None)
            .expect("Failed to create StreamReader")
            .for_each(|decoded_batch| {
                assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch);
            });
    }

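    // The remaining tests construct deliberately corrupted arrays via unsafe
    // constructors and assert that the IPC readers reject them with a validation
    // error (see expect_ipc_validation_error below).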
    #[test]
    fn test_validation_of_invalid_list_array() {
        let array = unsafe {
            let values = Int32Array::from(vec![1, 2, 3]);
            // Invalid: offsets must be monotonically non-decreasing, but the last
            // offset (2) goes backwards from the previous one (4).
            let bad_offsets = ScalarBuffer::<i32>::from(vec![0, 2, 4, 2]);
            let offsets = OffsetBuffer::new_unchecked(bad_offsets); // skips the offset checks
            let field = Field::new_list_field(DataType::Int32, true);
            let nulls = None;
            ListArray::new(Arc::new(field), offsets, Arc::new(values), nulls)
        };

        expect_ipc_validation_error(
            Arc::new(array),
            "Invalid argument error: Offset invariant failure: offset at position 2 out of bounds: 4 > 2"
        );
    }

    #[test]
    fn test_validation_of_invalid_string_array() {
        let valid: &[u8] = b" ";
        let mut invalid = vec![];
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
        let binary_array = BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
        // Invalid: reinterpret the non-UTF-8 binary data as a StringArray without
        // checking, producing an array that fails UTF-8 validation.
        let array = unsafe {
            StringArray::new_unchecked(
                binary_array.offsets().clone(),
                binary_array.values().clone(),
                binary_array.nulls().cloned(),
            )
        };
        expect_ipc_validation_error(
            Arc::new(array),
            "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..45): invalid utf-8 sequence of 1 bytes from index 38"
        );
    }

    #[test]
    fn test_validation_of_invalid_string_view_array() {
        let valid: &[u8] = b" ";
        let mut invalid = vec![];
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
        let binary_view_array =
            BinaryViewArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
        // Invalid: reinterpret the non-UTF-8 binary views as a StringViewArray
        // without checking.
        let array = unsafe {
            StringViewArray::new_unchecked(
                binary_view_array.views().clone(),
                binary_view_array.data_buffers().to_vec(),
                binary_view_array.nulls().cloned(),
            )
        };
        expect_ipc_validation_error(
            Arc::new(array),
            "Invalid argument error: Encountered non-UTF-8 data at index 3: invalid utf-8 sequence of 1 bytes from index 38"
        );
    }

    #[test]
    fn test_validation_of_invalid_dictionary_array() {
        let array = unsafe {
            let values = StringArray::from_iter_values(["a", "b", "c"]);
            let keys = Int32Array::from(vec![1, 200]); // invalid: key 200 is out of bounds for 3 values
            DictionaryArray::new_unchecked(keys, Arc::new(values))
        };

        expect_ipc_validation_error(
            Arc::new(array),
            "Invalid argument error: Value at position 1 out of bounds: 200 (should be in [0, 2])",
        );
    }

    /// An invalid UTF-8 sequence: 0xa0 is a continuation byte with no preceding lead byte.
    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];

    /// Writes a single-column batch containing `array` and asserts that reading it
    /// back through each IPC surface fails with `expected_err`.
    fn expect_ipc_validation_error(array: ArrayRef, expected_err: &str) {
        let rb = RecordBatch::try_from_iter([("a", array)]).unwrap();

        // IPC stream format
        let buf = write_stream(&rb); // writing succeeds; validation happens on read
        let err = read_stream(&buf).unwrap_err();
        assert_eq!(err.to_string(), expected_err);

        // IPC file format
        let buf = write_ipc(&rb); // writing succeeds; validation happens on read
        let err = read_ipc(&buf).unwrap_err();
        assert_eq!(err.to_string(), expected_err);

        // IPC file format, driven through the lower-level FileDecoder
        let err = read_ipc_with_decoder(buf).unwrap_err();
        assert_eq!(err.to_string(), expected_err);
    }
}