arrow_ipc/
reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Arrow IPC File and Stream Readers
//!
//! # Notes
//!
//! The [`FileReader`] and [`StreamReader`] have similar interfaces;
//! however, the [`FileReader`] expects a reader that supports [`Seek`]ing.
//!
//! [`Seek`]: std::io::Seek

mod stream;

pub use stream::*;

use flatbuffers::{VectorIter, VerifierOptions};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;

use arrow_array::*;
use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, ScalarBuffer};
use arrow_data::ArrayData;
use arrow_schema::*;

use crate::compression::CompressionCodec;
use crate::{Block, FieldNode, Message, MetadataVersion, CONTINUATION_MARKER};
use DataType::*;

/// Read a buffer based on offset and length
/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
/// Each constituent buffer is first compressed with the indicated
/// compressor, and then written with the uncompressed length in the first 8
/// bytes as a 64-bit little-endian signed integer followed by the compressed
/// buffer bytes (and then padding as required by the protocol). The
/// uncompressed length may be set to -1 to indicate that the data that
/// follows is not compressed, which can be useful for cases where
/// compression does not yield appreciable savings.
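///
/// A hedged sketch of that framing (`split_frame` is a hypothetical helper,
/// not part of this crate's API):
/// ```
/// // The first 8 bytes hold the uncompressed length as a little-endian i64
/// // (-1 marks an uncompressed body); the rest is the body itself.
/// fn split_frame(buf: &[u8]) -> (i64, &[u8]) {
///     let uncompressed_len = i64::from_le_bytes(buf[..8].try_into().unwrap());
///     (uncompressed_len, &buf[8..])
/// }
/// let frame = [3, 0, 0, 0, 0, 0, 0, 0, b'a', b'b', b'c'];
/// assert_eq!(split_frame(&frame), (3, &frame[8..]));
/// ```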
fn read_buffer(
    buf: &crate::Buffer,
    a_data: &Buffer,
    compression_codec: Option<CompressionCodec>,
) -> Result<Buffer, ArrowError> {
    let start_offset = buf.offset() as usize;
    let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
    // corner case: empty buffer
    match (buf_data.is_empty(), compression_codec) {
        (true, _) | (_, None) => Ok(buf_data),
        (false, Some(decompressor)) => decompressor.decompress_to_buffer(&buf_data),
    }
}

impl RecordBatchDecoder<'_> {
    /// Coordinates reading arrays based on data types.
    ///
    /// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView).
    /// When we encounter such a type, we pop from the front of the queue to get the number of buffers to read.
    ///
    /// Notes:
    /// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
    /// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
    ///   We thus:
    ///     - check if the bit width of non-64-bit numbers is 64, and
    ///     - read the buffer as 64-bit (signed integer or float), and
    ///     - cast the 64-bit array to the appropriate data type
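    ///
    /// A small sketch of the queue mechanics (the count of `2` is hypothetical):
    /// ```
    /// use std::collections::VecDeque;
    /// // one Utf8View column whose values span two data buffers
    /// let mut variadic_counts: VecDeque<i64> = VecDeque::from([2]);
    /// let buffers_to_read = variadic_counts.pop_front().unwrap() + 2; // + view + null buffers
    /// assert_eq!(buffers_to_read, 4);
    /// ```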
    fn create_array(
        &mut self,
        field: &Field,
        variadic_counts: &mut VecDeque<i64>,
    ) -> Result<ArrayRef, ArrowError> {
        let data_type = field.data_type();
        match data_type {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                let field_node = self.next_node(field)?;
                let buffers = [
                    self.next_buffer()?,
                    self.next_buffer()?,
                    self.next_buffer()?,
                ];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            BinaryView | Utf8View => {
                let count = variadic_counts
                    .pop_front()
                    .ok_or(ArrowError::IpcError(format!(
                        "Missing variadic count for {data_type} column"
                    )))?;
                let count = count + 2; // view and null buffer.
                let buffers = (0..count)
                    .map(|_| self.next_buffer())
                    .collect::<Result<Vec<_>, _>>()?;
                let field_node = self.next_node(field)?;
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            FixedSizeBinary(_) => {
                let field_node = self.next_node(field)?;
                let buffers = [self.next_buffer()?, self.next_buffer()?];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
                let list_node = self.next_node(field)?;
                let list_buffers = [self.next_buffer()?, self.next_buffer()?];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_array(list_node, data_type, &list_buffers, values)
            }
            FixedSizeList(ref list_field, _) => {
                let list_node = self.next_node(field)?;
                let list_buffers = [self.next_buffer()?];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_array(list_node, data_type, &list_buffers, values)
            }
            Struct(struct_fields) => {
                let struct_node = self.next_node(field)?;
                let null_buffer = self.next_buffer()?;

                // read the arrays for each field
                let mut struct_arrays = vec![];
                // TODO investigate whether just knowing the number of buffers could
                // still work
                for struct_field in struct_fields {
                    let child = self.create_array(struct_field, variadic_counts)?;
                    struct_arrays.push(child);
                }
                let null_count = struct_node.null_count() as usize;
                let struct_array = if struct_arrays.is_empty() {
                    // `StructArray::from` can't infer the correct row count
                    // if we have zero fields
                    let len = struct_node.length() as usize;
                    StructArray::new_empty_fields(
                        len,
                        (null_count > 0).then(|| BooleanBuffer::new(null_buffer, 0, len).into()),
                    )
                } else if null_count > 0 {
                    // create struct array from fields, arrays and null data
                    let len = struct_node.length() as usize;
                    let nulls = BooleanBuffer::new(null_buffer, 0, len).into();
                    StructArray::try_new(struct_fields.clone(), struct_arrays, Some(nulls))?
                } else {
                    StructArray::try_new(struct_fields.clone(), struct_arrays, None)?
                };
                Ok(Arc::new(struct_array))
            }
            RunEndEncoded(run_ends_field, values_field) => {
                let run_node = self.next_node(field)?;
                let run_ends = self.create_array(run_ends_field, variadic_counts)?;
                let values = self.create_array(values_field, variadic_counts)?;

                let run_array_length = run_node.length() as usize;
                let array_data = ArrayData::builder(data_type.clone())
                    .len(run_array_length)
                    .offset(0)
                    .add_child_data(run_ends.into_data())
                    .add_child_data(values.into_data())
                    .align_buffers(!self.require_alignment)
                    .build()?;

                Ok(make_array(array_data))
            }
            // Create dictionary array from RecordBatch
            Dictionary(_, _) => {
                let index_node = self.next_node(field)?;
                let index_buffers = [self.next_buffer()?, self.next_buffer()?];

                #[allow(deprecated)]
                let dict_id = field.dict_id().ok_or_else(|| {
                    ArrowError::ParseError(format!("Field {field} does not have dict id"))
                })?;

                let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
                    ArrowError::ParseError(format!(
                        "Cannot find a dictionary batch with dict id: {dict_id}"
                    ))
                })?;

                self.create_dictionary_array(
                    index_node,
                    data_type,
                    &index_buffers,
                    value_array.clone(),
                )
            }
            Union(fields, mode) => {
                let union_node = self.next_node(field)?;
                let len = union_node.length() as usize;
                // In V4, union types have a validity bitmap;
                // in V5 and later, they do not
                if self.version < MetadataVersion::V5 {
                    self.next_buffer()?;
                }

                let type_ids: ScalarBuffer<i8> =
                    self.next_buffer()?.slice_with_length(0, len).into();

                let value_offsets = match mode {
                    UnionMode::Dense => {
                        let offsets: ScalarBuffer<i32> =
                            self.next_buffer()?.slice_with_length(0, len * 4).into();
                        Some(offsets)
                    }
                    UnionMode::Sparse => None,
                };

                let mut children = Vec::with_capacity(fields.len());

                for (_id, field) in fields.iter() {
                    let child = self.create_array(field, variadic_counts)?;
                    children.push(child);
                }

                let array = UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?;
                Ok(Arc::new(array))
            }
            Null => {
                let node = self.next_node(field)?;
                let length = node.length();
                let null_count = node.null_count();

                if length != null_count {
                    return Err(ArrowError::SchemaError(format!(
                        "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
                    )));
                }

                let array_data = ArrayData::builder(data_type.clone())
                    .len(length as usize)
                    .offset(0)
                    .align_buffers(!self.require_alignment)
                    .build()?;

                // no buffer increases
                Ok(Arc::new(NullArray::from(array_data)))
            }
            _ => {
                let field_node = self.next_node(field)?;
                let buffers = [self.next_buffer()?, self.next_buffer()?];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
        }
    }

    /// Reads the correct number of buffers based on data type and null_count, and creates a
    /// primitive array ref
    fn create_primitive_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
    ) -> Result<ArrayRef, ArrowError> {
        let length = field_node.length() as usize;
        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let builder = match data_type {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                // read 3 buffers: null buffer (optional), offsets buffer and data buffer
                ArrayData::builder(data_type.clone())
                    .len(length)
                    .buffers(buffers[1..3].to_vec())
                    .null_bit_buffer(null_buffer)
            }
            BinaryView | Utf8View => ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..].to_vec())
                .null_bit_buffer(null_buffer),
            _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
                // read 2 buffers: null buffer (optional) and data buffer
                ArrayData::builder(data_type.clone())
                    .len(length)
                    .add_buffer(buffers[1].clone())
                    .null_bit_buffer(null_buffer)
            }
            t => unreachable!("Data type {:?} either unsupported or not primitive", t),
        };

        let array_data = builder.align_buffers(!self.require_alignment).build()?;

        Ok(make_array(array_data))
    }

    /// Reads the correct number of buffers based on list type and null_count, and creates a
    /// list array ref
    fn create_list_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        child_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let length = field_node.length() as usize;
        let child_data = child_array.into_data();
        let builder = match data_type {
            List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
                .len(length)
                .add_buffer(buffers[1].clone())
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer),

            FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
                .len(length)
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer),

            _ => unreachable!("Cannot create list or map array from {:?}", data_type),
        };

        let array_data = builder.align_buffers(!self.require_alignment).build()?;

        Ok(make_array(array_data))
    }

    /// Reads the correct number of buffers based on the dictionary type and null_count,
    /// and creates a dictionary array ref
    fn create_dictionary_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        value_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        if let Dictionary(_, _) = *data_type {
            let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
            let array_data = ArrayData::builder(data_type.clone())
                .len(field_node.length() as usize)
                .add_buffer(buffers[1].clone())
                .add_child_data(value_array.into_data())
                .null_bit_buffer(null_buffer)
                .align_buffers(!self.require_alignment)
                .build()?;

            Ok(make_array(array_data))
        } else {
            unreachable!("Cannot create dictionary array from {:?}", data_type)
        }
    }
}

/// State for decoding Arrow arrays from an [IPC RecordBatch] structure to
/// [`RecordBatch`]
///
/// [IPC RecordBatch]: crate::RecordBatch
struct RecordBatchDecoder<'a> {
    /// The flatbuffers encoded record batch
    batch: crate::RecordBatch<'a>,
    /// The output schema
    schema: SchemaRef,
    /// Decoded dictionaries indexed by dictionary id
    dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
    /// Optional compression codec
    compression: Option<CompressionCodec>,
    /// The format version
    version: MetadataVersion,
    /// The raw data buffer
    data: &'a Buffer,
    /// The fields comprising this array
    nodes: VectorIter<'a, FieldNode>,
    /// The buffers comprising this array
    buffers: VectorIter<'a, crate::Buffer>,
    /// Projection (subset of columns) to read, if any
    /// See [`RecordBatchDecoder::with_projection`] for details
    projection: Option<&'a [usize]>,
    /// Are buffers required to already be aligned? See
    /// [`RecordBatchDecoder::with_require_alignment`] for details
    require_alignment: bool,
}

impl<'a> RecordBatchDecoder<'a> {
    /// Create a reader for decoding arrays from an encoded [`RecordBatch`]
    fn try_new(
        buf: &'a Buffer,
        batch: crate::RecordBatch<'a>,
        schema: SchemaRef,
        dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
        metadata: &'a MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let buffers = batch.buffers().ok_or_else(|| {
            ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
        })?;
        let field_nodes = batch.nodes().ok_or_else(|| {
            ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
        })?;

        let batch_compression = batch.compression();
        let compression = batch_compression
            .map(|batch_compression| batch_compression.codec().try_into())
            .transpose()?;

        Ok(Self {
            batch,
            schema,
            dictionaries_by_id,
            compression,
            version: *metadata,
            data: buf,
            nodes: field_nodes.iter(),
            buffers: buffers.iter(),
            projection: None,
            require_alignment: false,
        })
    }

    /// Set the projection (default: None)
    ///
    /// If set, the projection is the list of column indices
    /// that will be read
    pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
        self.projection = projection;
        self
    }

    /// Set require_alignment (default: false)
    ///
    /// If true, buffers must be aligned appropriately or an error will
    /// result. If false, buffers will be copied to aligned buffers
    /// if necessary.
    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
        self.require_alignment = require_alignment;
        self
    }

    /// Read the record batch, consuming the reader
    fn read_record_batch(mut self) -> Result<RecordBatch, ArrowError> {
        let mut variadic_counts: VecDeque<i64> = self
            .batch
            .variadicBufferCounts()
            .into_iter()
            .flatten()
            .collect();

        let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));

        let schema = Arc::clone(&self.schema);
        if let Some(projection) = self.projection {
            let mut arrays = vec![];
            // project fields
            for (idx, field) in schema.fields().iter().enumerate() {
                // Create array for projected field
                if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
                    let child = self.create_array(field, &mut variadic_counts)?;
                    arrays.push((proj_idx, child));
                } else {
                    self.skip_field(field, &mut variadic_counts)?;
                }
            }
            assert!(variadic_counts.is_empty());
            arrays.sort_by_key(|t| t.0);
            RecordBatch::try_new_with_options(
                Arc::new(schema.project(projection)?),
                arrays.into_iter().map(|t| t.1).collect(),
                &options,
            )
        } else {
            let mut children = vec![];
            // keep track of index as lists require more than one node
            for field in schema.fields() {
                let child = self.create_array(field, &mut variadic_counts)?;
                children.push(child);
            }
            assert!(variadic_counts.is_empty());
            RecordBatch::try_new_with_options(schema, children, &options)
        }
    }

    fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
        read_buffer(self.buffers.next().unwrap(), self.data, self.compression)
    }

    fn skip_buffer(&mut self) {
        self.buffers.next().unwrap();
    }

    fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
        self.nodes.next().ok_or_else(|| {
            ArrowError::SchemaError(format!(
                "Invalid data for schema. {} refers to node not found in schema",
                field
            ))
        })
    }

    fn skip_field(
        &mut self,
        field: &Field,
        variadic_count: &mut VecDeque<i64>,
    ) -> Result<(), ArrowError> {
        self.next_node(field)?;

        match field.data_type() {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                for _ in 0..3 {
                    self.skip_buffer()
                }
            }
            Utf8View | BinaryView => {
                let count = variadic_count
                    .pop_front()
                    .ok_or(ArrowError::IpcError(format!(
                        "Missing variadic count for {} column",
                        field.data_type()
                    )))?;
                let count = count + 2; // view and null buffer.
                for _i in 0..count {
                    self.skip_buffer()
                }
            }
            FixedSizeBinary(_) => {
                self.skip_buffer();
                self.skip_buffer();
            }
            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                self.skip_buffer();
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            FixedSizeList(list_field, _) => {
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            Struct(struct_fields) => {
                self.skip_buffer();

                // skip for each field
                for struct_field in struct_fields {
                    self.skip_field(struct_field, variadic_count)?
                }
            }
            RunEndEncoded(run_ends_field, values_field) => {
                self.skip_field(run_ends_field, variadic_count)?;
                self.skip_field(values_field, variadic_count)?;
            }
            Dictionary(_, _) => {
                self.skip_buffer(); // Nulls
                self.skip_buffer(); // Indices
            }
            Union(fields, mode) => {
                self.skip_buffer(); // Nulls

                match mode {
                    UnionMode::Dense => self.skip_buffer(),
                    UnionMode::Sparse => {}
                };

                for (_, field) in fields.iter() {
                    self.skip_field(field, variadic_count)?
                }
            }
            Null => {} // No buffer increases
            _ => {
                self.skip_buffer();
                self.skip_buffer();
            }
        };
        Ok(())
    }
}

/// Creates a record batch from binary data using the `crate::RecordBatch` indexes and the `Schema`.
///
/// If `require_alignment` is true, this function will return an error if any array data in the
/// input `buf` is not properly aligned.
/// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct [`arrow_data::ArrayData`].
///
/// If `require_alignment` is false, this function will automatically allocate a new aligned buffer
/// and copy over the data if any array data in the input `buf` is not properly aligned.
/// (Properly aligned array data will remain zero-copy.)
/// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct [`arrow_data::ArrayData`].
pub fn read_record_batch(
    buf: &Buffer,
    batch: crate::RecordBatch,
    schema: SchemaRef,
    dictionaries_by_id: &HashMap<i64, ArrayRef>,
    projection: Option<&[usize]>,
    metadata: &MetadataVersion,
) -> Result<RecordBatch, ArrowError> {
    RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
        .with_projection(projection)
        .with_require_alignment(false)
        .read_record_batch()
}

/// Read the dictionary from the buffer and provided metadata,
/// updating the `dictionaries_by_id` with the resulting dictionary
pub fn read_dictionary(
    buf: &Buffer,
    batch: crate::DictionaryBatch,
    schema: &Schema,
    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
    metadata: &MetadataVersion,
) -> Result<(), ArrowError> {
    read_dictionary_impl(buf, batch, schema, dictionaries_by_id, metadata, false)
}

fn read_dictionary_impl(
    buf: &Buffer,
    batch: crate::DictionaryBatch,
    schema: &Schema,
    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
    metadata: &MetadataVersion,
    require_alignment: bool,
) -> Result<(), ArrowError> {
    if batch.isDelta() {
        return Err(ArrowError::InvalidArgumentError(
            "delta dictionary batches not supported".to_string(),
        ));
    }

    let id = batch.id();
    #[allow(deprecated)]
    let fields_using_this_dictionary = schema.fields_with_dict_id(id);
    let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
    })?;

    // As the dictionary batch does not contain the type of the
    // values array, we need to retrieve this from the schema.
    // Get an array representing this dictionary's values.
    let dictionary_values: ArrayRef = match first_field.data_type() {
        DataType::Dictionary(_, ref value_type) => {
            // Make a fake schema for the dictionary batch.
            let value = value_type.as_ref().clone();
            let schema = Schema::new(vec![Field::new("", value, true)]);
            // Read a single column
            let record_batch = RecordBatchDecoder::try_new(
                buf,
                batch.data().unwrap(),
                Arc::new(schema),
                dictionaries_by_id,
                metadata,
            )?
            .with_require_alignment(require_alignment)
            .read_record_batch()?;

            Some(record_batch.column(0).clone())
        }
        _ => None,
    }
    .ok_or_else(|| {
        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
    })?;

    // We don't currently record the isOrdered field. This could become a
    // general attribute of arrays.
    // Add the resulting array ref to the dictionary cache.
    dictionaries_by_id.insert(id, dictionary_values.clone());

    Ok(())
}

/// Read the data for a given block
fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
    reader.seek(SeekFrom::Start(block.offset() as u64))?;
    let body_len = block.bodyLength().to_usize().unwrap();
    let metadata_len = block.metaDataLength().to_usize().unwrap();
    let total_len = body_len.checked_add(metadata_len).unwrap();

    let mut buf = MutableBuffer::from_len_zeroed(total_len);
    reader.read_exact(&mut buf)?;
    Ok(buf.into())
}

/// Parse an encapsulated message
///
/// <https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format>
fn parse_message(buf: &[u8]) -> Result<Message, ArrowError> {
    let buf = match buf[..4] == CONTINUATION_MARKER {
        true => &buf[8..],
        false => &buf[4..],
    };
    crate::root_as_message(buf)
        .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
}

/// Read the footer length from the last 10 bytes of an Arrow IPC file
///
/// Expects a 4 byte footer length followed by `b"ARROW1"`
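///
/// # Example
///
/// A minimal sketch with a hand-built trailer (footer length 64):
/// ```
/// # use arrow_ipc::reader::read_footer_length;
/// let mut trailer = [0u8; 10];
/// trailer[..4].copy_from_slice(&64i32.to_le_bytes());
/// trailer[4..].copy_from_slice(b"ARROW1");
/// assert_eq!(read_footer_length(trailer).unwrap(), 64);
/// ```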
pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
    if buf[4..] != super::ARROW_MAGIC {
        return Err(ArrowError::ParseError(
            "Arrow file does not contain correct footer".to_string(),
        ));
    }

    // read footer length
    let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
    footer_len
        .try_into()
        .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: {footer_len}")))
}

/// A low-level, push-based interface for reading an IPC file
///
/// For a higher-level interface see [`FileReader`]
///
/// For an example of using this API with `mmap` see the [`zero_copy_ipc`] example.
///
/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::*;
/// # use arrow_array::types::Int32Type;
/// # use arrow_buffer::Buffer;
/// # use arrow_ipc::convert::fb_to_schema;
/// # use arrow_ipc::reader::{FileDecoder, read_footer_length};
/// # use arrow_ipc::root_as_footer;
/// # use arrow_ipc::writer::FileWriter;
/// // Write an IPC file
///
/// let batch = RecordBatch::try_from_iter([
///     ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
///     ("b", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
///     ("c", Arc::new(DictionaryArray::<Int32Type>::from_iter(["hello", "hello", "world"])) as _),
/// ]).unwrap();
///
/// let schema = batch.schema();
///
/// let mut out = Vec::with_capacity(1024);
/// let mut writer = FileWriter::try_new(&mut out, schema.as_ref()).unwrap();
/// writer.write(&batch).unwrap();
/// writer.finish().unwrap();
///
/// drop(writer);
///
/// // Read IPC file
///
/// let buffer = Buffer::from_vec(out);
/// let trailer_start = buffer.len() - 10;
/// let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
/// let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
///
/// let back = fb_to_schema(footer.schema().unwrap());
/// assert_eq!(&back, schema.as_ref());
///
/// let mut decoder = FileDecoder::new(schema, footer.version());
///
/// // Read dictionaries
/// for block in footer.dictionaries().iter().flatten() {
///     let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
///     let data = buffer.slice_with_length(block.offset() as _, block_len);
///     decoder.read_dictionary(&block, &data).unwrap();
/// }
///
/// // Read record batch
/// let batches = footer.recordBatches().unwrap();
/// assert_eq!(batches.len(), 1); // Only wrote a single batch
///
/// let block = batches.get(0);
/// let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
/// let data = buffer.slice_with_length(block.offset() as _, block_len);
/// let back = decoder.read_record_batch(block, &data).unwrap().unwrap();
///
/// assert_eq!(batch, back);
/// ```
#[derive(Debug)]
pub struct FileDecoder {
    schema: SchemaRef,
    dictionaries: HashMap<i64, ArrayRef>,
    version: MetadataVersion,
    projection: Option<Vec<usize>>,
    require_alignment: bool,
}

impl FileDecoder {
    /// Create a new [`FileDecoder`] with the given schema and version
    pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
        Self {
            schema,
            version,
            dictionaries: Default::default(),
            projection: None,
            require_alignment: false,
        }
    }

    /// Specify a projection
    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
        self.projection = Some(projection);
        self
    }

    /// Specifies whether or not array data in input buffers is required to be properly aligned.
    ///
    /// If `require_alignment` is true, this decoder will return an error if any array data in the
    /// input `buf` is not properly aligned.
    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct
    /// [`arrow_data::ArrayData`].
    ///
    /// If `require_alignment` is false (the default), this decoder will automatically allocate a
    /// new aligned buffer and copy over the data if any array data in the input `buf` is not
    /// properly aligned. (Properly aligned array data will remain zero-copy.)
    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct
    /// [`arrow_data::ArrayData`].
    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
        self.require_alignment = require_alignment;
        self
    }

    fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message<'a>, ArrowError> {
        let message = parse_message(buf)?;

        // some old test data's footer metadata is not set, so we account for that
        if self.version != MetadataVersion::V1 && message.version() != self.version {
            return Err(ArrowError::IpcError(
                "Could not read IPC message as metadata versions mismatch".to_string(),
            ));
        }
        Ok(message)
    }

    /// Read the dictionary with the given block and data buffer
    pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> {
        let message = self.read_message(buf)?;
        match message.header_type() {
            crate::MessageHeader::DictionaryBatch => {
                let batch = message.header_as_dictionary_batch().unwrap();
                read_dictionary_impl(
                    &buf.slice(block.metaDataLength() as _),
                    batch,
                    &self.schema,
                    &mut self.dictionaries,
                    &message.version(),
                    self.require_alignment,
                )
            }
            t => Err(ArrowError::ParseError(format!(
                "Expecting DictionaryBatch in dictionary blocks, found {t:?}."
            ))),
        }
    }

    /// Read the RecordBatch with the given block and data buffer
    pub fn read_record_batch(
        &self,
        block: &Block,
        buf: &Buffer,
    ) -> Result<Option<RecordBatch>, ArrowError> {
        let message = self.read_message(buf)?;
        match message.header_type() {
            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
                "Not expecting a schema when messages are read".to_string(),
            )),
            crate::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
                })?;
                // read the block that makes up the record batch into a buffer
                RecordBatchDecoder::try_new(
                    &buf.slice(block.metaDataLength() as _),
                    batch,
                    self.schema.clone(),
                    &self.dictionaries,
                    &message.version(),
                )?
                .with_projection(self.projection.as_deref())
                .with_require_alignment(self.require_alignment)
                .read_record_batch()
                .map(Some)
            }
            crate::MessageHeader::NONE => Ok(None),
            t => Err(ArrowError::InvalidArgumentError(format!(
                "Reading types other than record batches not yet supported, unable to read {t:?}"
            ))),
        }
    }
}

/// Build an Arrow [`FileReader`] with custom options.
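///
/// # Example
///
/// A minimal sketch: write a two-column batch to an in-memory file, then read
/// back only the first column via a projection (the data is illustrative):
/// ```
/// # use std::io::Cursor;
/// # use arrow_array::record_batch;
/// # use arrow_ipc::reader::FileReaderBuilder;
/// # use arrow_ipc::writer::FileWriter;
/// let batch = record_batch!(("a", Int32, [1, 2, 3]), ("b", Utf8, ["x", "y", "z"])).unwrap();
/// let mut buf = Vec::new();
/// let mut writer = FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
/// writer.write(&batch).unwrap();
/// writer.finish().unwrap();
/// drop(writer);
///
/// let mut reader = FileReaderBuilder::new()
///     .with_projection(vec![0])
///     .build(Cursor::new(buf))
///     .unwrap();
/// let projected = reader.next().unwrap().unwrap();
/// assert_eq!(projected.num_columns(), 1);
/// ```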
#[derive(Debug)]
pub struct FileReaderBuilder {
    /// Optional projection for which columns to load (zero-based column indices)
    projection: Option<Vec<usize>>,
    /// Passed through to construct [`VerifierOptions`]
    max_footer_fb_tables: usize,
    /// Passed through to construct [`VerifierOptions`]
    max_footer_fb_depth: usize,
}

impl Default for FileReaderBuilder {
    fn default() -> Self {
        let verifier_options = VerifierOptions::default();
        Self {
            max_footer_fb_tables: verifier_options.max_tables,
            max_footer_fb_depth: verifier_options.max_depth,
            projection: None,
        }
    }
}

902
903impl FileReaderBuilder {
904    /// Options for creating a new [`FileReader`].
905    ///
906    /// To convert a builder into a reader, call [`FileReaderBuilder::build`].
907    pub fn new() -> Self {
908        Self::default()
909    }
910
911    /// Optional projection for which columns to load (zero-based column indices).
912    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
913        self.projection = Some(projection);
914        self
915    }
916
917    /// Flatbuffers option for parsing the footer. Controls the max number of fields and
918    /// metadata key-value pairs that can be parsed from the schema of the footer.
919    ///
920    /// By default this is set to `1_000_000` which roughly translates to a schema with
921    /// no metadata key-value pairs but 499,999 fields.
922    ///
923    /// This default limit is enforced to protect against malicious files with a massive
924    /// amount of flatbuffer tables which could cause a denial of service attack.
925    ///
926    /// If you need to ingest a trusted file with a massive number of fields and/or
927    /// metadata key-value pairs and are facing the error `"Unable to get root as
928    /// footer: TooManyTables"` then increase this parameter as necessary.
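    ///
    /// A minimal sketch of raising the limit for such a trusted file (the value
    /// shown is illustrative):
    /// ```
    /// # use arrow_ipc::reader::FileReaderBuilder;
    /// let builder = FileReaderBuilder::new().with_max_footer_fb_tables(10_000_000);
    /// ```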
    pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
        self.max_footer_fb_tables = max_footer_fb_tables;
        self
    }

    /// Flatbuffers option for parsing the footer. Controls the max depth for schemas with
    /// nested fields parsed from the footer.
    ///
    /// By default this is set to `64` which roughly translates to a schema with
    /// a field nested 60 levels down through other struct fields.
    ///
    /// This default limit is enforced to protect against malicious files with an extremely
    /// deep flatbuffer structure which could cause a denial of service attack.
    ///
    /// If you need to ingest a trusted file with a deeply nested field and are facing the
    /// error `"Unable to get root as footer: DepthLimitReached"` then increase this
    /// parameter as necessary.
    pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
        self.max_footer_fb_depth = max_footer_fb_depth;
        self
    }

    /// Build [`FileReader`] with given reader.
    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
        // Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
        let mut buffer = [0; 10];
        reader.seek(SeekFrom::End(-10))?;
        reader.read_exact(&mut buffer)?;

        let footer_len = read_footer_length(buffer)?;

        // read footer
        let mut footer_data = vec![0; footer_len];
        reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
        reader.read_exact(&mut footer_data)?;

        let verifier_options = VerifierOptions {
            max_tables: self.max_footer_fb_tables,
            max_depth: self.max_footer_fb_depth,
            ..Default::default()
        };
        let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
            |err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
        )?;

        let blocks = footer.recordBatches().ok_or_else(|| {
            ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
        })?;

        let total_blocks = blocks.len();

        let ipc_schema = footer.schema().unwrap();
        if !ipc_schema.endianness().equals_to_target_endianness() {
            return Err(ArrowError::IpcError(
                "the endianness of the source system does not match the endianness of the target system.".to_owned()
            ));
        }

        let schema = crate::convert::fb_to_schema(ipc_schema);

        let mut custom_metadata = HashMap::new();
        if let Some(fb_custom_metadata) = footer.custom_metadata() {
            for kv in fb_custom_metadata.into_iter() {
                custom_metadata.insert(
                    kv.key().unwrap().to_string(),
                    kv.value().unwrap().to_string(),
                );
            }
        }

        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
        if let Some(projection) = self.projection {
            decoder = decoder.with_projection(projection)
        }
        // Read any dictionary batches and cache their values, keyed by dictionary id.
        if let Some(dictionaries) = footer.dictionaries() {
            for block in dictionaries {
                let buf = read_block(&mut reader, block)?;
                decoder.read_dictionary(block, &buf)?;
            }
        }

        Ok(FileReader {
            reader,
            blocks: blocks.iter().copied().collect(),
            current_block: 0,
            total_blocks,
            decoder,
            custom_metadata,
        })
    }
}

/// Arrow File Reader
///
/// Reads Arrow [`RecordBatch`]es from bytes in the [IPC File Format],
/// providing random access to the record batches.
///
/// # See Also
///
/// * [`Self::set_index`] for random access
/// * [`StreamReader`] for reading streaming data
///
/// # Example: Reading from a `File`
/// ```
/// # use std::io::Cursor;
/// use arrow_array::record_batch;
/// # use arrow_ipc::reader::FileReader;
/// # use arrow_ipc::writer::FileWriter;
/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// # let mut file = vec![]; // mimic a stream for the example
/// # {
/// #  let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
/// #  writer.write(&batch).unwrap();
/// #  writer.write(&batch).unwrap();
/// #  writer.finish().unwrap();
/// # }
/// # let mut file = Cursor::new(&file);
/// let projection = None; // read all columns
/// let mut reader = FileReader::try_new(&mut file, projection).unwrap();
/// // Position the reader to the second batch
/// reader.set_index(1).unwrap();
/// // read batches from the reader using the Iterator trait
/// let mut num_rows = 0;
/// for batch in reader {
///    let batch = batch.unwrap();
///    num_rows += batch.num_rows();
/// }
/// assert_eq!(num_rows, 3);
/// ```
/// # Example: Reading from an `mmap`ed file
///
/// For an example creating Arrays without copying using memory mapped (`mmap`)
/// files see the [`zero_copy_ipc`] example.
///
/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
pub struct FileReader<R> {
    /// File reader that supports reading and seeking
    reader: R,

    /// The decoder
    decoder: FileDecoder,

    /// The blocks in the file
    ///
    /// A block indicates the regions in the file to read to get data
    blocks: Vec<Block>,

    /// A counter to keep track of the current block that should be read
    current_block: usize,

    /// The total number of blocks, which may contain record batches and other types
    total_blocks: usize,

    /// User defined metadata
    custom_metadata: HashMap<String, String>,
}

impl<R> fmt::Debug for FileReader<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("FileReader<R>")
            .field("decoder", &self.decoder)
            .field("blocks", &self.blocks)
            .field("current_block", &self.current_block)
            .field("total_blocks", &self.total_blocks)
            .finish_non_exhaustive()
    }
}

impl<R: Read + Seek> FileReader<BufReader<R>> {
    /// Try to create a new file reader with the reader wrapped in a BufReader.
    ///
    /// See [`FileReader::try_new`] for an unbuffered version.
    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
        Self::try_new(BufReader::new(reader), projection)
    }
}

impl<R: Read + Seek> FileReader<R> {
    /// Try to create a new file reader.
    ///
    /// There is no internal buffering. If buffered reads are needed you likely want to use
    /// [`FileReader::try_new_buffered`] instead.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if:
    /// - the file does not meet the Arrow Format footer requirements, or
    /// - file endianness does not match the target endianness.
    pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
        let builder = FileReaderBuilder {
            projection,
            ..Default::default()
        };
        builder.build(reader)
    }

    /// Return user defined customized metadata
    pub fn custom_metadata(&self) -> &HashMap<String, String> {
        &self.custom_metadata
    }

    /// Return the number of batches in the file
    pub fn num_batches(&self) -> usize {
        self.total_blocks
    }

    /// Return the schema of the file
    pub fn schema(&self) -> SchemaRef {
        self.decoder.schema.clone()
    }

    /// Seek to a specific [`RecordBatch`]
    ///
    /// Sets the current block to the index, allowing random reads
    pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
        if index >= self.total_blocks {
            Err(ArrowError::InvalidArgumentError(format!(
                "Cannot set batch to index {} from {} total batches",
                index, self.total_blocks
            )))
        } else {
            self.current_block = index;
            Ok(())
        }
    }

    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        let block = &self.blocks[self.current_block];
        self.current_block += 1;

        // read length
        let buffer = read_block(&mut self.reader, block)?;
        self.decoder.read_record_batch(block, &buffer)
    }

    /// Gets a reference to the underlying reader.
    ///
    /// It is inadvisable to directly read from the underlying reader.
    pub fn get_ref(&self) -> &R {
        &self.reader
    }

    /// Gets a mutable reference to the underlying reader.
    ///
    /// It is inadvisable to directly read from the underlying reader.
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.reader
    }
}

impl<R: Read + Seek> Iterator for FileReader<R> {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        // get current block
        if self.current_block < self.total_blocks {
            self.maybe_next().transpose()
        } else {
            None
        }
    }
}

impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
    fn schema(&self) -> SchemaRef {
        self.schema()
    }
}

/// Arrow Stream Reader
///
/// Reads Arrow [`RecordBatch`]es from bytes in the [IPC Streaming Format].
///
/// # See Also
///
/// * [`FileReader`] for random access.
///
/// # Example
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::reader::StreamReader;
/// # use arrow_ipc::writer::StreamWriter;
/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// # let mut stream = vec![]; // mimic a stream for the example
/// # {
/// #  let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
/// #  writer.write(&batch).unwrap();
/// #  writer.finish().unwrap();
/// # }
/// # let stream = stream.as_slice();
/// let projection = None; // read all columns
/// let mut reader = StreamReader::try_new(stream, projection).unwrap();
/// // read batches from the reader using the Iterator trait
/// let mut num_rows = 0;
/// for batch in reader {
///    let batch = batch.unwrap();
///    num_rows += batch.num_rows();
/// }
/// assert_eq!(num_rows, 3);
/// ```
///
/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
pub struct StreamReader<R> {
    /// Stream reader
    reader: R,

    /// The schema that is read from the stream's first message
    schema: SchemaRef,

    /// Optional dictionaries for each schema field.
    ///
    /// Dictionaries may be appended to in the streaming format.
    dictionaries_by_id: HashMap<i64, ArrayRef>,

    /// An indicator of whether the stream is complete.
    ///
    /// This value is set to `true` the first time the reader's `next()` returns `None`.
    finished: bool,

    /// Optional projection
    projection: Option<(Vec<usize>, Schema)>,
}

impl<R> fmt::Debug for StreamReader<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
        f.debug_struct("StreamReader<R>")
            .field("reader", &"R")
            .field("schema", &self.schema)
            .field("dictionaries_by_id", &self.dictionaries_by_id)
            .field("finished", &self.finished)
            .field("projection", &self.projection)
            .finish()
    }
}

impl<R: Read> StreamReader<BufReader<R>> {
    /// Try to create a new stream reader with the reader wrapped in a BufReader.
    ///
    /// See [`StreamReader::try_new`] for an unbuffered version.
    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
        Self::try_new(BufReader::new(reader), projection)
    }
}

impl<R: Read> StreamReader<R> {
    /// Try to create a new stream reader.
    ///
    /// To check if the reader is done, use [`is_finished(self)`](StreamReader::is_finished).
    ///
    /// There is no internal buffering. If buffered reads are needed you likely want to use
    /// [`StreamReader::try_new_buffered`] instead.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if the reader does not encounter a schema
    /// as the first message in the stream.
    pub fn try_new(
        mut reader: R,
        projection: Option<Vec<usize>>,
    ) -> Result<StreamReader<R>, ArrowError> {
        // determine metadata length
        let mut meta_size: [u8; 4] = [0; 4];
        reader.read_exact(&mut meta_size)?;
        let meta_len = {
            // If a continuation marker is encountered, skip over it and read
            // the size from the next four bytes.
            if meta_size == CONTINUATION_MARKER {
                reader.read_exact(&mut meta_size)?;
            }
            i32::from_le_bytes(meta_size)
        };

        let mut meta_buffer = vec![0; meta_len as usize];
        reader.read_exact(&mut meta_buffer)?;

        let message = crate::root_as_message(meta_buffer.as_slice()).map_err(|err| {
            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
        })?;
        // message header is a Schema, so read it
        let ipc_schema: crate::Schema = message.header_as_schema().ok_or_else(|| {
            ArrowError::ParseError("Unable to read IPC message as schema".to_string())
        })?;
        let schema = crate::convert::fb_to_schema(ipc_schema);

        // Dictionaries are decoded as the stream is read; start with an empty
        // cache, keyed by dictionary id.
        let dictionaries_by_id = HashMap::new();

        let projection = match projection {
            Some(projection_indices) => {
                let schema = schema.project(&projection_indices)?;
                Some((projection_indices, schema))
            }
            _ => None,
        };
        Ok(Self {
            reader,
            schema: Arc::new(schema),
            finished: false,
            dictionaries_by_id,
            projection,
        })
    }
1334
1335    /// Deprecated, use [`StreamReader::try_new`] instead.
1336    #[deprecated(since = "53.0.0", note = "use `try_new` instead")]
1337    pub fn try_new_unbuffered(
1338        reader: R,
1339        projection: Option<Vec<usize>>,
1340    ) -> Result<Self, ArrowError> {
1341        Self::try_new(reader, projection)
1342    }
1343
1344    /// Return the schema of the stream
1345    pub fn schema(&self) -> SchemaRef {
1346        self.schema.clone()
1347    }
1348
1349    /// Check if the stream is finished
1350    pub fn is_finished(&self) -> bool {
1351        self.finished
1352    }
1353
1354    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1355        if self.finished {
1356            return Ok(None);
1357        }
1358        // determine metadata length
1359        let mut meta_size: [u8; 4] = [0; 4];
1360
1361        match self.reader.read_exact(&mut meta_size) {
1362            Ok(()) => (),
1363            Err(e) => {
1364                return if e.kind() == std::io::ErrorKind::UnexpectedEof {
1365                    // Handle EOF without the "0xFFFFFFFF 0x00000000"
1366                    // valid according to:
1367                    // https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
1368                    self.finished = true;
1369                    Ok(None)
1370                } else {
1371                    Err(ArrowError::from(e))
1372                };
1373            }
1374        }
1375
1376        let meta_len = {
1377            // If a continuation marker is encountered, skip over it and read
1378            // the size from the next four bytes.
1379            if meta_size == CONTINUATION_MARKER {
1380                self.reader.read_exact(&mut meta_size)?;
1381            }
1382            i32::from_le_bytes(meta_size)
1383        };
1384
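        // A zero metadata length (optionally preceded by a continuation
        // marker) is the end-of-stream marker defined by the streaming format.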
1385        if meta_len == 0 {
1386            // the stream has ended, mark the reader as finished
1387            self.finished = true;
1388            return Ok(None);
1389        }
1390
1391        let mut meta_buffer = vec![0; meta_len as usize];
1392        self.reader.read_exact(&mut meta_buffer)?;
1393
        let message = crate::root_as_message(meta_buffer.as_slice()).map_err(|err| {
1396            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
1397        })?;
1398
1399        match message.header_type() {
1400            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
1401                "Not expecting a schema when messages are read".to_string(),
1402            )),
1403            crate::MessageHeader::RecordBatch => {
1404                let batch = message.header_as_record_batch().ok_or_else(|| {
1405                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1406                })?;
1407                // read the block that makes up the record batch into a buffer
1408                let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
1409                self.reader.read_exact(&mut buf)?;
1410
1411                RecordBatchDecoder::try_new(
1412                    &buf.into(),
1413                    batch,
1414                    self.schema(),
1415                    &self.dictionaries_by_id,
1416                    &message.version(),
1417                )?
1418                .with_projection(self.projection.as_ref().map(|x| x.0.as_ref()))
1419                .with_require_alignment(false)
1420                .read_record_batch()
1421                .map(Some)
1422            }
1423            crate::MessageHeader::DictionaryBatch => {
1424                let batch = message.header_as_dictionary_batch().ok_or_else(|| {
1425                    ArrowError::IpcError(
1426                        "Unable to read IPC message as dictionary batch".to_string(),
1427                    )
1428                })?;
1429                // read the block that makes up the dictionary batch into a buffer
1430                let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
1431                self.reader.read_exact(&mut buf)?;
1432
1433                read_dictionary_impl(
1434                    &buf.into(),
1435                    batch,
1436                    &self.schema,
1437                    &mut self.dictionaries_by_id,
1438                    &message.version(),
1439                    false,
1440                )?;
1441
1442                // read the next message until we encounter a RecordBatch
1443                self.maybe_next()
1444            }
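            // An unset (NONE) header yields no batch and ends iteration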
1445            crate::MessageHeader::NONE => Ok(None),
1446            t => Err(ArrowError::InvalidArgumentError(format!(
1447                "Reading types other than record batches not yet supported, unable to read {t:?} "
1448            ))),
1449        }
1450    }
1451
1452    /// Gets a reference to the underlying reader.
1453    ///
1454    /// It is inadvisable to directly read from the underlying reader.
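    ///
    /// A minimal sketch; with a `Cursor` as the underlying reader, the number
    /// of bytes consumed so far can be inspected (`stream_bytes` is assumed to
    /// hold a valid IPC stream):
    ///
    /// ```ignore
    /// let reader = StreamReader::try_new(std::io::Cursor::new(stream_bytes), None)?;
    /// let bytes_consumed = reader.get_ref().position();
    /// ```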
1455    pub fn get_ref(&self) -> &R {
1456        &self.reader
1457    }
1458
1459    /// Gets a mutable reference to the underlying reader.
1460    ///
1461    /// It is inadvisable to directly read from the underlying reader.
1462    pub fn get_mut(&mut self) -> &mut R {
1463        &mut self.reader
1464    }
1465}
1466
1467impl<R: Read> Iterator for StreamReader<R> {
1468    type Item = Result<RecordBatch, ArrowError>;
1469
1470    fn next(&mut self) -> Option<Self::Item> {
1471        self.maybe_next().transpose()
1472    }
1473}
1474
1475impl<R: Read> RecordBatchReader for StreamReader<R> {
1476    fn schema(&self) -> SchemaRef {
1477        self.schema.clone()
1478    }
1479}
1480
1481#[cfg(test)]
1482mod tests {
1483    use crate::writer::{unslice_run_array, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
1484
1485    use super::*;
1486
1487    use crate::convert::fb_to_schema;
1488    use crate::{root_as_footer, root_as_message};
1489    use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
1490    use arrow_array::types::*;
1491    use arrow_buffer::{NullBuffer, OffsetBuffer};
1492    use arrow_data::ArrayDataBuilder;
1493
1494    fn create_test_projection_schema() -> Schema {
1495        // define field types
1496        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1497
1498        let fixed_size_list_data_type =
1499            DataType::FixedSizeList(Arc::new(Field::new_list_field(DataType::Int32, false)), 3);
1500
1501        let union_fields = UnionFields::new(
1502            vec![0, 1],
1503            vec![
1504                Field::new("a", DataType::Int32, false),
1505                Field::new("b", DataType::Float64, false),
1506            ],
1507        );
1508
1509        let union_data_type = DataType::Union(union_fields, UnionMode::Dense);
1510
1511        let struct_fields = Fields::from(vec![
1512            Field::new("id", DataType::Int32, false),
1513            Field::new_list("list", Field::new_list_field(DataType::Int8, true), false),
1514        ]);
1515        let struct_data_type = DataType::Struct(struct_fields);
1516
1517        let run_encoded_data_type = DataType::RunEndEncoded(
1518            Arc::new(Field::new("run_ends", DataType::Int16, false)),
1519            Arc::new(Field::new("values", DataType::Int32, true)),
1520        );
1521
1522        // define schema
1523        Schema::new(vec![
1524            Field::new("f0", DataType::UInt32, false),
1525            Field::new("f1", DataType::Utf8, false),
1526            Field::new("f2", DataType::Boolean, false),
1527            Field::new("f3", union_data_type, true),
1528            Field::new("f4", DataType::Null, true),
1529            Field::new("f5", DataType::Float64, true),
1530            Field::new("f6", list_data_type, false),
1531            Field::new("f7", DataType::FixedSizeBinary(3), true),
1532            Field::new("f8", fixed_size_list_data_type, false),
1533            Field::new("f9", struct_data_type, false),
1534            Field::new("f10", run_encoded_data_type, false),
1535            Field::new("f11", DataType::Boolean, false),
1536            Field::new_dictionary("f12", DataType::Int8, DataType::Utf8, false),
1537            Field::new("f13", DataType::Utf8, false),
1538        ])
1539    }
1540
1541    fn create_test_projection_batch_data(schema: &Schema) -> RecordBatch {
1542        // set test data for each column
1543        let array0 = UInt32Array::from(vec![1, 2, 3]);
1544        let array1 = StringArray::from(vec!["foo", "bar", "baz"]);
1545        let array2 = BooleanArray::from(vec![true, false, true]);
1546
1547        let mut union_builder = UnionBuilder::new_dense();
1548        union_builder.append::<Int32Type>("a", 1).unwrap();
1549        union_builder.append::<Float64Type>("b", 10.1).unwrap();
1550        union_builder.append_null::<Float64Type>("b").unwrap();
1551        let array3 = union_builder.build().unwrap();
1552
1553        let array4 = NullArray::new(3);
1554        let array5 = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
1555        let array6_values = vec![
1556            Some(vec![Some(10), Some(10), Some(10)]),
1557            Some(vec![Some(20), Some(20), Some(20)]),
1558            Some(vec![Some(30), Some(30)]),
1559        ];
1560        let array6 = ListArray::from_iter_primitive::<Int32Type, _, _>(array6_values);
1561        let array7_values = vec![vec![11, 12, 13], vec![22, 23, 24], vec![33, 34, 35]];
1562        let array7 = FixedSizeBinaryArray::try_from_iter(array7_values.into_iter()).unwrap();
1563
1564        let array8_values = ArrayData::builder(DataType::Int32)
1565            .len(9)
1566            .add_buffer(Buffer::from_slice_ref([40, 41, 42, 43, 44, 45, 46, 47, 48]))
1567            .build()
1568            .unwrap();
1569        let array8_data = ArrayData::builder(schema.field(8).data_type().clone())
1570            .len(3)
1571            .add_child_data(array8_values)
1572            .build()
1573            .unwrap();
1574        let array8 = FixedSizeListArray::from(array8_data);
1575
1576        let array9_id: ArrayRef = Arc::new(Int32Array::from(vec![1001, 1002, 1003]));
1577        let array9_list: ArrayRef =
1578            Arc::new(ListArray::from_iter_primitive::<Int8Type, _, _>(vec![
1579                Some(vec![Some(-10)]),
1580                Some(vec![Some(-20), Some(-20), Some(-20)]),
1581                Some(vec![Some(-30)]),
1582            ]));
1583        let array9 = ArrayDataBuilder::new(schema.field(9).data_type().clone())
1584            .add_child_data(array9_id.into_data())
1585            .add_child_data(array9_list.into_data())
1586            .len(3)
1587            .build()
1588            .unwrap();
1589        let array9: ArrayRef = Arc::new(StructArray::from(array9));
1590
1591        let array10_input = vec![Some(1_i32), None, None];
1592        let mut array10_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1593        array10_builder.extend(array10_input);
1594        let array10 = array10_builder.finish();
1595
1596        let array11 = BooleanArray::from(vec![false, false, true]);
1597
1598        let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
1599        let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
1600        let array12 = DictionaryArray::new(array12_keys, Arc::new(array12_values));
1601
1602        let array13 = StringArray::from(vec!["a", "bb", "ccc"]);
1603
1604        // create record batch
1605        RecordBatch::try_new(
1606            Arc::new(schema.clone()),
1607            vec![
1608                Arc::new(array0),
1609                Arc::new(array1),
1610                Arc::new(array2),
1611                Arc::new(array3),
1612                Arc::new(array4),
1613                Arc::new(array5),
1614                Arc::new(array6),
1615                Arc::new(array7),
1616                Arc::new(array8),
1617                Arc::new(array9),
1618                Arc::new(array10),
1619                Arc::new(array11),
1620                Arc::new(array12),
1621                Arc::new(array13),
1622            ],
1623        )
1624        .unwrap()
1625    }
1626
1627    #[test]
1628    fn test_projection_array_values() {
1629        // define schema
1630        let schema = create_test_projection_schema();
1631
1632        // create record batch with test data
1633        let batch = create_test_projection_batch_data(&schema);
1634
1635        // write record batch in IPC format
1636        let mut buf = Vec::new();
1637        {
1638            let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
1639            writer.write(&batch).unwrap();
1640            writer.finish().unwrap();
1641        }
1642
1643        // read record batch with projection
1644        for index in 0..12 {
1645            let projection = vec![index];
1646            let reader = FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection));
1647            let read_batch = reader.unwrap().next().unwrap().unwrap();
1648            let projected_column = read_batch.column(0);
1649            let expected_column = batch.column(index);
1650
1651            // check the projected column equals the expected column
1652            assert_eq!(projected_column.as_ref(), expected_column.as_ref());
1653        }
1654
1655        {
1656            // read record batch with reversed projection
1657            let reader =
1658                FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(vec![3, 2, 1]));
1659            let read_batch = reader.unwrap().next().unwrap().unwrap();
1660            let expected_batch = batch.project(&[3, 2, 1]).unwrap();
1661            assert_eq!(read_batch, expected_batch);
1662        }
1663    }
1664
1665    #[test]
1666    fn test_arrow_single_float_row() {
1667        let schema = Schema::new(vec![
1668            Field::new("a", DataType::Float32, false),
1669            Field::new("b", DataType::Float32, false),
1670            Field::new("c", DataType::Int32, false),
1671            Field::new("d", DataType::Int32, false),
1672        ]);
1673        let arrays = vec![
1674            Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
1675            Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
1676            Arc::new(Int32Array::from(vec![2])) as ArrayRef,
1677            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
1678        ];
1679        let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
1680        // create stream writer
1681        let mut file = tempfile::tempfile().unwrap();
1682        let mut stream_writer = crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
1683        stream_writer.write(&batch).unwrap();
1684        stream_writer.finish().unwrap();
1685
1686        drop(stream_writer);
1687
1688        file.rewind().unwrap();
1689
1690        // read stream back
1691        let reader = StreamReader::try_new(&mut file, None).unwrap();
1692
1693        reader.for_each(|batch| {
1694            let batch = batch.unwrap();
1695            assert!(
1696                batch
1697                    .column(0)
1698                    .as_any()
1699                    .downcast_ref::<Float32Array>()
1700                    .unwrap()
1701                    .value(0)
1702                    != 0.0
1703            );
1704            assert!(
1705                batch
1706                    .column(1)
1707                    .as_any()
1708                    .downcast_ref::<Float32Array>()
1709                    .unwrap()
1710                    .value(0)
1711                    != 0.0
1712            );
1713        });
1714
1715        file.rewind().unwrap();
1716
1717        // Read with projection
1718        let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();
1719
1720        reader.for_each(|batch| {
1721            let batch = batch.unwrap();
1722            assert_eq!(batch.schema().fields().len(), 2);
1723            assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32);
1724            assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32);
1725        });
1726    }
1727
1728    /// Write the record batch to an in-memory buffer in IPC File format
1729    fn write_ipc(rb: &RecordBatch) -> Vec<u8> {
1730        let mut buf = Vec::new();
1731        let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
1732        writer.write(rb).unwrap();
1733        writer.finish().unwrap();
1734        buf
1735    }
1736
1737    /// Return the first record batch read from the IPC File buffer
1738    fn read_ipc(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
1739        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None)?;
1740        reader.next().unwrap()
1741    }
1742
1743    fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
1744        let buf = write_ipc(rb);
1745        read_ipc(&buf).unwrap()
1746    }
1747
1748    /// Return the first record batch read from the IPC File buffer
1749    /// using the FileDecoder API
1750    fn read_ipc_with_decoder(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
1751        let buffer = Buffer::from_vec(buf);
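        // An IPC file ends with the footer flatbuffer, a 4-byte little-endian
        // footer length, and the 6-byte magic "ARROW1", so the trailer is the
        // final 10 bytes.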
1752        let trailer_start = buffer.len() - 10;
1753        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap())?;
1754        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start])
1755            .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid footer: {e}")))?;
1756
1757        let schema = fb_to_schema(footer.schema().unwrap());
1758
1759        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
1760        // Read dictionaries
1761        for block in footer.dictionaries().iter().flatten() {
1762            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
1763            let data = buffer.slice_with_length(block.offset() as _, block_len);
1764            decoder.read_dictionary(block, &data)?
1765        }
1766
1767        // Read record batch
1768        let batches = footer.recordBatches().unwrap();
1769        assert_eq!(batches.len(), 1); // Only wrote a single batch
1770
1771        let block = batches.get(0);
1772        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
1773        let data = buffer.slice_with_length(block.offset() as _, block_len);
1774        Ok(decoder.read_record_batch(block, &data)?.unwrap())
1775    }
1776
1777    /// Write the record batch to an in-memory buffer in IPC Stream format
1778    fn write_stream(rb: &RecordBatch) -> Vec<u8> {
1779        let mut buf = Vec::new();
1780        let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
1781        writer.write(rb).unwrap();
1782        writer.finish().unwrap();
1783        buf
1784    }
1785
1786    /// Return the first record batch read from the IPC Stream buffer
1787    fn read_stream(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
1788        let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None)?;
1789        reader.next().unwrap()
1790    }
1791
1792    fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
1793        let buf = write_stream(rb);
1794        read_stream(&buf).unwrap()
1795    }
1796
1797    #[test]
1798    fn test_roundtrip_with_custom_metadata() {
1799        let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
1800        let mut buf = Vec::new();
1801        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
1802        let mut test_metadata = HashMap::new();
1803        test_metadata.insert("abc".to_string(), "abc".to_string());
1804        test_metadata.insert("def".to_string(), "def".to_string());
1805        for (k, v) in &test_metadata {
1806            writer.write_metadata(k, v);
1807        }
1808        writer.finish().unwrap();
1809        drop(writer);
1810
1811        let reader = crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
1812        assert_eq!(reader.custom_metadata(), &test_metadata);
1813    }
1814
1815    #[test]
1816    fn test_roundtrip_nested_dict() {
1817        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
1818
1819        let array = Arc::new(inner) as ArrayRef;
1820
1821        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
1822
1823        let s = StructArray::from(vec![(dctfield, array)]);
1824        let struct_array = Arc::new(s) as ArrayRef;
1825
1826        let schema = Arc::new(Schema::new(vec![Field::new(
1827            "struct",
1828            struct_array.data_type().clone(),
1829            false,
1830        )]));
1831
1832        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
1833
1834        assert_eq!(batch, roundtrip_ipc(&batch));
1835    }
1836
1837    #[test]
1838    fn test_roundtrip_nested_dict_no_preserve_dict_id() {
1839        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
1840
1841        let array = Arc::new(inner) as ArrayRef;
1842
1843        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
1844
1845        let s = StructArray::from(vec![(dctfield, array)]);
1846        let struct_array = Arc::new(s) as ArrayRef;
1847
1848        let schema = Arc::new(Schema::new(vec![Field::new(
1849            "struct",
1850            struct_array.data_type().clone(),
1851            false,
1852        )]));
1853
1854        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
1855
1856        let mut buf = Vec::new();
1857        let mut writer = crate::writer::FileWriter::try_new_with_options(
1858            &mut buf,
1859            batch.schema_ref(),
1860            #[allow(deprecated)]
1861            IpcWriteOptions::default().with_preserve_dict_id(false),
1862        )
1863        .unwrap();
1864        writer.write(&batch).unwrap();
1865        writer.finish().unwrap();
1866        drop(writer);
1867
1868        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
1869
1870        assert_eq!(batch, reader.next().unwrap().unwrap());
1871    }
1872
1873    fn check_union_with_builder(mut builder: UnionBuilder) {
1874        builder.append::<Int32Type>("a", 1).unwrap();
1875        builder.append_null::<Int32Type>("a").unwrap();
1876        builder.append::<Float64Type>("c", 3.0).unwrap();
1877        builder.append::<Int32Type>("a", 4).unwrap();
1878        builder.append::<Int64Type>("d", 11).unwrap();
1879        let union = builder.build().unwrap();
1880
1881        let schema = Arc::new(Schema::new(vec![Field::new(
1882            "union",
1883            union.data_type().clone(),
1884            false,
1885        )]));
1886
1887        let union_array = Arc::new(union) as ArrayRef;
1888
1889        let rb = RecordBatch::try_new(schema, vec![union_array]).unwrap();
1890        let rb2 = roundtrip_ipc(&rb);
        // check that the schema, shape, and column contents survive the roundtrip
1893        assert_eq!(rb.schema(), rb2.schema());
1894        assert_eq!(rb.num_columns(), rb2.num_columns());
1895        assert_eq!(rb.num_rows(), rb2.num_rows());
1896        let union1 = rb.column(0);
1897        let union2 = rb2.column(0);
1898
1899        assert_eq!(union1, union2);
1900    }
1901
1902    #[test]
1903    fn test_roundtrip_dense_union() {
1904        check_union_with_builder(UnionBuilder::new_dense());
1905    }
1906
1907    #[test]
1908    fn test_roundtrip_sparse_union() {
1909        check_union_with_builder(UnionBuilder::new_sparse());
1910    }
1911
1912    #[test]
1913    fn test_roundtrip_struct_empty_fields() {
1914        let nulls = NullBuffer::from(&[true, true, false]);
1915        let rb = RecordBatch::try_from_iter([(
1916            "",
1917            Arc::new(StructArray::new_empty_fields(nulls.len(), Some(nulls))) as _,
1918        )])
1919        .unwrap();
1920        let rb2 = roundtrip_ipc(&rb);
1921        assert_eq!(rb, rb2);
1922    }
1923
1924    #[test]
1925    fn test_roundtrip_stream_run_array_sliced() {
1926        let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"]
1927            .into_iter()
1928            .collect();
1929        let run_array_1_sliced = run_array_1.slice(2, 5);
1930
        let run_array_2_input = vec![Some(1_i32), None, None, Some(2), Some(2)];
        let mut run_array_2_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        run_array_2_builder.extend(run_array_2_input);
1934        let run_array_2 = run_array_2_builder.finish();
1935
1936        let schema = Arc::new(Schema::new(vec![
1937            Field::new(
1938                "run_array_1_sliced",
1939                run_array_1_sliced.data_type().clone(),
1940                false,
1941            ),
1942            Field::new("run_array_2", run_array_2.data_type().clone(), false),
1943        ]));
1944        let input_batch = RecordBatch::try_new(
1945            schema,
1946            vec![Arc::new(run_array_1_sliced.clone()), Arc::new(run_array_2)],
1947        )
1948        .unwrap();
1949        let output_batch = roundtrip_ipc_stream(&input_batch);
1950
        // As partial comparison is not yet supported for run arrays, the sliced run
        // array has to be unsliced before comparing with the output. The second run
        // array can be compared as is.
1954        assert_eq!(input_batch.column(1), output_batch.column(1));
1955
1956        let run_array_1_unsliced = unslice_run_array(run_array_1_sliced.into_data()).unwrap();
1957        assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
1958    }
1959
1960    #[test]
1961    fn test_roundtrip_stream_nested_dict() {
1962        let xs = vec!["AA", "BB", "AA", "CC", "BB"];
1963        let dict = Arc::new(
1964            xs.clone()
1965                .into_iter()
1966                .collect::<DictionaryArray<Int8Type>>(),
1967        );
1968        let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
1969        let struct_array = StructArray::from(vec![
1970            (
1971                Arc::new(Field::new("f2.1", DataType::Utf8, false)),
1972                string_array,
1973            ),
1974            (
1975                Arc::new(Field::new("f2.2_struct", dict.data_type().clone(), false)),
1976                dict.clone() as ArrayRef,
1977            ),
1978        ]);
1979        let schema = Arc::new(Schema::new(vec![
1980            Field::new("f1_string", DataType::Utf8, false),
1981            Field::new("f2_struct", struct_array.data_type().clone(), false),
1982        ]));
1983        let input_batch = RecordBatch::try_new(
1984            schema,
1985            vec![
1986                Arc::new(StringArray::from(xs.clone())),
1987                Arc::new(struct_array),
1988            ],
1989        )
1990        .unwrap();
1991        let output_batch = roundtrip_ipc_stream(&input_batch);
1992        assert_eq!(input_batch, output_batch);
1993    }
1994
1995    #[test]
1996    fn test_roundtrip_stream_nested_dict_of_map_of_dict() {
1997        let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
1998        let values = Arc::new(values) as ArrayRef;
1999        let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
2000        let value_dict_array = DictionaryArray::new(value_dict_keys, values.clone());
2001
2002        let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
2003        let key_dict_array = DictionaryArray::new(key_dict_keys, values);
2004
2005        #[allow(deprecated)]
2006        let keys_field = Arc::new(Field::new_dict(
2007            "keys",
2008            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2009            true, // It is technically not legal for this field to be null.
2010            1,
2011            false,
2012        ));
2013        #[allow(deprecated)]
2014        let values_field = Arc::new(Field::new_dict(
2015            "values",
2016            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2017            true,
2018            2,
2019            false,
2020        ));
2021        let entry_struct = StructArray::from(vec![
2022            (keys_field, make_array(key_dict_array.into_data())),
2023            (values_field, make_array(value_dict_array.into_data())),
2024        ]);
2025        let map_data_type = DataType::Map(
2026            Arc::new(Field::new(
2027                "entries",
2028                entry_struct.data_type().clone(),
2029                false,
2030            )),
2031            false,
2032        );
2033
2034        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
2035        let map_data = ArrayData::builder(map_data_type)
2036            .len(3)
2037            .add_buffer(entry_offsets)
2038            .add_child_data(entry_struct.into_data())
2039            .build()
2040            .unwrap();
2041        let map_array = MapArray::from(map_data);
2042
2043        let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
2044        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
2045
2046        let schema = Arc::new(Schema::new(vec![Field::new(
2047            "f1",
2048            dict_dict_array.data_type().clone(),
2049            false,
2050        )]));
2051        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2052        let output_batch = roundtrip_ipc_stream(&input_batch);
2053        assert_eq!(input_batch, output_batch);
2054    }
2055
2056    fn test_roundtrip_stream_dict_of_list_of_dict_impl<
2057        OffsetSize: OffsetSizeTrait,
2058        U: ArrowNativeType,
2059    >(
2060        list_data_type: DataType,
2061        offsets: &[U; 5],
2062    ) {
2063        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
2064        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
2065        let dict_array = DictionaryArray::new(keys, Arc::new(values));
2066        let dict_data = dict_array.to_data();
2067
2068        let value_offsets = Buffer::from_slice_ref(offsets);
2069
2070        let list_data = ArrayData::builder(list_data_type)
2071            .len(4)
2072            .add_buffer(value_offsets)
2073            .add_child_data(dict_data)
2074            .build()
2075            .unwrap();
2076        let list_array = GenericListArray::<OffsetSize>::from(list_data);
2077
2078        let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
2079        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));
2080
2081        let schema = Arc::new(Schema::new(vec![Field::new(
2082            "f1",
2083            dict_dict_array.data_type().clone(),
2084            false,
2085        )]));
2086        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2087        let output_batch = roundtrip_ipc_stream(&input_batch);
2088        assert_eq!(input_batch, output_batch);
2089    }
2090
2091    #[test]
2092    fn test_roundtrip_stream_dict_of_list_of_dict() {
2093        // list
2094        #[allow(deprecated)]
2095        let list_data_type = DataType::List(Arc::new(Field::new_dict(
2096            "item",
2097            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2098            true,
2099            1,
2100            false,
2101        )));
2102        let offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
2103        test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(list_data_type, offsets);
2104
2105        // large list
2106        #[allow(deprecated)]
2107        let list_data_type = DataType::LargeList(Arc::new(Field::new_dict(
2108            "item",
2109            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2110            true,
2111            1,
2112            false,
2113        )));
2114        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
2115        test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(list_data_type, offsets);
2116    }
2117
2118    #[test]
2119    fn test_roundtrip_stream_dict_of_fixed_size_list_of_dict() {
2120        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
2121        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3, 1, 2]);
2122        let dict_array = DictionaryArray::new(keys, Arc::new(values));
2123        let dict_data = dict_array.into_data();
2124
2125        #[allow(deprecated)]
2126        let list_data_type = DataType::FixedSizeList(
2127            Arc::new(Field::new_dict(
2128                "item",
2129                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2130                true,
2131                1,
2132                false,
2133            )),
2134            3,
2135        );
2136        let list_data = ArrayData::builder(list_data_type)
2137            .len(3)
2138            .add_child_data(dict_data)
2139            .build()
2140            .unwrap();
2141        let list_array = FixedSizeListArray::from(list_data);
2142
2143        let keys_for_dict = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
2144        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));
2145
2146        let schema = Arc::new(Schema::new(vec![Field::new(
2147            "f1",
2148            dict_dict_array.data_type().clone(),
2149            false,
2150        )]));
2151        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2152        let output_batch = roundtrip_ipc_stream(&input_batch);
2153        assert_eq!(input_batch, output_batch);
2154    }
2155
    const LONG_TEST_STRING: &str =
        "This is a long string to make sure binary view arrays handle it";
2158
2159    #[test]
2160    fn test_roundtrip_view_types() {
2161        let schema = Schema::new(vec![
2162            Field::new("field_1", DataType::BinaryView, true),
2163            Field::new("field_2", DataType::Utf8, true),
2164            Field::new("field_3", DataType::Utf8View, true),
2165        ]);
2166        let bin_values: Vec<Option<&[u8]>> = vec![
2167            Some(b"foo"),
2168            None,
2169            Some(b"bar"),
2170            Some(LONG_TEST_STRING.as_bytes()),
2171        ];
2172        let utf8_values: Vec<Option<&str>> =
2173            vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
2174        let bin_view_array = BinaryViewArray::from_iter(bin_values);
2175        let utf8_array = StringArray::from_iter(utf8_values.iter());
2176        let utf8_view_array = StringViewArray::from_iter(utf8_values);
2177        let record_batch = RecordBatch::try_new(
2178            Arc::new(schema.clone()),
2179            vec![
2180                Arc::new(bin_view_array),
2181                Arc::new(utf8_array),
2182                Arc::new(utf8_view_array),
2183            ],
2184        )
2185        .unwrap();
2186
2187        assert_eq!(record_batch, roundtrip_ipc(&record_batch));
2188        assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));
2189
2190        let sliced_batch = record_batch.slice(1, 2);
2191        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2192        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2193    }
2194
2195    #[test]
2196    fn test_roundtrip_view_types_nested_dict() {
2197        let bin_values: Vec<Option<&[u8]>> = vec![
2198            Some(b"foo"),
2199            None,
2200            Some(b"bar"),
2201            Some(LONG_TEST_STRING.as_bytes()),
2202            Some(b"field"),
2203        ];
2204        let utf8_values: Vec<Option<&str>> = vec![
2205            Some("foo"),
2206            None,
2207            Some("bar"),
2208            Some(LONG_TEST_STRING),
2209            Some("field"),
2210        ];
2211        let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
2212        let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));
2213
2214        let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
2215        let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
2216        #[allow(deprecated)]
2217        let keys_field = Arc::new(Field::new_dict(
2218            "keys",
2219            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
2220            true,
2221            1,
2222            false,
2223        ));
2224
2225        let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
2226        let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
2227        #[allow(deprecated)]
2228        let values_field = Arc::new(Field::new_dict(
2229            "values",
2230            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
2231            true,
2232            2,
2233            false,
2234        ));
2235        let entry_struct = StructArray::from(vec![
2236            (keys_field, make_array(key_dict_array.into_data())),
2237            (values_field, make_array(value_dict_array.into_data())),
2238        ]);
2239
2240        let map_data_type = DataType::Map(
2241            Arc::new(Field::new(
2242                "entries",
2243                entry_struct.data_type().clone(),
2244                false,
2245            )),
2246            false,
2247        );
2248        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
2249        let map_data = ArrayData::builder(map_data_type)
2250            .len(3)
2251            .add_buffer(entry_offsets)
2252            .add_child_data(entry_struct.into_data())
2253            .build()
2254            .unwrap();
2255        let map_array = MapArray::from(map_data);
2256
2257        let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
2258        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
2259        let schema = Arc::new(Schema::new(vec![Field::new(
2260            "f1",
2261            dict_dict_array.data_type().clone(),
2262            false,
2263        )]));
2264        let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2265        assert_eq!(batch, roundtrip_ipc(&batch));
2266        assert_eq!(batch, roundtrip_ipc_stream(&batch));
2267
2268        let sliced_batch = batch.slice(1, 2);
2269        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2270        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2271    }
2272
2273    #[test]
2274    fn test_no_columns_batch() {
2275        let schema = Arc::new(Schema::empty());
2276        let options = RecordBatchOptions::new()
2277            .with_match_field_names(true)
2278            .with_row_count(Some(10));
2279        let input_batch = RecordBatch::try_new_with_options(schema, vec![], &options).unwrap();
2280        let output_batch = roundtrip_ipc_stream(&input_batch);
2281        assert_eq!(input_batch, output_batch);
2282    }
2283
2284    #[test]
2285    fn test_unaligned() {
2286        let batch = RecordBatch::try_from_iter(vec![(
2287            "i32",
2288            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
2289        )])
2290        .unwrap();
2291
2292        let gen = IpcDataGenerator {};
2293        #[allow(deprecated)]
2294        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
2295        let (_, encoded) = gen
2296            .encoded_batch(&batch, &mut dict_tracker, &Default::default())
2297            .unwrap();
2298
2299        let message = root_as_message(&encoded.ipc_message).unwrap();
2300
2301        // Construct an unaligned buffer
2302        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
2303        buffer.push(0_u8);
2304        buffer.extend_from_slice(&encoded.arrow_data);
2305        let b = Buffer::from(buffer).slice(1);
2306        assert_ne!(b.as_ptr().align_offset(8), 0);
2307
2308        let ipc_batch = message.header_as_record_batch().unwrap();
2309        let roundtrip = RecordBatchDecoder::try_new(
2310            &b,
2311            ipc_batch,
2312            batch.schema(),
2313            &Default::default(),
2314            &message.version(),
2315        )
2316        .unwrap()
2317        .with_require_alignment(false)
2318        .read_record_batch()
2319        .unwrap();
2320        assert_eq!(batch, roundtrip);
2321    }
2322
2323    #[test]
2324    fn test_unaligned_throws_error_with_require_alignment() {
2325        let batch = RecordBatch::try_from_iter(vec![(
2326            "i32",
2327            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
2328        )])
2329        .unwrap();
2330
2331        let gen = IpcDataGenerator {};
2332        #[allow(deprecated)]
2333        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
2334        let (_, encoded) = gen
2335            .encoded_batch(&batch, &mut dict_tracker, &Default::default())
2336            .unwrap();
2337
2338        let message = root_as_message(&encoded.ipc_message).unwrap();
2339
2340        // Construct an unaligned buffer
2341        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
2342        buffer.push(0_u8);
2343        buffer.extend_from_slice(&encoded.arrow_data);
2344        let b = Buffer::from(buffer).slice(1);
2345        assert_ne!(b.as_ptr().align_offset(8), 0);
2346
2347        let ipc_batch = message.header_as_record_batch().unwrap();
2348        let result = RecordBatchDecoder::try_new(
2349            &b,
2350            ipc_batch,
2351            batch.schema(),
2352            &Default::default(),
2353            &message.version(),
2354        )
2355        .unwrap()
2356        .with_require_alignment(true)
2357        .read_record_batch();
2358
2359        let error = result.unwrap_err();
2360        assert_eq!(
2361            error.to_string(),
2362            "Invalid argument error: Misaligned buffers[0] in array of type Int32, \
2363             offset from expected alignment of 4 by 1"
2364        );
2365    }
2366
2367    #[test]
2368    fn test_file_with_massive_column_count() {
        // 499_999 fields is the upper limit with the default flatbuffer
        // verifier settings (max_tables = 1_000_000)
2370        let limit = 600_000;
2371
2372        let fields = (0..limit)
2373            .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
2374            .collect::<Vec<_>>();
2375        let schema = Arc::new(Schema::new(fields));
2376        let batch = RecordBatch::new_empty(schema);
2377
2378        let mut buf = Vec::new();
2379        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
2380        writer.write(&batch).unwrap();
2381        writer.finish().unwrap();
2382        drop(writer);
2383
2384        let mut reader = FileReaderBuilder::new()
2385            .with_max_footer_fb_tables(1_500_000)
2386            .build(std::io::Cursor::new(buf))
2387            .unwrap();
2388        let roundtrip_batch = reader.next().unwrap().unwrap();
2389
2390        assert_eq!(batch, roundtrip_batch);
2391    }
2392
2393    #[test]
2394    fn test_file_with_deeply_nested_columns() {
        // 60 levels of nesting is the upper limit with the default flatbuffer
        // verifier settings (max_depth = 64)
2396        let limit = 61;
2397
2398        let fields = (0..limit).fold(
2399            vec![Field::new("leaf", DataType::Boolean, false)],
2400            |field, index| vec![Field::new_struct(format!("{index}"), field, false)],
2401        );
2402        let schema = Arc::new(Schema::new(fields));
2403        let batch = RecordBatch::new_empty(schema);
2404
2405        let mut buf = Vec::new();
2406        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
2407        writer.write(&batch).unwrap();
2408        writer.finish().unwrap();
2409        drop(writer);
2410
2411        let mut reader = FileReaderBuilder::new()
2412            .with_max_footer_fb_depth(65)
2413            .build(std::io::Cursor::new(buf))
2414            .unwrap();
2415        let roundtrip_batch = reader.next().unwrap().unwrap();
2416
2417        assert_eq!(batch, roundtrip_batch);
2418    }
2419
2420    #[test]
2421    fn test_invalid_struct_array_ipc_read_errors() {
2422        let a_field = Field::new("a", DataType::Int32, false);
2423        let b_field = Field::new("b", DataType::Int32, false);
2424
2425        let schema = Arc::new(Schema::new(vec![Field::new_struct(
2426            "s",
2427            vec![a_field.clone(), b_field.clone()],
2428            false,
2429        )]));
2430
2431        let a_array_data = ArrayData::builder(a_field.data_type().clone())
2432            .len(4)
2433            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
2434            .build()
2435            .unwrap();
2436        let b_array_data = ArrayData::builder(b_field.data_type().clone())
2437            .len(3)
2438            .add_buffer(Buffer::from_slice_ref([5, 6, 7]))
2439            .build()
2440            .unwrap();
2441
2442        let struct_data_type = schema.field(0).data_type();
2443
2444        let invalid_struct_arr = unsafe {
2445            make_array(
2446                ArrayData::builder(struct_data_type.clone())
2447                    .len(4)
2448                    .add_child_data(a_array_data)
2449                    .add_child_data(b_array_data)
2450                    .build_unchecked(),
2451            )
2452        };
2453        expect_ipc_validation_error(
2454            Arc::new(invalid_struct_arr),
2455            "Invalid argument error: Incorrect array length for StructArray field \"b\", expected 4 got 3",
2456        );
2457    }
2458
2459    #[test]
2460    fn test_same_dict_id_without_preserve() {
2461        let batch = RecordBatch::try_new(
2462            Arc::new(Schema::new(
2463                ["a", "b"]
2464                    .iter()
2465                    .map(|name| {
2466                        #[allow(deprecated)]
2467                        Field::new_dict(
2468                            name.to_string(),
2469                            DataType::Dictionary(
2470                                Box::new(DataType::Int32),
2471                                Box::new(DataType::Utf8),
2472                            ),
2473                            true,
2474                            0,
2475                            false,
2476                        )
2477                    })
2478                    .collect::<Vec<Field>>(),
2479            )),
2480            vec![
2481                Arc::new(
2482                    vec![Some("c"), Some("d")]
2483                        .into_iter()
2484                        .collect::<DictionaryArray<Int32Type>>(),
2485                ) as ArrayRef,
2486                Arc::new(
2487                    vec![Some("e"), Some("f")]
2488                        .into_iter()
2489                        .collect::<DictionaryArray<Int32Type>>(),
2490                ) as ArrayRef,
2491            ],
2492        )
2493        .expect("Failed to create RecordBatch");
2494
2495        // serialize the record batch as an IPC stream
2496        let mut buf = vec![];
2497        {
2498            let mut writer = crate::writer::StreamWriter::try_new_with_options(
2499                &mut buf,
2500                batch.schema().as_ref(),
2501                #[allow(deprecated)]
2502                crate::writer::IpcWriteOptions::default().with_preserve_dict_id(false),
2503            )
2504            .expect("Failed to create StreamWriter");
2505            writer.write(&batch).expect("Failed to write RecordBatch");
2506            writer.finish().expect("Failed to finish StreamWriter");
2507        }
2508
2509        StreamReader::try_new(std::io::Cursor::new(buf), None)
2510            .expect("Failed to create StreamReader")
2511            .for_each(|decoded_batch| {
2512                assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch);
2513            });
2514    }
2515
2516    #[test]
2517    fn test_validation_of_invalid_list_array() {
2518        // ListArray with invalid offsets
2519        let array = unsafe {
2520            let values = Int32Array::from(vec![1, 2, 3]);
2521            let bad_offsets = ScalarBuffer::<i32>::from(vec![0, 2, 4, 2]); // offsets can't go backwards
2522            let offsets = OffsetBuffer::new_unchecked(bad_offsets); // INVALID array created
2523            let field = Field::new_list_field(DataType::Int32, true);
2524            let nulls = None;
2525            ListArray::new(Arc::new(field), offsets, Arc::new(values), nulls)
2526        };
2527
2528        expect_ipc_validation_error(
2529            Arc::new(array),
2530            "Invalid argument error: Offset invariant failure: offset at position 2 out of bounds: 4 > 2"
2531        );
2532    }
2533
2534    #[test]
2535    fn test_validation_of_invalid_string_array() {
2536        let valid: &[u8] = b"   ";
2537        let mut invalid = vec![];
2538        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2539        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
2540        let binary_array = BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
        // The data is not valid utf-8, so a correct StringArray cannot be
        // constructed safely; purposely create an invalid one instead
2543        let array = unsafe {
2544            StringArray::new_unchecked(
2545                binary_array.offsets().clone(),
2546                binary_array.values().clone(),
2547                binary_array.nulls().cloned(),
2548            )
2549        };
2550        expect_ipc_validation_error(
2551            Arc::new(array),
2552            "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..45): invalid utf-8 sequence of 1 bytes from index 38"
2553        );
2554    }
2555
2556    #[test]
2557    fn test_validation_of_invalid_string_view_array() {
2558        let valid: &[u8] = b"   ";
2559        let mut invalid = vec![];
2560        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2561        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
2562        let binary_view_array =
2563            BinaryViewArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
        // The data is not valid utf-8, so a correct StringViewArray cannot be
        // constructed safely; purposely create an invalid one instead
2566        let array = unsafe {
2567            StringViewArray::new_unchecked(
2568                binary_view_array.views().clone(),
2569                binary_view_array.data_buffers().to_vec(),
2570                binary_view_array.nulls().cloned(),
2571            )
2572        };
2573        expect_ipc_validation_error(
2574            Arc::new(array),
2575            "Invalid argument error: Encountered non-UTF-8 data at index 3: invalid utf-8 sequence of 1 bytes from index 38"
2576        );
2577    }
2578
    /// Roundtrip an invalid dictionary array (a key is out of bounds for the values)
2581    #[test]
2582    fn test_validation_of_invalid_dictionary_array() {
2583        let array = unsafe {
2584            let values = StringArray::from_iter_values(["a", "b", "c"]);
2585            let keys = Int32Array::from(vec![1, 200]); // keys are not valid for values
2586            DictionaryArray::new_unchecked(keys, Arc::new(values))
2587        };
2588
2589        expect_ipc_validation_error(
2590            Arc::new(array),
2591            "Invalid argument error: Value at position 1 out of bounds: 200 (should be in [0, 2])",
2592        );
2593    }
2594
2595    /// Invalid Utf-8 sequence in the first character
2596    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
2597    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
2598
    /// Expect an error when reading the record batch via the IPC File, IPC
    /// Stream, and FileDecoder APIs
2600    fn expect_ipc_validation_error(array: ArrayRef, expected_err: &str) {
2601        let rb = RecordBatch::try_from_iter([("a", array)]).unwrap();
2602
2603        // IPC Stream format
2604        let buf = write_stream(&rb); // write is ok
2605        let err = read_stream(&buf).unwrap_err();
2606        assert_eq!(err.to_string(), expected_err);
2607
2608        // IPC File format
2609        let buf = write_ipc(&rb); // write is ok
2610        let err = read_ipc(&buf).unwrap_err();
2611        assert_eq!(err.to_string(), expected_err);
2612
2613        // TODO verify there is no error when validation is disabled
2614        // see https://github.com/apache/arrow-rs/issues/3287
2615
2616        // IPC Format with FileDecoder
2617        let err = read_ipc_with_decoder(buf).unwrap_err();
2618        assert_eq!(err.to_string(), expected_err);
2619    }
2620}