arrow_ipc/writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Arrow IPC File and Stream Writers
//!
//! # Notes
//!
//! [`FileWriter`] and [`StreamWriter`] have similar interfaces; the difference is the
//! output format: [`FileWriter`] writes the random-access IPC file format (which ends
//! with a footer), while [`StreamWriter`] writes the IPC streaming format.

use std::cmp::min;
use std::collections::HashMap;
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::sync::Arc;

use flatbuffers::FlatBufferBuilder;

use arrow_array::builder::BufferBuilder;
use arrow_array::cast::*;
use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
use arrow_array::*;
use arrow_buffer::bit_util;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::{layout, ArrayData, ArrayDataBuilder, BufferSpec};
use arrow_schema::*;

use crate::compression::CompressionCodec;
use crate::convert::IpcSchemaEncoder;
use crate::CONTINUATION_MARKER;

/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Write padding after memory buffers to this multiple of bytes.
    /// Must be 8, 16, 32, or 64 - defaults to 64.
    alignment: u8,
    /// The legacy format is for releases before 0.15.0, and uses metadata V4
    write_legacy_ipc_format: bool,
    /// The metadata version to write. The Rust IPC writer supports V4+
    ///
    /// *Default versions per crate*
    ///
    /// When creating the default IpcWriteOptions, the following metadata versions are used:
    ///
    /// * version 2.0.0: V4, with legacy format enabled
    /// * version 4.0.0: V5
    metadata_version: crate::MetadataVersion,
    /// Compression, if desired. Will result in a runtime error
    /// if the corresponding feature is not enabled
    batch_compression_type: Option<crate::CompressionType>,
    /// Flag indicating whether the writer should preserve the dictionary IDs defined in the
    /// schema or generate unique dictionary IDs internally during encoding.
    ///
    /// Defaults to `false`
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all fields related to it."
    )]
    preserve_dict_id: bool,
}

impl IpcWriteOptions {
    /// Configures compression when writing IPC files.
    ///
    /// Will result in a runtime error if the corresponding feature
    /// is not enabled
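    ///
    /// Illustrative sketch (assumes the crate is built with the `lz4` feature
    /// enabled so that `CompressionType::LZ4_FRAME` can actually be used):
    ///
    /// ```ignore
    /// # use arrow_ipc::writer::IpcWriteOptions;
    /// # use arrow_ipc::CompressionType;
    /// // Compression requires metadata V5, which is the default
    /// let options = IpcWriteOptions::default()
    ///     .try_with_compression(Some(CompressionType::LZ4_FRAME))
    ///     .unwrap();
    /// ```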
    pub fn try_with_compression(
        mut self,
        batch_compression_type: Option<crate::CompressionType>,
    ) -> Result<Self, ArrowError> {
        self.batch_compression_type = batch_compression_type;

        if self.batch_compression_type.is_some()
            && self.metadata_version < crate::MetadataVersion::V5
        {
            return Err(ArrowError::InvalidArgumentError(
                "Compression only supported in metadata v5 and above".to_string(),
            ));
        }
        Ok(self)
    }
    /// Try to create IpcWriteOptions, checking for incompatible settings
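    ///
    /// A minimal sketch of the accepted settings (alignment must be 8, 16, 32,
    /// or 64, and the legacy format is only valid with metadata V4):
    ///
    /// ```
    /// # use arrow_ipc::writer::IpcWriteOptions;
    /// # use arrow_ipc::MetadataVersion;
    /// let options = IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap();
    /// assert!(IpcWriteOptions::try_new(3, false, MetadataVersion::V5).is_err());
    /// ```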
    pub fn try_new(
        alignment: usize,
        write_legacy_ipc_format: bool,
        metadata_version: crate::MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let is_alignment_valid =
            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
        if !is_alignment_valid {
            return Err(ArrowError::InvalidArgumentError(
                "Alignment should be 8, 16, 32, or 64.".to_string(),
            ));
        }
        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
        match metadata_version {
            crate::MetadataVersion::V1
            | crate::MetadataVersion::V2
            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
                "Writing IPC metadata version 3 and lower not supported".to_string(),
            )),
            #[allow(deprecated)]
            crate::MetadataVersion::V4 => Ok(Self {
                alignment,
                write_legacy_ipc_format,
                metadata_version,
                batch_compression_type: None,
                preserve_dict_id: false,
            }),
            crate::MetadataVersion::V5 => {
                if write_legacy_ipc_format {
                    Err(ArrowError::InvalidArgumentError(
                        "Legacy IPC format only supported on metadata version 4".to_string(),
                    ))
                } else {
                    #[allow(deprecated)]
                    Ok(Self {
                        alignment,
                        write_legacy_ipc_format,
                        metadata_version,
                        batch_compression_type: None,
                        preserve_dict_id: false,
                    })
                }
            }
            z => Err(ArrowError::InvalidArgumentError(format!(
                "Unsupported crate::MetadataVersion {z:?}"
            ))),
        }
    }

    /// Return whether the writer is configured to preserve the dictionary IDs
    /// defined in the schema
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it."
    )]
    pub fn preserve_dict_id(&self) -> bool {
        #[allow(deprecated)]
        self.preserve_dict_id
    }

    /// Set whether the IPC writer should preserve the dictionary IDs in the schema
    /// or auto-assign unique dictionary IDs during encoding (defaults to `false`)
    ///
    /// If this option is true, the application must handle assigning ids
    /// to the dictionary batches in order to encode them correctly
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it."
    )]
    #[allow(deprecated)]
    pub fn with_preserve_dict_id(mut self, preserve_dict_id: bool) -> Self {
        self.preserve_dict_id = preserve_dict_id;
        self
    }
}

impl Default for IpcWriteOptions {
    fn default() -> Self {
        #[allow(deprecated)]
        Self {
            alignment: 64,
            write_legacy_ipc_format: false,
            metadata_version: crate::MetadataVersion::V5,
            batch_compression_type: None,
            preserve_dict_id: false,
        }
    }
}

#[derive(Debug, Default)]
/// Handles low level details of encoding [`Array`] and [`Schema`] into the
/// [Arrow IPC Format].
///
/// # Example
/// ```
/// # fn run() {
/// # use std::sync::Arc;
/// # use arrow_array::UInt64Array;
/// # use arrow_array::RecordBatch;
/// # use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
///
/// // Create a record batch
/// let batch = RecordBatch::try_from_iter(vec![
///  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
/// ]).unwrap();
///
/// // Error if dictionary ids are replaced.
/// let error_on_replacement = true;
/// let options = IpcWriteOptions::default();
/// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
///
/// // encode the batch into zero or more encoded dictionaries
/// // and the data for the actual array.
/// let data_gen = IpcDataGenerator::default();
/// let (encoded_dictionaries, encoded_message) = data_gen
///   .encoded_batch(&batch, &mut dictionary_tracker, &options)
///   .unwrap();
/// # }
/// ```
///
/// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
pub struct IpcDataGenerator {}

impl IpcDataGenerator {
    /// Converts a schema to an IPC message along with `dictionary_tracker`
    /// and returns it encoded inside [EncodedData] as a flatbuffer
    ///
    /// Preferred over [IpcDataGenerator::schema_to_bytes], which has been
    /// deprecated since Arrow v54.0.0
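    ///
    /// A minimal sketch of encoding just the schema message (mirrors the
    /// `encoded_batch` example on [`IpcDataGenerator`]):
    ///
    /// ```
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
    /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// let mut tracker = DictionaryTracker::new(true);
    /// let options = IpcWriteOptions::default();
    /// let data_gen = IpcDataGenerator::default();
    /// let encoded =
    ///     data_gen.schema_to_bytes_with_dictionary_tracker(&schema, &mut tracker, &options);
    /// // Schema messages carry no Arrow body data
    /// assert!(encoded.arrow_data.is_empty());
    /// ```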
    pub fn schema_to_bytes_with_dictionary_tracker(
        &self,
        schema: &Schema,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            let fb = IpcSchemaEncoder::new()
                .with_dictionary_tracker(dictionary_tracker)
                .schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        message.add_bodyLength(0);
        message.add_header(schema);
        // TODO: custom metadata
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    #[deprecated(
        since = "54.0.0",
        note = "Use `schema_to_bytes_with_dictionary_tracker` instead. This function will be replaced by the signature of `schema_to_bytes_with_dictionary_tracker` in the next release."
    )]
    /// Converts a schema to an IPC message and returns it encoded inside [EncodedData] as a flatbuffer
    pub fn schema_to_bytes(&self, schema: &Schema, write_options: &IpcWriteOptions) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            #[allow(deprecated)]
            // This will be replaced with the IpcSchemaConverter in the next release.
            let fb = crate::convert::schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        message.add_bodyLength(0);
        message.add_header(schema);
        // TODO: custom metadata
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    fn _encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Struct(fields) => {
                let s = as_struct_array(column);
                for (field, column) in fields.iter().zip(s.columns()) {
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            DataType::RunEndEncoded(_, values) => {
                let data = column.to_data();
                if data.child_data().len() != 2 {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "The run encoded array should have exactly two child arrays. Found {}",
                        data.child_data().len()
                    )));
                }
                // The run_ends array is not expected to be dictionary encoded. Hence encode dictionaries
                // only for values array.
                let values_array = make_array(data.child_data()[1].clone());
                self.encode_dictionaries(
                    values,
                    &values_array,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::List(field) => {
                let list = as_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::LargeList(field) => {
                let list = as_large_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::FixedSizeList(field, _) => {
                let list = column
                    .as_any()
                    .downcast_ref::<FixedSizeListArray>()
                    .expect("Unable to downcast to fixed size list array");
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Map(field, _) => {
                let map_array = as_map_array(column);

                let (keys, values) = match field.data_type() {
                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
                };

                // keys
                self.encode_dictionaries(
                    keys,
                    map_array.keys(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;

                // values
                self.encode_dictionaries(
                    values,
                    map_array.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Union(fields, _) => {
                let union = as_union_array(column);
                for (type_id, field) in fields.iter() {
                    let column = union.child(type_id);
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            _ => (),
        }

        Ok(())
    }

    fn encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        field: &Field,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id_seq: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Dictionary(_key_type, _value_type) => {
                let dict_data = column.to_data();
                let dict_values = &dict_data.child_data()[0];

                let values = make_array(dict_data.child_data()[0].clone());

                self._encode_dictionaries(
                    &values,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id_seq,
                )?;

                // It's important to only take the dict_id at this point, because the dict ID
                // sequence is assigned depth-first, so we need to first encode children and have
                // them take their assigned dict IDs before we take the dict ID for this field.
                #[allow(deprecated)]
                let dict_id = dict_id_seq
                    .next()
                    .or_else(|| field.dict_id())
                    .ok_or_else(|| {
                        ArrowError::IpcError(format!("no dict id for field {}", field.name()))
                    })?;

                let emit = dictionary_tracker.insert(dict_id, column)?;

                if emit {
                    encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                        dict_id,
                        dict_values,
                        write_options,
                    )?);
                }
            }
            _ => self._encode_dictionaries(
                column,
                encoded_dictionaries,
                dictionary_tracker,
                write_options,
                dict_id_seq,
            )?,
        }

        Ok(())
    }

    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s (so they are only sent once).
    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
    pub fn encoded_batch(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        let schema = batch.schema();
        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());

        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();

        for (i, field) in schema.fields().iter().enumerate() {
            let column = batch.column(i);
            self.encode_dictionaries(
                field,
                column,
                &mut encoded_dictionaries,
                dictionary_tracker,
                write_options,
                &mut dict_id,
            )?;
        }

        let encoded_message = self.record_batch_to_bytes(batch, write_options)?;
        Ok((encoded_dictionaries, encoded_message))
    }

    /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the
    /// other for the batch's data
    fn record_batch_to_bytes(
        &self,
        batch: &RecordBatch,
        write_options: &IpcWriteOptions,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];
        let mut offset = 0;

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> =
            batch_compression_type.map(TryInto::try_into).transpose()?;

        let mut variadic_buffer_counts = vec![];

        for array in batch.columns() {
            let array_data = array.to_data();
            offset = write_array_data(
                &array_data,
                &mut buffers,
                &mut arrow_data,
                &mut nodes,
                offset,
                array.len(),
                array.null_count(),
                compression_codec,
                write_options,
            )?;

            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
        }
        // pad the tail of body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(batch.num_rows() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }

            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            let b = batch_builder.finish();
            b.as_union_value()
        };
        // create a crate::Message
        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::RecordBatch);
        message.add_bodyLength(arrow_data.len() as i64);
        message.add_header(root);
        let root = message.finish();
        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }

    /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
    /// other for the data
    fn dictionary_batch_to_bytes(
        &self,
        dict_id: i64,
        array_data: &ArrayData,
        write_options: &IpcWriteOptions,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> = batch_compression_type
            .map(|batch_compression_type| batch_compression_type.try_into())
            .transpose()?;

        write_array_data(
            array_data,
            &mut buffers,
            &mut arrow_data,
            &mut nodes,
            0,
            array_data.len(),
            array_data.null_count(),
            compression_codec,
            write_options,
        )?;

        let mut variadic_buffer_counts = vec![];
        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);

        // pad the tail of body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(array_data.len() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }
            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            batch_builder.finish()
        };

        let root = {
            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
            batch_builder.add_id(dict_id);
            batch_builder.add_data(root);
            batch_builder.finish().as_union_value()
        };

        let root = {
            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
            message_builder.add_version(write_options.metadata_version);
            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
            message_builder.add_bodyLength(arrow_data.len() as i64);
            message_builder.add_header(root);
            message_builder.finish()
        };

        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }
}

fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
    match array.data_type() {
        DataType::BinaryView | DataType::Utf8View => {
            // The spec documents that the counts only include the variadic buffers, not the view/null buffers.
            // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
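            // For example (illustrative): a StringViewArray with one views buffer and two
            // data buffers stores three buffers in `buffers()` (the null buffer is tracked
            // separately), so the variadic count reported here is 2.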
            counts.push(array.buffers().len() as i64 - 1);
        }
        DataType::Dictionary(_, _) => {
            // Do nothing
            // Dictionary types are handled in `encode_dictionaries`.
        }
        _ => {
            for child in array.child_data() {
                append_variadic_buffer_counts(counts, child)
            }
        }
    }
}

pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
    match arr.data_type() {
        DataType::RunEndEncoded(k, _) => match k.data_type() {
            DataType::Int16 => {
                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
            }
            DataType::Int32 => {
                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
            }
            DataType::Int64 => {
                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
            }
            d => unreachable!("Unexpected data type {d}"),
        },
        d => Err(ArrowError::InvalidArgumentError(format!(
            "The given array is not a run array. Data type of given array: {d}"
        ))),
    }
}

// Returns a `RunArray` with zero offset and length matching the last value
// in run_ends array.
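//
// Worked example (illustrative): slicing a run array with run_ends [3, 5, 9] to
// offset 4 and length 3 covers physical runs 1..=2, so the rebuilt array has
// run_ends [1, 3] and its values sliced down to those two runs.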
fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // The physical index of original run_ends array from which the `ArrayData` is sliced.
    let start_physical_index = run_ends.get_start_physical_index();

    // The physical index of original run_ends array until which the `ArrayData` is sliced.
    let end_physical_index = run_ends.get_end_physical_index();

    let physical_length = end_physical_index - start_physical_index + 1;

    // build new run_ends array by subtracting offset from run ends.
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    let new_run_ends = unsafe {
        // Safety:
        // The function builds a valid run_ends array and hence need not be validated.
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    // build new values by slicing physical indices.
    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    let array_data = unsafe {
        // Safety:
        //  This function builds a valid run array and hence can skip validation.
        builder.build_unchecked()
    };
    Ok(array_data.into())
}

/// Keeps track of dictionaries that have been written, to avoid emitting the same dictionary
/// multiple times.
///
/// Can optionally error if an update to an existing dictionary is attempted, which
/// isn't allowed in the `FileWriter`.
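///
/// A minimal sketch of the tracking behaviour (the assertions reflect the
/// contract documented on [`DictionaryTracker::insert`]):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, DictionaryArray, Int8Array, StringArray};
/// # use arrow_ipc::writer::DictionaryTracker;
/// let keys = Int8Array::from(vec![0, 0, 1]);
/// let values: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
/// let dict: ArrayRef = Arc::new(DictionaryArray::new(keys, values));
///
/// let mut tracker = DictionaryTracker::new(true);
/// // First time this dictionary id is seen => it must be emitted
/// assert!(tracker.insert(0, &dict).unwrap());
/// // Same id and same values => nothing new to emit
/// assert!(!tracker.insert(0, &dict).unwrap());
/// ```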
#[derive(Debug)]
pub struct DictionaryTracker {
    written: HashMap<i64, ArrayData>,
    dict_ids: Vec<i64>,
    error_on_replacement: bool,
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all fields related to it."
    )]
    preserve_dict_id: bool,
}

impl DictionaryTracker {
    /// Create a new [`DictionaryTracker`].
    ///
    /// If `error_on_replacement`
    /// is true, an error will be generated if an update to an
    /// existing dictionary is attempted.
    ///
    /// Dictionary IDs are assigned automatically by incrementing the last seen
    /// dictionary ID (or using `0` if no dictionary IDs have been seen yet).
    pub fn new(error_on_replacement: bool) -> Self {
        #[allow(deprecated)]
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
            preserve_dict_id: false,
        }
    }

    /// Create a new [`DictionaryTracker`].
    ///
    /// If `error_on_replacement`
    /// is true, an error will be generated if an update to an
    /// existing dictionary is attempted.
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it."
    )]
    pub fn new_with_preserve_dict_id(error_on_replacement: bool, preserve_dict_id: bool) -> Self {
        #[allow(deprecated)]
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
            preserve_dict_id,
        }
    }

    /// Set the dictionary ID for `field`.
    ///
    /// If `preserve_dict_id` is true, this will return the `dict_id` in `field` (or panic if `field` does
    /// not have a `dict_id` defined).
    ///
    /// If `preserve_dict_id` is false, this will return the value of the last `dict_id` assigned incremented by 1
    /// or 0 in the case where no dictionary IDs have yet been assigned
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it."
    )]
    pub fn set_dict_id(&mut self, field: &Field) -> i64 {
        #[allow(deprecated)]
        let next = if self.preserve_dict_id {
            #[allow(deprecated)]
            field.dict_id().expect("no dict_id in field")
        } else {
            self.dict_ids
                .last()
                .copied()
                .map(|i| i + 1)
                .unwrap_or_default()
        };

        self.dict_ids.push(next);
        next
    }

    /// Return the sequence of dictionary IDs in the order they should be observed while
    /// traversing the schema
    pub fn dict_id(&mut self) -> &[i64] {
        &self.dict_ids
    }

    /// Keep track of the dictionary with the given ID and values. Behavior:
    ///
    /// * If this ID has been written already and has the same data, return `Ok(false)` to indicate
    ///   that the dictionary was not actually inserted (because it's already been seen).
    /// * If this ID has been written already but with different data, and this tracker is
    ///   configured to return an error, return an error.
    /// * If the tracker has not been configured to error on replacement or this dictionary
    ///   has never been seen before, return `Ok(true)` to indicate that the dictionary was just
    ///   inserted.
    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
        let dict_data = column.to_data();
        let dict_values = &dict_data.child_data()[0];

        // If a dictionary with this id was already emitted, check if it was the same.
        if let Some(last) = self.written.get(&dict_id) {
            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
                // Same dictionary values => no need to emit it again
                return Ok(false);
            }
            if self.error_on_replacement {
                // If error on replacement perform a logical comparison
                if last.child_data()[0] == *dict_values {
                    // Same dictionary values => no need to emit it again
                    return Ok(false);
                }
                return Err(ArrowError::InvalidArgumentError(
                    "Dictionary replacement detected when writing IPC file format. \
                     Arrow IPC files only support a single dictionary for a given field \
                     across all batches."
                        .to_string(),
                ));
            }
        }

        self.written.insert(dict_id, dict_data);
        Ok(true)
    }
}

/// Arrow File Writer
///
/// Writes Arrow [`RecordBatch`]es in the [IPC File Format].
///
/// # See Also
///
/// * [`StreamWriter`] for writing IPC Streams
///
/// # Example
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::FileWriter;
/// # let mut file = vec![]; // mimic a file for the example
/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// // create a new writer, the schema must be known in advance
/// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
/// // write each batch to the underlying writer
/// writer.write(&batch).unwrap();
/// // When all batches are written, call finish to flush all buffers
/// writer.finish().unwrap();
/// ```
///
/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
pub struct FileWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// A reference to the schema, used in validating record batches
    schema: SchemaRef,
    /// The current byte offset into the file, used to record block locations for random access
    block_offsets: usize,
    /// Dictionary blocks that will be written as part of the IPC footer
    dictionary_blocks: Vec<crate::Block>,
    /// Record blocks that will be written as part of the IPC footer
    record_blocks: Vec<crate::Block>,
    /// Whether the writer footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
    /// User level customized metadata
    custom_metadata: HashMap<String, String>,

    data_gen: IpcDataGenerator,
}

impl<W: Write> FileWriter<BufWriter<W>> {
    /// Try to create a new file writer with the writer wrapped in a BufWriter.
    ///
    /// See [`FileWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> FileWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header
    ///
    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
    ///
    /// # Errors
    ///
    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with IpcWriteOptions
    ///
    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
    ///
    /// # Errors
    ///
    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        // write magic to header aligned on alignment boundary
        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
        let header_size = super::ARROW_MAGIC.len() + pad_len;
        writer.write_all(&super::ARROW_MAGIC)?;
        writer.write_all(&PADDING[..pad_len])?;
        // write the schema, set the written bytes to the schema + header
        #[allow(deprecated)]
        let preserve_dict_id = write_options.preserve_dict_id;
        #[allow(deprecated)]
        let mut dictionary_tracker =
            DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            schema: Arc::new(schema.clone()),
            block_offsets: meta + data + header_size,
            dictionary_blocks: vec![],
            record_blocks: vec![],
            finished: false,
            dictionary_tracker,
            custom_metadata: HashMap::new(),
            data_gen,
        })
    }

    /// Adds a key-value pair to the [FileWriter]'s custom metadata
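    ///
    /// Custom metadata is written into the file footer when [`FileWriter::finish`]
    /// is called. A small sketch:
    ///
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_ipc::writer::FileWriter;
    /// # let mut file = vec![];
    /// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
    /// writer.write_metadata("created_by", "arrow-ipc example");
    /// writer.write(&batch).unwrap();
    /// writer.finish().unwrap();
    /// ```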
    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.custom_metadata.insert(key.into(), value.into());
    }

    /// Write a record batch to the file
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to file writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self.data_gen.encoded_batch(
            batch,
            &mut self.dictionary_tracker,
            &self.write_options,
        )?;

        for encoded_dictionary in encoded_dictionaries {
            let (meta, data) =
                write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;

            let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
            self.dictionary_blocks.push(block);
            self.block_offsets += meta + data;
        }

        let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
        // add a record block for the footer
        let block = crate::Block::new(
            self.block_offsets as i64,
            meta as i32, // TODO: is this still applicable?
            data as i64,
        );
        self.record_blocks.push(block);
        self.block_offsets += meta + data;
        Ok(())
    }

    /// Write footer and closing tag, then mark the writer as done
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to file writer as it is closed".to_string(),
            ));
        }

        // write EOS
        write_continuation(&mut self.writer, &self.write_options, 0)?;

        let mut fbb = FlatBufferBuilder::new();
        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
        let record_batches = fbb.create_vector(&self.record_blocks);
        #[allow(deprecated)]
        let preserve_dict_id = self.write_options.preserve_dict_id;
        #[allow(deprecated)]
        let mut dictionary_tracker =
            DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
        let schema = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut dictionary_tracker)
            .schema_to_fb_offset(&mut fbb, &self.schema);
        let fb_custom_metadata = (!self.custom_metadata.is_empty())
            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

        let root = {
            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
            footer_builder.add_version(self.write_options.metadata_version);
            footer_builder.add_schema(schema);
            footer_builder.add_dictionaries(dictionaries);
            footer_builder.add_recordBatches(record_batches);
            if let Some(fb_custom_metadata) = fb_custom_metadata {
                footer_builder.add_custom_metadata(fb_custom_metadata);
            }
            footer_builder.finish()
        };
        fbb.finish(root, None);
        let footer_data = fbb.finished_data();
        self.writer.write_all(footer_data)?;
        self.writer
            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
        self.writer.write_all(&super::ARROW_MAGIC)?;
        self.writer.flush()?;
        self.finished = true;

        Ok(())
    }

    /// Returns the arrow [`SchemaRef`] for this arrow file.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    ///
    /// Both the BufWriter and the underlying writer are flushed.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer.
    ///
    /// The writer is flushed and the FileWriter is finished before returning.
    ///
    /// # Errors
    ///
    /// An ['Err'](Result::Err) may be returned if an error occurs while finishing the FileWriter
    /// or while flushing the writer.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes the writer.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for FileWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

/// Arrow Stream Writer
///
/// Writes Arrow [`RecordBatch`]es to bytes using the [IPC Streaming Format].
///
/// # See Also
///
/// * [`FileWriter`] for writing IPC Files
///
/// # Example
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::StreamWriter;
/// # let mut stream = vec![]; // mimic a stream for the example
/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// // create a new writer, the schema must be known in advance
/// let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
/// // write each batch to the underlying stream
/// writer.write(&batch).unwrap();
/// // When all batches are written, call finish to flush all buffers
/// writer.finish().unwrap();
/// ```
///
/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
pub struct StreamWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// Whether the writer footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,

    data_gen: IpcDataGenerator,
}

impl<W: Write> StreamWriter<BufWriter<W>> {
    /// Try to create a new stream writer with the writer wrapped in a BufWriter.
    ///
    /// See [`StreamWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> StreamWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header.
    ///
    /// Note that there is no internal buffering. See also [`StreamWriter::try_new_buffered`].
    ///
    /// # Errors
    ///
    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with [`IpcWriteOptions`].
    ///
    /// # Errors
    ///
    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        #[allow(deprecated)]
        let preserve_dict_id = write_options.preserve_dict_id;
        #[allow(deprecated)]
        let mut dictionary_tracker =
            DictionaryTracker::new_with_preserve_dict_id(false, preserve_dict_id);

        // write the schema, set the written bytes to the schema
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            finished: false,
            dictionary_tracker,
            data_gen,
        })
    }

    /// Write a record batch to the stream
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to stream writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self
            .data_gen
            .encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)
            .expect("StreamWriter is configured to not error on dictionary replacement");

        for encoded_dictionary in encoded_dictionaries {
            write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
        }

        write_message(&mut self.writer, encoded_message, &self.write_options)?;
        Ok(())
    }

    /// Write continuation bytes, and mark the stream as done
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to stream writer as it is closed".to_string(),
            ));
        }

        write_continuation(&mut self.writer, &self.write_options, 0)?;

        self.finished = true;

        Ok(())
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    ///
    /// Both the BufWriter and the underlying writer are flushed.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer.
    ///
    /// The writer is flushed and the StreamWriter is finished before returning.
    ///
    /// # Errors
    ///
    /// An ['Err'](Result::Err) may be returned if an error occurs while finishing the StreamWriter
    /// or while flushing the writer.
    ///
    /// # Example
    ///
    /// ```
    /// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
    /// # use arrow_ipc::MetadataVersion;
    /// # use arrow_schema::{ArrowError, Schema};
    /// # fn main() -> Result<(), ArrowError> {
    /// // The result we expect from an empty schema
    /// let expected = vec![
    ///     255, 255, 255, 255,  48,   0,   0,   0,
    ///      16,   0,   0,   0,   0,   0,  10,   0,
    ///      12,   0,  10,   0,   9,   0,   4,   0,
    ///      10,   0,   0,   0,  16,   0,   0,   0,
    ///       0,   1,   4,   0,   8,   0,   8,   0,
    ///       0,   0,   4,   0,   8,   0,   0,   0,
    ///       4,   0,   0,   0,   0,   0,   0,   0,
    ///     255, 255, 255, 255,   0,   0,   0,   0
    /// ];
    ///
    /// let schema = Schema::empty();
    /// let buffer: Vec<u8> = Vec::new();
    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
    /// let stream_writer = StreamWriter::try_new_with_options(buffer, &schema, options)?;
    ///
    /// assert_eq!(stream_writer.into_inner()?, expected);
    /// # Ok(())
    /// # }
    /// ```
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for StreamWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

/// Stores the encoded data, which is a crate::Message, and optional Arrow data
pub struct EncodedData {
    /// An encoded crate::Message
    pub ipc_message: Vec<u8>,
    /// Arrow buffers to be written, should be an empty vec for schema messages
    pub arrow_data: Vec<u8>,
}

/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
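///
/// A minimal sketch of writing a single encoded message to an in-memory buffer:
///
/// ```
/// # use arrow_schema::{DataType, Field, Schema};
/// # use arrow_ipc::writer::{write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
/// let options = IpcWriteOptions::default();
/// let mut tracker = DictionaryTracker::new(false);
/// let encoded = IpcDataGenerator::default()
///     .schema_to_bytes_with_dictionary_tracker(&schema, &mut tracker, &options);
///
/// let mut buffer = Vec::new();
/// let (meta_len, body_len) = write_message(&mut buffer, encoded, &options).unwrap();
/// assert!(meta_len > 0);
/// assert_eq!(body_len, 0); // schema messages have no body
/// ```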
pub fn write_message<W: Write>(
    mut writer: W,
    encoded: EncodedData,
    write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
    let arrow_data_len = encoded.arrow_data.len();
    if arrow_data_len % usize::from(write_options.alignment) != 0 {
        return Err(ArrowError::MemoryError(
            "Arrow data not aligned".to_string(),
        ));
    }

    let a = usize::from(write_options.alignment - 1);
    let buffer = encoded.ipc_message;
    let flatbuf_size = buffer.len();
    let prefix_size = if write_options.write_legacy_ipc_format {
        4
    } else {
        8
    };
    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
    let padding_bytes = aligned_size - flatbuf_size - prefix_size;
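    // Worked example (illustrative): with the default 64-byte alignment and a
    // 250-byte flatbuffer in the non-legacy format, prefix_size = 8 and
    // aligned_size = (250 + 8 + 63) & !63 = 320, so padding_bytes = 62.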

    write_continuation(
        &mut writer,
        write_options,
        (aligned_size - prefix_size) as i32,
    )?;

    // write the flatbuf
    if flatbuf_size > 0 {
        writer.write_all(&buffer)?;
    }
    // write padding
    writer.write_all(&PADDING[..padding_bytes])?;

    // write arrow data
    let body_len = if arrow_data_len > 0 {
        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
    } else {
        0
    };

    Ok((aligned_size, body_len))
}
1419
1420fn write_body_buffers<W: Write>(
1421    mut writer: W,
1422    data: &[u8],
1423    alignment: u8,
1424) -> Result<usize, ArrowError> {
1425    let len = data.len();
1426    let pad_len = pad_to_alignment(alignment, len);
1427    let total_len = len + pad_len;
1428
1429    // write body buffer
1430    writer.write_all(data)?;
1431    if pad_len > 0 {
1432        writer.write_all(&PADDING[..pad_len])?;
1433    }
1434
1435    writer.flush()?;
1436    Ok(total_len)
1437}
1438
1439/// Write a continuation marker (when required by the metadata version) followed by the
1440/// message length prefix, before the message itself is written
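///
/// As a sketch, for the default (non-legacy) V5 format the 8-byte prefix for a message
/// length of 64 is the continuation marker followed by the length as a little-endian `i32`:
///
/// ```text
/// FF FF FF FF  40 00 00 00
/// ```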
1441fn write_continuation<W: Write>(
1442    mut writer: W,
1443    write_options: &IpcWriteOptions,
1444    total_len: i32,
1445) -> Result<usize, ArrowError> {
1446    let mut written = 8;
1447
1448    // the version of the writer determines whether continuation markers should be added
1449    match write_options.metadata_version {
1450        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1451            unreachable!("Options with this metadata version cannot be created")
1452        }
1453        crate::MetadataVersion::V4 => {
1454            if !write_options.write_legacy_ipc_format {
1455                // v0.15.0 format
1456                writer.write_all(&CONTINUATION_MARKER)?;
1457                written = 4;
1458            }
1459            writer.write_all(&total_len.to_le_bytes()[..])?;
1460        }
1461        crate::MetadataVersion::V5 => {
1462            // write continuation marker and message length
1463            writer.write_all(&CONTINUATION_MARKER)?;
1464            writer.write_all(&total_len.to_le_bytes()[..])?;
1465        }
1466        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1467    };
1468
1469    writer.flush()?;
1470
1471    Ok(written)
1472}
1473
1474/// In V4, the null type has no validity bitmap.
1475/// In V5 and later, the null, union, and run-end encoded
1476/// types have no validity bitmap.
1477fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1478    if write_options.metadata_version < crate::MetadataVersion::V5 {
1479        !matches!(data_type, DataType::Null)
1480    } else {
1481        !matches!(
1482            data_type,
1483            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1484        )
1485    }
1486}
1487
1488/// Whether the buffer needs to be truncated before writing (the array has a non-zero offset or the buffer holds more than `min_length` bytes)
1489#[inline]
1490fn buffer_need_truncate(
1491    array_offset: usize,
1492    buffer: &Buffer,
1493    spec: &BufferSpec,
1494    min_length: usize,
1495) -> bool {
1496    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1497}
1498
1499/// Returns the byte width for a `BufferSpec::FixedWidth` spec, or 0 for any other spec.
1500#[inline]
1501fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1502    match spec {
1503        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1504        _ => 0,
1505    }
1506}
1507
1508/// Common functionality for re-encoding offsets. Returns the new offsets as well as
1509/// original start offset and length for use in slicing child data.
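///
/// As a sketch: offsets `[3, 6, 9, 12]` covering a 3-element slice are re-encoded as
/// `[0, 3, 6, 9]`, with an original start offset of 3 and a child length of 9 (= 12 - 3).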
1510fn reencode_offsets<O: OffsetSizeTrait>(
1511    offsets: &Buffer,
1512    data: &ArrayData,
1513) -> (Buffer, usize, usize) {
1514    let offsets_slice: &[O] = offsets.typed_data::<O>();
1515    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1516
1517    let start_offset = offset_slice.first().unwrap();
1518    let end_offset = offset_slice.last().unwrap();
1519
1520    let offsets = match start_offset.as_usize() {
1521        0 => {
1522            let size = size_of::<O>();
1523            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1524        }
1525        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1526    };
1527
1528    let start_offset = start_offset.as_usize();
1529    let end_offset = end_offset.as_usize();
1530
1531    (offsets, start_offset, end_offset - start_offset)
1532}
1533
1534/// Returns the values and offsets [`Buffer`] for a ByteArray with offset type `O`
1535///
1536/// In particular, this handles re-encoding the offsets if they don't start at `0`,
1537/// and slicing the values buffer as appropriate. This helps reduce the encoded
1538/// size of sliced arrays, as values that have been sliced away are not encoded
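///
/// As a sketch: for a `StringArray` sliced to two rows in the middle of the original
/// array, only the offsets for those rows (rebased to start at 0) and the value bytes
/// they reference are returned.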
1539fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1540    if data.is_empty() {
1541        return (MutableBuffer::new(0).into(), MutableBuffer::new(0).into());
1542    }
1543
1544    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1545    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1546    (offsets, values)
1547}
1548
1549/// Similar logic to [`get_byte_array_buffers()`], but slices the child array instead
1550/// of a values buffer.
1551fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1552    if data.is_empty() {
1553        return (
1554            MutableBuffer::new(0).into(),
1555            data.child_data()[0].slice(0, 0),
1556        );
1557    }
1558
1559    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1560    let child_data = data.child_data()[0].slice(original_start_offset, len);
1561    (offsets, child_data)
1562}
1563
1564/// Write array data to a vector of bytes
1565#[allow(clippy::too_many_arguments)]
1566fn write_array_data(
1567    array_data: &ArrayData,
1568    buffers: &mut Vec<crate::Buffer>,
1569    arrow_data: &mut Vec<u8>,
1570    nodes: &mut Vec<crate::FieldNode>,
1571    offset: i64,
1572    num_rows: usize,
1573    null_count: usize,
1574    compression_codec: Option<CompressionCodec>,
1575    write_options: &IpcWriteOptions,
1576) -> Result<i64, ArrowError> {
1577    let mut offset = offset;
1578    if !matches!(array_data.data_type(), DataType::Null) {
1579        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
1580    } else {
1581        // A NullArray's null_count equals its len, but the `null_count` passed in is from ArrayData,
1582        // where null_count is always 0.
1583        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
1584    }
1585    if has_validity_bitmap(array_data.data_type(), write_options) {
1586        // write null buffer if exists
1587        let null_buffer = match array_data.nulls() {
1588            None => {
1589                // create a buffer and fill it with valid bits
1590                let num_bytes = bit_util::ceil(num_rows, 8);
1591                let buffer = MutableBuffer::new(num_bytes);
1592                let buffer = buffer.with_bitset(num_bytes, true);
1593                buffer.into()
1594            }
1595            Some(buffer) => buffer.inner().sliced(),
1596        };
1597
1598        offset = write_buffer(
1599            null_buffer.as_slice(),
1600            buffers,
1601            arrow_data,
1602            offset,
1603            compression_codec,
1604            write_options.alignment,
1605        )?;
1606    }
1607
1608    let data_type = array_data.data_type();
1609    if matches!(data_type, DataType::Binary | DataType::Utf8) {
1610        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
1611        for buffer in [offsets, values] {
1612            offset = write_buffer(
1613                buffer.as_slice(),
1614                buffers,
1615                arrow_data,
1616                offset,
1617                compression_codec,
1618                write_options.alignment,
1619            )?;
1620        }
1621    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
1622        // Slicing the views buffer is safe and easy,
1623        // but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers
1624        //
1625        // The current implementation just serializes the raw arrays as given and does not try to optimize anything.
1626        // If users want to "compact" the arrays prior to sending them over IPC,
1627        // they should consider the gc API suggested in #5513
1628        for buffer in array_data.buffers() {
1629            offset = write_buffer(
1630                buffer.as_slice(),
1631                buffers,
1632                arrow_data,
1633                offset,
1634                compression_codec,
1635                write_options.alignment,
1636            )?;
1637        }
1638    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
1639        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
1640        for buffer in [offsets, values] {
1641            offset = write_buffer(
1642                buffer.as_slice(),
1643                buffers,
1644                arrow_data,
1645                offset,
1646                compression_codec,
1647                write_options.alignment,
1648            )?;
1649        }
1650    } else if DataType::is_numeric(data_type)
1651        || DataType::is_temporal(data_type)
1652        || matches!(
1653            array_data.data_type(),
1654            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
1655        )
1656    {
1657        // Truncate values
1658        assert_eq!(array_data.buffers().len(), 1);
1659
1660        let buffer = &array_data.buffers()[0];
1661        let layout = layout(data_type);
1662        let spec = &layout.buffers[0];
1663
1664        let byte_width = get_buffer_element_width(spec);
1665        let min_length = array_data.len() * byte_width;
1666        let buffer_slice = if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
1667            let byte_offset = array_data.offset() * byte_width;
1668            let buffer_length = min(min_length, buffer.len() - byte_offset);
1669            &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
1670        } else {
1671            buffer.as_slice()
1672        };
1673        offset = write_buffer(
1674            buffer_slice,
1675            buffers,
1676            arrow_data,
1677            offset,
1678            compression_codec,
1679            write_options.alignment,
1680        )?;
1681    } else if matches!(data_type, DataType::Boolean) {
1682        // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes).
1683        // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around.
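        // As a sketch: for offset = 3 and len = 4, `bit_slice` copies bits 3..7 of the first
        // byte into bit positions 0..4 of a fresh, byte-aligned buffer before it is written.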
1684        assert_eq!(array_data.buffers().len(), 1);
1685
1686        let buffer = &array_data.buffers()[0];
1687        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
1688        offset = write_buffer(
1689            &buffer,
1690            buffers,
1691            arrow_data,
1692            offset,
1693            compression_codec,
1694            write_options.alignment,
1695        )?;
1696    } else if matches!(
1697        data_type,
1698        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
1699    ) {
1700        assert_eq!(array_data.buffers().len(), 1);
1701        assert_eq!(array_data.child_data().len(), 1);
1702
1703        // Truncate offsets and the child data to avoid writing unnecessary data
1704        let (offsets, sliced_child_data) = match data_type {
1705            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
1706            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
1707            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
1708            _ => unreachable!(),
1709        };
1710        offset = write_buffer(
1711            offsets.as_slice(),
1712            buffers,
1713            arrow_data,
1714            offset,
1715            compression_codec,
1716            write_options.alignment,
1717        )?;
1718        offset = write_array_data(
1719            &sliced_child_data,
1720            buffers,
1721            arrow_data,
1722            nodes,
1723            offset,
1724            sliced_child_data.len(),
1725            sliced_child_data.null_count(),
1726            compression_codec,
1727            write_options,
1728        )?;
1729        return Ok(offset);
1730    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
1731        assert_eq!(array_data.child_data().len(), 1);
1732        let fixed_size = *fixed_size as usize;
1733
1734        let child_offset = array_data.offset() * fixed_size;
1735        let child_length = array_data.len() * fixed_size;
1736        let child_data = array_data.child_data()[0].slice(child_offset, child_length);
1737
1738        offset = write_array_data(
1739            &child_data,
1740            buffers,
1741            arrow_data,
1742            nodes,
1743            offset,
1744            child_data.len(),
1745            child_data.null_count(),
1746            compression_codec,
1747            write_options,
1748        )?;
1749        return Ok(offset);
1750    } else {
1751        for buffer in array_data.buffers() {
1752            offset = write_buffer(
1753                buffer,
1754                buffers,
1755                arrow_data,
1756                offset,
1757                compression_codec,
1758                write_options.alignment,
1759            )?;
1760        }
1761    }
1762
1763    match array_data.data_type() {
1764        DataType::Dictionary(_, _) => {}
1765        DataType::RunEndEncoded(_, _) => {
1766            // unslice the run encoded array.
1767            let arr = unslice_run_array(array_data.clone())?;
1768            // recursively write out nested structures
1769            for data_ref in arr.child_data() {
1770                // write the nested data (e.g. list data)
1771                offset = write_array_data(
1772                    data_ref,
1773                    buffers,
1774                    arrow_data,
1775                    nodes,
1776                    offset,
1777                    data_ref.len(),
1778                    data_ref.null_count(),
1779                    compression_codec,
1780                    write_options,
1781                )?;
1782            }
1783        }
1784        _ => {
1785            // recursively write out nested structures
1786            for data_ref in array_data.child_data() {
1787                // write the nested data (e.g. list data)
1788                offset = write_array_data(
1789                    data_ref,
1790                    buffers,
1791                    arrow_data,
1792                    nodes,
1793                    offset,
1794                    data_ref.len(),
1795                    data_ref.null_count(),
1796                    compression_codec,
1797                    write_options,
1798                )?;
1799            }
1800        }
1801    }
1802    Ok(offset)
1803}
1804
1805/// Writes a buffer into `arrow_data`, a vector of bytes, and adds its
1806/// [`crate::Buffer`] descriptor to `buffers`. Returns the new offset in `arrow_data`.
1807///
1808///
1809/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
1810/// Each constituent buffer is first compressed with the indicated
1811/// compressor, and then written with the uncompressed length in the first 8
1812/// bytes as a 64-bit little-endian signed integer followed by the compressed
1813/// buffer bytes (and then padding as required by the protocol). The
1814/// uncompressed length may be set to -1 to indicate that the data that
1815/// follows is not compressed, which can be useful for cases where
1816/// compression does not yield appreciable savings.
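///
/// As a sketch, a compressed buffer is laid out in `arrow_data` as:
///
/// ```text
/// [ uncompressed length: i64 little-endian ][ compressed bytes ... ][ padding to alignment ]
/// ```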
1817fn write_buffer(
1818    buffer: &[u8],                    // input
1819    buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
1820    arrow_data: &mut Vec<u8>,         // output stream
1821    offset: i64,                      // current output stream offset
1822    compression_codec: Option<CompressionCodec>,
1823    alignment: u8,
1824) -> Result<i64, ArrowError> {
1825    let len: i64 = match compression_codec {
1826        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?,
1827        None => {
1828            arrow_data.extend_from_slice(buffer);
1829            buffer.len()
1830        }
1831    }
1832    .try_into()
1833    .map_err(|e| {
1834        ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
1835    })?;
1836
1837    // make new index entry
1838    buffers.push(crate::Buffer::new(offset, len));
1839    // pad so that the next offset stays aligned
1840    let pad_len = pad_to_alignment(alignment, len as usize);
1841    arrow_data.extend_from_slice(&PADDING[..pad_len]);
1842
1843    Ok(offset + len + (pad_len as i64))
1844}
1845
1846const PADDING: [u8; 64] = [0; 64];
1847
1848/// Returns the number of bytes needed to pad `len` up to the next multiple of `alignment`
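///
/// `alignment` is assumed to be a power of two, which the bit trick below requires.
/// As a sketch:
///
/// ```text
/// pad_to_alignment(8, 5)   == 3   // 5 + 3 = 8
/// pad_to_alignment(64, 64) == 0   // already aligned
/// ```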
1849#[inline]
1850fn pad_to_alignment(alignment: u8, len: usize) -> usize {
1851    let a = usize::from(alignment - 1);
1852    ((len + a) & !a) - len
1853}
1854
1855#[cfg(test)]
1856mod tests {
1857    use std::io::Cursor;
1858    use std::io::Seek;
1859
1860    use arrow_array::builder::FixedSizeListBuilder;
1861    use arrow_array::builder::Float32Builder;
1862    use arrow_array::builder::Int64Builder;
1863    use arrow_array::builder::MapBuilder;
1864    use arrow_array::builder::UnionBuilder;
1865    use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
1866    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
1867    use arrow_array::types::*;
1868    use arrow_buffer::ScalarBuffer;
1869
1870    use crate::convert::fb_to_schema;
1871    use crate::reader::*;
1872    use crate::root_as_footer;
1873    use crate::MetadataVersion;
1874
1875    use super::*;
1876
1877    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
1878        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
1879        writer.write(rb).unwrap();
1880        writer.finish().unwrap();
1881        writer.into_inner().unwrap()
1882    }
1883
1884    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
1885        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
1886        reader.next().unwrap().unwrap()
1887    }
1888
1889    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
1890        // Use 8-byte alignment so that the various `truncate_*` tests can be compactly written,
1891        // without needing to construct a giant array to spill over the 64-byte default alignment
1892        // boundary.
1893        const IPC_ALIGNMENT: usize = 8;
1894
1895        let mut stream_writer = StreamWriter::try_new_with_options(
1896            vec![],
1897            record.schema_ref(),
1898            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
1899        )
1900        .unwrap();
1901        stream_writer.write(record).unwrap();
1902        stream_writer.finish().unwrap();
1903        stream_writer.into_inner().unwrap()
1904    }
1905
1906    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
1907        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
1908        stream_reader.next().unwrap().unwrap()
1909    }
1910
1911    #[test]
1912    #[cfg(feature = "lz4")]
1913    fn test_write_empty_record_batch_lz4_compression() {
1914        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
1915        let values: Vec<Option<i32>> = vec![];
1916        let array = Int32Array::from(values);
1917        let record_batch =
1918            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
1919
1920        let mut file = tempfile::tempfile().unwrap();
1921
1922        {
1923            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
1924                .unwrap()
1925                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
1926                .unwrap();
1927
1928            let mut writer =
1929                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
1930            writer.write(&record_batch).unwrap();
1931            writer.finish().unwrap();
1932        }
1933        file.rewind().unwrap();
1934        {
1935            // read file
1936            let reader = FileReader::try_new(file, None).unwrap();
1937            for read_batch in reader {
1938                read_batch
1939                    .unwrap()
1940                    .columns()
1941                    .iter()
1942                    .zip(record_batch.columns())
1943                    .for_each(|(a, b)| {
1944                        assert_eq!(a.data_type(), b.data_type());
1945                        assert_eq!(a.len(), b.len());
1946                        assert_eq!(a.null_count(), b.null_count());
1947                    });
1948            }
1949        }
1950    }
1951
1952    #[test]
1953    #[cfg(feature = "lz4")]
1954    fn test_write_file_with_lz4_compression() {
1955        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
1956        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
1957        let array = Int32Array::from(values);
1958        let record_batch =
1959            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
1960
1961        let mut file = tempfile::tempfile().unwrap();
1962        {
1963            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
1964                .unwrap()
1965                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
1966                .unwrap();
1967
1968            let mut writer =
1969                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
1970            writer.write(&record_batch).unwrap();
1971            writer.finish().unwrap();
1972        }
1973        file.rewind().unwrap();
1974        {
1975            // read file
1976            let reader = FileReader::try_new(file, None).unwrap();
1977            for read_batch in reader {
1978                read_batch
1979                    .unwrap()
1980                    .columns()
1981                    .iter()
1982                    .zip(record_batch.columns())
1983                    .for_each(|(a, b)| {
1984                        assert_eq!(a.data_type(), b.data_type());
1985                        assert_eq!(a.len(), b.len());
1986                        assert_eq!(a.null_count(), b.null_count());
1987                    });
1988            }
1989        }
1990    }
1991
1992    #[test]
1993    #[cfg(feature = "zstd")]
1994    fn test_write_file_with_zstd_compression() {
1995        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
1996        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
1997        let array = Int32Array::from(values);
1998        let record_batch =
1999            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2000        let mut file = tempfile::tempfile().unwrap();
2001        {
2002            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2003                .unwrap()
2004                .try_with_compression(Some(crate::CompressionType::ZSTD))
2005                .unwrap();
2006
2007            let mut writer =
2008                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2009            writer.write(&record_batch).unwrap();
2010            writer.finish().unwrap();
2011        }
2012        file.rewind().unwrap();
2013        {
2014            // read file
2015            let reader = FileReader::try_new(file, None).unwrap();
2016            for read_batch in reader {
2017                read_batch
2018                    .unwrap()
2019                    .columns()
2020                    .iter()
2021                    .zip(record_batch.columns())
2022                    .for_each(|(a, b)| {
2023                        assert_eq!(a.data_type(), b.data_type());
2024                        assert_eq!(a.len(), b.len());
2025                        assert_eq!(a.null_count(), b.null_count());
2026                    });
2027            }
2028        }
2029    }
2030
2031    #[test]
2032    fn test_write_file() {
2033        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2034        let values: Vec<Option<u32>> = vec![
2035            Some(999),
2036            None,
2037            Some(235),
2038            Some(123),
2039            None,
2040            None,
2041            None,
2042            None,
2043            None,
2044        ];
2045        let array1 = UInt32Array::from(values);
2046        let batch =
2047            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2048                .unwrap();
2049        let mut file = tempfile::tempfile().unwrap();
2050        {
2051            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2052
2053            writer.write(&batch).unwrap();
2054            writer.finish().unwrap();
2055        }
2056        file.rewind().unwrap();
2057
2058        {
2059            let mut reader = FileReader::try_new(file, None).unwrap();
2060            while let Some(Ok(read_batch)) = reader.next() {
2061                read_batch
2062                    .columns()
2063                    .iter()
2064                    .zip(batch.columns())
2065                    .for_each(|(a, b)| {
2066                        assert_eq!(a.data_type(), b.data_type());
2067                        assert_eq!(a.len(), b.len());
2068                        assert_eq!(a.null_count(), b.null_count());
2069                    });
2070            }
2071        }
2072    }
2073
2074    fn write_null_file(options: IpcWriteOptions) {
2075        let schema = Schema::new(vec![
2076            Field::new("nulls", DataType::Null, true),
2077            Field::new("int32s", DataType::Int32, false),
2078            Field::new("nulls2", DataType::Null, true),
2079            Field::new("f64s", DataType::Float64, false),
2080        ]);
2081        let array1 = NullArray::new(32);
2082        let array2 = Int32Array::from(vec![1; 32]);
2083        let array3 = NullArray::new(32);
2084        let array4 = Float64Array::from(vec![f64::NAN; 32]);
2085        let batch = RecordBatch::try_new(
2086            Arc::new(schema.clone()),
2087            vec![
2088                Arc::new(array1) as ArrayRef,
2089                Arc::new(array2) as ArrayRef,
2090                Arc::new(array3) as ArrayRef,
2091                Arc::new(array4) as ArrayRef,
2092            ],
2093        )
2094        .unwrap();
2095        let mut file = tempfile::tempfile().unwrap();
2096        {
2097            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2098
2099            writer.write(&batch).unwrap();
2100            writer.finish().unwrap();
2101        }
2102
2103        file.rewind().unwrap();
2104
2105        {
2106            let reader = FileReader::try_new(file, None).unwrap();
2107            reader.for_each(|maybe_batch| {
2108                maybe_batch
2109                    .unwrap()
2110                    .columns()
2111                    .iter()
2112                    .zip(batch.columns())
2113                    .for_each(|(a, b)| {
2114                        assert_eq!(a.data_type(), b.data_type());
2115                        assert_eq!(a.len(), b.len());
2116                        assert_eq!(a.null_count(), b.null_count());
2117                    });
2118            });
2119        }
2120    }
2121    #[test]
2122    fn test_write_null_file_v4() {
2123        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2124        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2125        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2126        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2127    }
2128
2129    #[test]
2130    fn test_write_null_file_v5() {
2131        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2132        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2133    }
2134
2135    #[test]
2136    fn track_union_nested_dict() {
2137        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2138
2139        let array = Arc::new(inner) as ArrayRef;
2140
2141        // Dict field with id 2
2142        #[allow(deprecated)]
2143        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 2, false);
2144        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
2145
2146        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
2147        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();
2148
2149        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();
2150
2151        let schema = Arc::new(Schema::new(vec![Field::new(
2152            "union",
2153            union.data_type().clone(),
2154            false,
2155        )]));
2156
2157        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
2158
2159        let gen = IpcDataGenerator {};
2160        #[allow(deprecated)]
2161        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
2162        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
2163            .unwrap();
2164
2165        // The tracker is configured to preserve the dictionary IDs defined in the schema,
2166        // so we expect the dictionary to be keyed to the schema-defined ID 2
2167        assert!(dict_tracker.written.contains_key(&2));
2168    }
2169
2170    #[test]
2171    fn track_struct_nested_dict() {
2172        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2173
2174        let array = Arc::new(inner) as ArrayRef;
2175
2176        // Dict field with id 2
2177        #[allow(deprecated)]
2178        let dctfield = Arc::new(Field::new_dict(
2179            "dict",
2180            array.data_type().clone(),
2181            false,
2182            2,
2183            false,
2184        ));
2185
2186        let s = StructArray::from(vec![(dctfield, array)]);
2187        let struct_array = Arc::new(s) as ArrayRef;
2188
2189        let schema = Arc::new(Schema::new(vec![Field::new(
2190            "struct",
2191            struct_array.data_type().clone(),
2192            false,
2193        )]));
2194
2195        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2196
2197        let gen = IpcDataGenerator {};
2198        #[allow(deprecated)]
2199        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
2200        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
2201            .unwrap();
2202
2203        assert!(dict_tracker.written.contains_key(&2));
2204    }
2205
2206    fn write_union_file(options: IpcWriteOptions) {
2207        let schema = Schema::new(vec![Field::new_union(
2208            "union",
2209            vec![0, 1],
2210            vec![
2211                Field::new("a", DataType::Int32, false),
2212                Field::new("c", DataType::Float64, false),
2213            ],
2214            UnionMode::Sparse,
2215        )]);
2216        let mut builder = UnionBuilder::with_capacity_sparse(5);
2217        builder.append::<Int32Type>("a", 1).unwrap();
2218        builder.append_null::<Int32Type>("a").unwrap();
2219        builder.append::<Float64Type>("c", 3.0).unwrap();
2220        builder.append_null::<Float64Type>("c").unwrap();
2221        builder.append::<Int32Type>("a", 4).unwrap();
2222        let union = builder.build().unwrap();
2223
2224        let batch =
2225            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2226                .unwrap();
2227
2228        let mut file = tempfile::tempfile().unwrap();
2229        {
2230            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2231
2232            writer.write(&batch).unwrap();
2233            writer.finish().unwrap();
2234        }
2235        file.rewind().unwrap();
2236
2237        {
2238            let reader = FileReader::try_new(file, None).unwrap();
2239            reader.for_each(|maybe_batch| {
2240                maybe_batch
2241                    .unwrap()
2242                    .columns()
2243                    .iter()
2244                    .zip(batch.columns())
2245                    .for_each(|(a, b)| {
2246                        assert_eq!(a.data_type(), b.data_type());
2247                        assert_eq!(a.len(), b.len());
2248                        assert_eq!(a.null_count(), b.null_count());
2249                    });
2250            });
2251        }
2252    }
2253
2254    #[test]
2255    fn test_write_union_file_v4_v5() {
2256        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2257        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2258    }
2259
2260    #[test]
2261    fn test_write_view_types() {
2262        const LONG_TEST_STRING: &str =
2263            "This is a long string to make sure binary view array handles it";
2264        let schema = Schema::new(vec![
2265            Field::new("field1", DataType::BinaryView, true),
2266            Field::new("field2", DataType::Utf8View, true),
2267        ]);
2268        let values: Vec<Option<&[u8]>> = vec![
2269            Some(b"foo"),
2270            Some(b"bar"),
2271            Some(LONG_TEST_STRING.as_bytes()),
2272        ];
2273        let binary_array = BinaryViewArray::from_iter(values);
2274        let utf8_array =
2275            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2276        let record_batch = RecordBatch::try_new(
2277            Arc::new(schema.clone()),
2278            vec![Arc::new(binary_array), Arc::new(utf8_array)],
2279        )
2280        .unwrap();
2281
2282        let mut file = tempfile::tempfile().unwrap();
2283        {
2284            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2285            writer.write(&record_batch).unwrap();
2286            writer.finish().unwrap();
2287        }
2288        file.rewind().unwrap();
2289        {
2290            let mut reader = FileReader::try_new(&file, None).unwrap();
2291            let read_batch = reader.next().unwrap().unwrap();
2292            read_batch
2293                .columns()
2294                .iter()
2295                .zip(record_batch.columns())
2296                .for_each(|(a, b)| {
2297                    assert_eq!(a, b);
2298                });
2299        }
2300        file.rewind().unwrap();
2301        {
2302            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2303            let read_batch = reader.next().unwrap().unwrap();
2304            assert_eq!(read_batch.num_columns(), 1);
2305            let read_array = read_batch.column(0);
2306            let write_array = record_batch.column(0);
2307            assert_eq!(read_array, write_array);
2308        }
2309    }
2310
2311    #[test]
2312    fn truncate_ipc_record_batch() {
2313        fn create_batch(rows: usize) -> RecordBatch {
2314            let schema = Schema::new(vec![
2315                Field::new("a", DataType::Int32, false),
2316                Field::new("b", DataType::Utf8, false),
2317            ]);
2318
2319            let a = Int32Array::from_iter_values(0..rows as i32);
2320            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
2321
2322            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2323        }
2324
2325        let big_record_batch = create_batch(65536);
2326
2327        let length = 5;
2328        let small_record_batch = create_batch(length);
2329
2330        let offset = 2;
2331        let record_batch_slice = big_record_batch.slice(offset, length);
2332        assert!(
2333            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
2334        );
2335        assert_eq!(
2336            serialize_stream(&small_record_batch).len(),
2337            serialize_stream(&record_batch_slice).len()
2338        );
2339
2340        assert_eq!(
2341            deserialize_stream(serialize_stream(&record_batch_slice)),
2342            record_batch_slice
2343        );
2344    }
2345
2346    #[test]
2347    fn truncate_ipc_record_batch_with_nulls() {
2348        fn create_batch() -> RecordBatch {
2349            let schema = Schema::new(vec![
2350                Field::new("a", DataType::Int32, true),
2351                Field::new("b", DataType::Utf8, true),
2352            ]);
2353
2354            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
2355            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
2356
2357            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2358        }
2359
2360        let record_batch = create_batch();
2361        let record_batch_slice = record_batch.slice(1, 2);
2362        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2363
2364        assert!(
2365            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2366        );
2367
2368        assert!(deserialized_batch.column(0).is_null(0));
2369        assert!(deserialized_batch.column(0).is_valid(1));
2370        assert!(deserialized_batch.column(1).is_valid(0));
2371        assert!(deserialized_batch.column(1).is_valid(1));
2372
2373        assert_eq!(record_batch_slice, deserialized_batch);
2374    }
2375
2376    #[test]
2377    fn truncate_ipc_dictionary_array() {
2378        fn create_batch() -> RecordBatch {
2379            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
2380                .into_iter()
2381                .collect();
2382            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2383
2384            let array = DictionaryArray::new(keys, Arc::new(values));
2385
2386            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
2387
2388            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
2389        }
2390
2391        let record_batch = create_batch();
2392        let record_batch_slice = record_batch.slice(1, 2);
2393        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2394
2395        assert!(
2396            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2397        );
2398
2399        assert!(deserialized_batch.column(0).is_valid(0));
2400        assert!(deserialized_batch.column(0).is_null(1));
2401
2402        assert_eq!(record_batch_slice, deserialized_batch);
2403    }
2404
2405    #[test]
2406    fn truncate_ipc_struct_array() {
2407        fn create_batch() -> RecordBatch {
2408            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
2409                .into_iter()
2410                .collect();
2411            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2412
2413            let struct_array = StructArray::from(vec![
2414                (
2415                    Arc::new(Field::new("s", DataType::Utf8, true)),
2416                    Arc::new(strings) as ArrayRef,
2417                ),
2418                (
2419                    Arc::new(Field::new("c", DataType::Int32, true)),
2420                    Arc::new(ints) as ArrayRef,
2421                ),
2422            ]);
2423
2424            let schema = Schema::new(vec![Field::new(
2425                "struct_array",
2426                struct_array.data_type().clone(),
2427                true,
2428            )]);
2429
2430            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
2431        }
2432
2433        let record_batch = create_batch();
2434        let record_batch_slice = record_batch.slice(1, 2);
2435        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2436
2437        assert!(
2438            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2439        );
2440
2441        let structs = deserialized_batch
2442            .column(0)
2443            .as_any()
2444            .downcast_ref::<StructArray>()
2445            .unwrap();
2446
2447        assert!(structs.column(0).is_null(0));
2448        assert!(structs.column(0).is_valid(1));
2449        assert!(structs.column(1).is_valid(0));
2450        assert!(structs.column(1).is_null(1));
2451        assert_eq!(record_batch_slice, deserialized_batch);
2452    }
2453
2454    #[test]
2455    fn truncate_ipc_string_array_with_all_empty_string() {
2456        fn create_batch() -> RecordBatch {
2457            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
2458            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
2459            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
2460        }
2461
2462        let record_batch = create_batch();
2463        let record_batch_slice = record_batch.slice(0, 1);
2464        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2465
2466        assert!(
2467            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2468        );
2469        assert_eq!(record_batch_slice, deserialized_batch);
2470    }
2471
2472    #[test]
2473    fn test_stream_writer_writes_array_slice() {
2474        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
2475        assert_eq!(
2476            vec![Some(1), Some(2), Some(3)],
2477            array.iter().collect::<Vec<_>>()
2478        );
2479
2480        let sliced = array.slice(1, 2);
2481        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());
2482
2483        let batch = RecordBatch::try_new(
2484            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
2485            vec![Arc::new(sliced)],
2486        )
2487        .expect("new batch");
2488
2489        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
2490        writer.write(&batch).expect("write");
2491        let outbuf = writer.into_inner().expect("inner");
2492
2493        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
2494        let read_batch = reader.next().unwrap().expect("read batch");
2495
2496        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
2497        assert_eq!(
2498            vec![Some(2), Some(3)],
2499            read_array.iter().collect::<Vec<_>>()
2500        );
2501    }
2502
2503    #[test]
2504    fn test_large_slice_uint32() {
2505        ensure_roundtrip(Arc::new(UInt32Array::from_iter((0..8000).map(|i| {
2506            if i % 2 == 0 {
2507                Some(i)
2508            } else {
2509                None
2510            }
2511        }))));
2512    }
2513
2514    #[test]
2515    fn test_large_slice_string() {
2516        let strings: Vec<_> = (0..8000)
2517            .map(|i| {
2518                if i % 2 == 0 {
2519                    Some(format!("value{}", i))
2520                } else {
2521                    None
2522                }
2523            })
2524            .collect();
2525
2526        ensure_roundtrip(Arc::new(StringArray::from(strings)));
2527    }
2528
2529    #[test]
2530    fn test_large_slice_string_list() {
2531        let mut ls = ListBuilder::new(StringBuilder::new());
2532
2533        let mut s = String::new();
2534        for row_number in 0..8000 {
2535            if row_number % 2 == 0 {
2536                for list_element in 0..1000 {
2537                    s.clear();
2538                    use std::fmt::Write;
2539                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
2540                    ls.values().append_value(&s);
2541                }
2542                ls.append(true)
2543            } else {
2544                ls.append(false); // null
2545            }
2546        }
2547
2548        ensure_roundtrip(Arc::new(ls.finish()));
2549    }
2550
2551    #[test]
2552    fn test_large_slice_string_list_of_lists() {
2553        // The reason for this special test is to exercise reencode_offsets, which looks both at
2554        // the starting offset and the data offset. So we need a dataset where the start offset
2555        // is zero but the data offset is not.
2556        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
2557
2558        for _ in 0..4000 {
2559            ls.values().append(true);
2560            ls.append(true)
2561        }
2562
2563        let mut s = String::new();
2564        for row_number in 0..4000 {
2565            if row_number % 2 == 0 {
2566                for list_element in 0..1000 {
2567                    s.clear();
2568                    use std::fmt::Write;
2569                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
2570                    ls.values().values().append_value(&s);
2571                }
2572                ls.values().append(true);
2573                ls.append(true)
2574            } else {
2575                ls.append(false); // null
2576            }
2577        }
2578
2579        ensure_roundtrip(Arc::new(ls.finish()));
2580    }
2581
2582    /// Read/write a record batch to a File and Stream and ensure it is the same at the output
2583    fn ensure_roundtrip(array: ArrayRef) {
2584        let num_rows = array.len();
2585        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
2586        // take off the first element
2587        let sliced_batch = orig_batch.slice(1, num_rows - 1);
2588
2589        let schema = orig_batch.schema();
2590        let stream_data = {
2591            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
2592            writer.write(&sliced_batch).unwrap();
2593            writer.into_inner().unwrap()
2594        };
2595        let read_batch = {
2596            let projection = None;
2597            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
2598            reader
2599                .next()
2600                .expect("expect no errors reading batch")
2601                .expect("expect batch")
2602        };
2603        assert_eq!(sliced_batch, read_batch);
2604
2605        let file_data = {
2606            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
2607            writer.write(&sliced_batch).unwrap();
2608            writer.into_inner().unwrap().into_inner().unwrap()
2609        };
2610        let read_batch = {
2611            let projection = None;
2612            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
2613            reader
2614                .next()
2615                .expect("expect no errors reading batch")
2616                .expect("expect batch")
2617        };
2618        assert_eq!(sliced_batch, read_batch);
2619
2620        // TODO test file writer/reader
2621    }
2622
2623    #[test]
2624    fn encode_bools_slice() {
2625        // Test case for https://github.com/apache/arrow-rs/issues/3496
2626        assert_bool_roundtrip([true, false], 1, 1);
2627
2628        // slice somewhere in the middle
2629        assert_bool_roundtrip(
2630            [
2631                true, false, true, true, false, false, true, true, true, false, false, false, true,
2632                true, true, true, false, false, false, false, true, true, true, true, true, false,
2633                false, false, false, false,
2634            ],
2635            13,
2636            17,
2637        );
2638
2639        // start at byte boundary, end in the middle
2640        assert_bool_roundtrip(
2641            [
2642                true, false, true, true, false, false, true, true, true, false, false, false,
2643            ],
2644            8,
2645            2,
2646        );
2647
2648        // start and stop at a byte boundary
2649        assert_bool_roundtrip(
2650            [
2651                true, false, true, true, false, false, true, true, true, false, false, false, true,
2652                true, true, true, true, false, false, false, false, false,
2653            ],
2654            8,
2655            8,
2656        );
2657    }
2658
2659    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
2660        let val_bool_field = Field::new("val", DataType::Boolean, false);
2661
2662        let schema = Arc::new(Schema::new(vec![val_bool_field]));
2663
2664        let bools = BooleanArray::from(bools.to_vec());
2665
2666        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
2667        let batch = batch.slice(offset, length);
2668
2669        let data = serialize_stream(&batch);
2670        let batch2 = deserialize_stream(data);
2671        assert_eq!(batch, batch2);
2672    }
2673
2674    #[test]
2675    fn test_run_array_unslice() {
2676        let total_len = 80;
2677        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
2678        let repeats: Vec<usize> = vec![3, 4, 1, 2];
2679        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
2680        for ix in 0_usize..32 {
2681            let repeat: usize = repeats[ix % repeats.len()];
2682            let val: Option<i32> = vals[ix % vals.len()];
2683            input_array.resize(input_array.len() + repeat, val);
2684        }
2685
2686        // Encode the input_array to run array
2687        let mut builder =
2688            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
2689        builder.extend(input_array.iter().copied());
2690        let run_array = builder.finish();
2691
2692        // test for all slice lengths.
2693        for slice_len in 1..=total_len {
2694            // test for offset = 0, slice length = slice_len
2695            let sliced_run_array: RunArray<Int16Type> =
2696                run_array.slice(0, slice_len).into_data().into();
2697
2698            // Create unsliced run array.
2699            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
2700            let typed = unsliced_run_array
2701                .downcast::<PrimitiveArray<Int32Type>>()
2702                .unwrap();
2703            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
2704            let actual: Vec<Option<i32>> = typed.into_iter().collect();
2705            assert_eq!(expected, actual);
2706
2707            // test for offset = total_len - slice_len, length = slice_len
2708            let sliced_run_array: RunArray<Int16Type> = run_array
2709                .slice(total_len - slice_len, slice_len)
2710                .into_data()
2711                .into();
2712
2713            // Create unsliced run array.
2714            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
2715            let typed = unsliced_run_array
2716                .downcast::<PrimitiveArray<Int32Type>>()
2717                .unwrap();
2718            let expected: Vec<Option<i32>> = input_array
2719                .iter()
2720                .skip(total_len - slice_len)
2721                .copied()
2722                .collect();
2723            let actual: Vec<Option<i32>> = typed.into_iter().collect();
2724            assert_eq!(expected, actual);
2725        }
2726    }
2727
2728    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
2729        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
2730
2731        for i in 0..100_000 {
2732            for value in [i, i, i] {
2733                ls.values().append_value(value);
2734            }
2735            ls.append(true)
2736        }
2737
2738        ls.finish()
2739    }
2740
2741    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
2742        let mut ls =
2743            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
2744
2745        for _i in 0..10_000 {
2746            for j in 0..10 {
2747                for value in [j, j, j, j] {
2748                    ls.values().values().append_value(value);
2749                }
2750                ls.values().append(true)
2751            }
2752            ls.append(true);
2753        }
2754
2755        ls.finish()
2756    }
2757
2758    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
2759        let mut ls =
2760            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
2761
2762        for _i in 0..999 {
2763            ls.values().append(true);
2764            ls.append(true);
2765        }
2766
2767        for j in 0..10 {
2768            for value in [j, j, j, j] {
2769                ls.values().values().append_value(value);
2770            }
2771            ls.values().append(true)
2772        }
2773        ls.append(true);
2774
2775        for i in 0..9_000 {
2776            for j in 0..10 {
2777                for value in [i + j, i + j, i + j, i + j] {
2778                    ls.values().values().append_value(value);
2779                }
2780                ls.values().append(true)
2781            }
2782            ls.append(true);
2783        }
2784
2785        ls.finish()
2786    }
2787
2788    fn generate_map_array_data() -> MapArray {
2789        let keys_builder = UInt32Builder::new();
2790        let values_builder = UInt32Builder::new();
2791
2792        let mut builder = MapBuilder::new(None, keys_builder, values_builder);
2793
2794        for i in 0..100_000 {
2795            for _j in 0..3 {
2796                builder.keys().append_value(i);
2797                builder.values().append_value(i * 2);
2798            }
2799            builder.append(true).unwrap();
2800        }
2801
2802        builder.finish()
2803    }
2804
2805    #[test]
2806    fn reencode_offsets_when_first_offset_is_not_zero() {
2807        let original_list = generate_list_data::<i32>();
2808        let original_data = original_list.into_data();
2809        let slice_data = original_data.slice(75, 7);
2810        let (new_offsets, original_start, length) =
2811            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
2812        assert_eq!(
2813            vec![0, 3, 6, 9, 12, 15, 18, 21],
2814            new_offsets.typed_data::<i32>()
2815        );
2816        assert_eq!(225, original_start);
2817        assert_eq!(21, length);
2818    }
2819
2820    #[test]
2821    fn reencode_offsets_when_first_offset_is_zero() {
2822        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
2823        // ls = [[], [35, 42]]
2824        ls.append(true);
2825        ls.values().append_value(35);
2826        ls.values().append_value(42);
2827        ls.append(true);
2828        let original_list = ls.finish();
2829        let original_data = original_list.into_data();
2830
2831        let slice_data = original_data.slice(1, 1);
2832        let (new_offsets, original_start, length) =
2833            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
2834        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
2835        assert_eq!(0, original_start);
2836        assert_eq!(2, length);
2837    }
2838
2839    /// Ensure that both the full and sliced versions round-trip (serialize/deserialize) to the original input.
2840    /// Also ensure the serialized sliced version is significantly smaller than the serialized full version.
2841    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
2842        // test both full and sliced versions
2843        let in_sliced = in_batch.slice(999, 1);
2844
2845        let bytes_batch = serialize_file(&in_batch);
2846        let bytes_sliced = serialize_file(&in_sliced);
2847
2848        // serializing 1 row should be significantly smaller than serializing 100,000
2849        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
2850
2851        // ensure both are still valid and equal to originals
2852        let out_batch = deserialize_file(bytes_batch);
2853        assert_eq!(in_batch, out_batch);
2854
2855        let out_sliced = deserialize_file(bytes_sliced);
2856        assert_eq!(in_sliced, out_sliced);
2857    }
2858
2859    #[test]
2860    fn encode_lists() {
2861        let val_inner = Field::new_list_field(DataType::UInt32, true);
2862        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
2863        let schema = Arc::new(Schema::new(vec![val_list_field]));
2864
2865        let values = Arc::new(generate_list_data::<i32>());
2866
2867        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2868        roundtrip_ensure_sliced_smaller(in_batch, 1000);
2869    }
2870
2871    #[test]
2872    fn encode_empty_list() {
2873        let val_inner = Field::new_list_field(DataType::UInt32, true);
2874        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
2875        let schema = Arc::new(Schema::new(vec![val_list_field]));
2876
2877        let values = Arc::new(generate_list_data::<i32>());
2878
2879        let in_batch = RecordBatch::try_new(schema, vec![values])
2880            .unwrap()
2881            .slice(999, 0);
2882        let out_batch = deserialize_file(serialize_file(&in_batch));
2883        assert_eq!(in_batch, out_batch);
2884    }
2885
2886    #[test]
2887    fn encode_large_lists() {
2888        let val_inner = Field::new_list_field(DataType::UInt32, true);
2889        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
2890        let schema = Arc::new(Schema::new(vec![val_list_field]));
2891
2892        let values = Arc::new(generate_list_data::<i64>());
2893
2894        // ensure the full and sliced batches round-trip equal to the original input and that
2895        // the serialized sliced version is significantly smaller than the serialized full batch
2896        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2897        roundtrip_ensure_sliced_smaller(in_batch, 1000);
2898    }
2899
2900    #[test]
2901    fn encode_nested_lists() {
2902        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
2903        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
2904        let list_field = Field::new("val", DataType::List(inner_list_field), true);
2905        let schema = Arc::new(Schema::new(vec![list_field]));
2906
2907        let values = Arc::new(generate_nested_list_data::<i32>());
2908
2909        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2910        roundtrip_ensure_sliced_smaller(in_batch, 1000);
2911    }
2912
2913    #[test]
2914    fn encode_nested_lists_starting_at_zero() {
2915        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
2916        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
2917        let list_field = Field::new("val", DataType::List(inner_list_field), true);
2918        let schema = Arc::new(Schema::new(vec![list_field]));
2919
2920        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
2921
2922        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2923        roundtrip_ensure_sliced_smaller(in_batch, 1);
2924    }
2925
2926    #[test]
2927    fn encode_map_array() {
2928        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
2929        let values = Arc::new(Field::new("values", DataType::UInt32, true));
2930        let map_field = Field::new_map("map", "entries", keys, values, false, true);
2931        let schema = Arc::new(Schema::new(vec![map_field]));
2932
2933        let values = Arc::new(generate_map_array_data());
2934
2935        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2936        roundtrip_ensure_sliced_smaller(in_batch, 1000);
2937    }
2938
2939    #[test]
2940    fn test_decimal128_alignment16_is_sufficient() {
2941        const IPC_ALIGNMENT: usize = 16;
2942
2943        // Test a bunch of different dimensions to ensure alignment is never an issue.
2944        // For example, if we only test `num_cols = 1` then even with alignment 8 this
2945        // test would _happen_ to pass, even though for different dimensions like
2946        // `num_cols = 2` it would fail.
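            // (Roughly speaking, varying the column and row counts changes the size of the encoded
            // metadata and the number of buffers preceding each Decimal128 values buffer, which is
            // what shifts buffers on or off a 16-byte boundary; see
            // `test_decimal128_alignment8_is_unaligned` below for a case that is off by exactly 8.)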
2947        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
2948            let num_rows = (num_cols * 7 + 11) % 100; // Deterministic swizzle
2949
2950            let mut fields = Vec::new();
2951            let mut arrays = Vec::new();
2952            for i in 0..num_cols {
2953                let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
2954                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
2955                fields.push(field);
2956                arrays.push(Arc::new(array) as Arc<dyn Array>);
2957            }
2958            let schema = Schema::new(fields);
2959            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
2960
2961            let mut writer = FileWriter::try_new_with_options(
2962                Vec::new(),
2963                batch.schema_ref(),
2964                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2965            )
2966            .unwrap();
2967            writer.write(&batch).unwrap();
2968            writer.finish().unwrap();
2969
2970            let out: Vec<u8> = writer.into_inner().unwrap();
2971
2972            let buffer = Buffer::from_vec(out);
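                // The file ends with <footer flatbuffer> <footer length: i32> <magic "ARROW1">,
                // so the final 10 bytes are the 4-byte footer length plus the 6-byte magic.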
2973            let trailer_start = buffer.len() - 10;
2974            let footer_len =
2975                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
2976            let footer =
2977                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
2978
2979            let schema = fb_to_schema(footer.schema().unwrap());
2980
2981            // Importantly we set `require_alignment`, checking that 16-byte alignment is sufficient
2982            // for `read_record_batch` later on to read the data in a zero-copy manner.
2983            let decoder =
2984                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
2985
2986            let batches = footer.recordBatches().unwrap();
2987
2988            let block = batches.get(0);
2989            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2990            let data = buffer.slice_with_length(block.offset() as _, block_len);
2991
2992            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();
2993
2994            assert_eq!(batch, batch2);
2995        }
2996    }
2997
2998    #[test]
2999    fn test_decimal128_alignment8_is_unaligned() {
3000        const IPC_ALIGNMENT: usize = 8;
3001
3002        let num_cols = 2;
3003        let num_rows = 1;
3004
3005        let mut fields = Vec::new();
3006        let mut arrays = Vec::new();
3007        for i in 0..num_cols {
3008            let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
3009            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
3010            fields.push(field);
3011            arrays.push(Arc::new(array) as Arc<dyn Array>);
3012        }
3013        let schema = Schema::new(fields);
3014        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
3015
3016        let mut writer = FileWriter::try_new_with_options(
3017            Vec::new(),
3018            batch.schema_ref(),
3019            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
3020        )
3021        .unwrap();
3022        writer.write(&batch).unwrap();
3023        writer.finish().unwrap();
3024
3025        let out: Vec<u8> = writer.into_inner().unwrap();
3026
3027        let buffer = Buffer::from_vec(out);
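            // As above: 4-byte footer length + 6-byte `ARROW1` magic = 10 trailing bytes.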
3028        let trailer_start = buffer.len() - 10;
3029        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
3030        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
3031
3032        let schema = fb_to_schema(footer.schema().unwrap());
3033
3034        // Importantly we set `require_alignment`; otherwise the error below would be suppressed by the
3035        // copy to an aligned buffer made in `ArrayDataBuilder::build_aligned`.
3036        let decoder =
3037            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
3038
3039        let batches = footer.recordBatches().unwrap();
3040
3041        let block = batches.get(0);
3042        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
3043        let data = buffer.slice_with_length(block.offset() as _, block_len);
3044
3045        let result = decoder.read_record_batch(block, &data);
3046
3047        let error = result.unwrap_err();
3048        assert_eq!(
3049            error.to_string(),
3050            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
3051             offset from expected alignment of 16 by 8"
3052        );
3053    }
3054
3055    #[test]
3056    fn test_flush() {
3057        // We write a schema which is small enough to fit into a buffer and not get flushed,
3058        // and then force the write with .flush().
3059        let num_cols = 2;
3060        let mut fields = Vec::new();
3061        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
3062        for i in 0..num_cols {
3063            let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
3064            fields.push(field);
3065        }
3066        let schema = Schema::new(fields);
3067        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
3068        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
3069        let mut stream_writer =
3070            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
3071                .unwrap();
3072        let mut file_writer =
3073            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();
3074
3075        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
3076        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
3077        stream_writer.flush().unwrap();
3078        file_writer.flush().unwrap();
3079        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
3080        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
3081        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
3082        // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
3083        // and then a length of 0 (4 bytes) for a total of 8 bytes.
3084        // Everything before that should have been flushed in the .flush() call.
3085        let expected_stream_flushed_bytes = stream_out.len() - 8;
3086        // A file write is the same as the stream write except for the leading magic string
3087        // ARROW1 plus padding, which is 8 bytes.
3088        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;
3089
3090        assert!(
3091            stream_bytes_written_on_new < stream_bytes_written_on_flush,
3092            "this test makes no sense if flush is not actually required"
3093        );
3094        assert!(
3095            file_bytes_written_on_new < file_bytes_written_on_flush,
3096            "this test makes no sense if flush is not actually required"
3097        );
3098        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
3099        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
3100    }
3101
3102    #[test]
3103    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
3104        let l1_type =
3105            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
3106        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));
3107
3108        let l0_builder = Float32Builder::new();
3109        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
3110            "item",
3111            DataType::Float32,
3112            false,
3113        )));
3114        let mut l2_builder =
3115            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));
3116
3117        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
3118            l2_builder.values().values().append_value(point[0]);
3119            l2_builder.values().values().append_value(point[1]);
3120            l2_builder.values().values().append_value(point[2]);
3121
3122            l2_builder.values().append(true);
3123        }
3124        l2_builder.append(true);
3125
3126        let point = [10., 11., 12.];
3127        l2_builder.values().values().append_value(point[0]);
3128        l2_builder.values().values().append_value(point[1]);
3129        l2_builder.values().values().append_value(point[2]);
3130
3131        l2_builder.values().append(true);
3132        l2_builder.append(true);
3133
3134        let array = Arc::new(l2_builder.finish()) as ArrayRef;
3135
3136        let schema = Arc::new(Schema::new_with_metadata(
3137            vec![Field::new("points", l2_type, false)],
3138            HashMap::default(),
3139        ));
3140
3141        // Test a variety of combinations that include 0 and non-zero offsets
3142        // and cover both a portion of the array and the remainder
3143        test_slices(&array, &schema, 0, 1)?;
3144        test_slices(&array, &schema, 0, 2)?;
3145        test_slices(&array, &schema, 1, 1)?;
3146
3147        Ok(())
3148    }
3149
3150    #[test]
3151    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
3152        let l0_builder = Float32Builder::new();
3153        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
3154        let mut l2_builder = ListBuilder::new(l1_builder);
3155
3156        for point in [
3157            [Some(1.0), Some(2.0), None],
3158            [Some(4.0), Some(5.0), Some(6.0)],
3159            [None, Some(8.0), Some(9.0)],
3160        ] {
3161            for p in point {
3162                match p {
3163                    Some(p) => l2_builder.values().values().append_value(p),
3164                    None => l2_builder.values().values().append_null(),
3165                }
3166            }
3167
3168            l2_builder.values().append(true);
3169        }
3170        l2_builder.append(true);
3171
3172        let point = [Some(10.), None, None];
3173        for p in point {
3174            match p {
3175                Some(p) => l2_builder.values().values().append_value(p),
3176                None => l2_builder.values().values().append_null(),
3177            }
3178        }
3179
3180        l2_builder.values().append(true);
3181        l2_builder.append(true);
3182
3183        let array = Arc::new(l2_builder.finish()) as ArrayRef;
3184
3185        let schema = Arc::new(Schema::new_with_metadata(
3186            vec![Field::new(
3187                "points",
3188                DataType::List(Arc::new(Field::new(
3189                    "item",
3190                    DataType::FixedSizeList(
3191                        Arc::new(Field::new("item", DataType::Float32, true)),
3192                        3,
3193                    ),
3194                    true,
3195                ))),
3196                true,
3197            )],
3198            HashMap::default(),
3199        ));
3200
3201        // Test a variety of combinations that include 0 and non-zero offsets
3202        // and cover both a portion of the array and the remainder
3203        test_slices(&array, &schema, 0, 1)?;
3204        test_slices(&array, &schema, 0, 2)?;
3205        test_slices(&array, &schema, 1, 1)?;
3206
3207        Ok(())
3208    }
3209
3210    fn test_slices(
3211        parent_array: &ArrayRef,
3212        schema: &SchemaRef,
3213        offset: usize,
3214        length: usize,
3215    ) -> Result<(), ArrowError> {
3216        let subarray = parent_array.slice(offset, length);
3217        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
3218
3219        let mut bytes = Vec::new();
3220        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
3221        writer.write(&original_batch)?;
3222        writer.finish()?;
3223
3224        let mut cursor = std::io::Cursor::new(bytes);
3225        let mut reader = StreamReader::try_new(&mut cursor, None)?;
3226        let returned_batch = reader.next().unwrap()?;
3227
3228        assert_eq!(original_batch, returned_batch);
3229
3230        Ok(())
3231    }
3232
3233    #[test]
3234    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
3235        let int_builder = Int64Builder::new();
3236        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
3237            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));
3238
3239        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
3240            fixed_list_builder.values().append_value(point[0]);
3241            fixed_list_builder.values().append_value(point[1]);
3242            fixed_list_builder.values().append_value(point[2]);
3243
3244            fixed_list_builder.append(true);
3245        }
3246
3247        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
3248
3249        let schema = Arc::new(Schema::new_with_metadata(
3250            vec![Field::new(
3251                "points",
3252                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
3253                false,
3254            )],
3255            HashMap::default(),
3256        ));
3257
3258        // Test a variety of combinations that include 0 and non-zero offsets
3259        // and cover both a portion of the array and the remainder
3260        test_slices(&array, &schema, 0, 4)?;
3261        test_slices(&array, &schema, 0, 2)?;
3262        test_slices(&array, &schema, 1, 3)?;
3263        test_slices(&array, &schema, 2, 1)?;
3264
3265        Ok(())
3266    }
3267
3268    #[test]
3269    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
3270        let int_builder = Int64Builder::new();
3271        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);
3272
3273        for point in [
3274            [Some(1), Some(2), None],
3275            [Some(4), Some(5), Some(6)],
3276            [None, Some(8), Some(9)],
3277            [Some(10), None, None],
3278        ] {
3279            for p in point {
3280                match p {
3281                    Some(p) => fixed_list_builder.values().append_value(p),
3282                    None => fixed_list_builder.values().append_null(),
3283                }
3284            }
3285
3286            fixed_list_builder.append(true);
3287        }
3288
3289        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
3290
3291        let schema = Arc::new(Schema::new_with_metadata(
3292            vec![Field::new(
3293                "points",
3294                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
3295                true,
3296            )],
3297            HashMap::default(),
3298        ));
3299
3300        // Test a variety of combinations that include 0 and non-zero offsets
3301        // and cover both a portion of the array and the remainder
3302        test_slices(&array, &schema, 0, 4)?;
3303        test_slices(&array, &schema, 0, 2)?;
3304        test_slices(&array, &schema, 1, 3)?;
3305        test_slices(&array, &schema, 2, 1)?;
3306
3307        Ok(())
3308    }
3309}