arrow_ipc/
convert.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utilities for converting between IPC types and native Arrow types
19
20use arrow_buffer::Buffer;
21use arrow_schema::*;
22use flatbuffers::{
23    FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, Verifiable, Verifier,
24    VerifierOptions, WIPOffset,
25};
26use std::collections::HashMap;
27use std::fmt::{Debug, Formatter};
28use std::sync::Arc;
29
30use crate::writer::DictionaryTracker;
31use crate::{size_prefixed_root_as_message, KeyValue, Message, CONTINUATION_MARKER};
32use DataType::*;
33
34/// Low level Arrow [Schema] to IPC bytes converter
35///
36/// See also [`fb_to_schema`] for the reverse operation
37///
38/// # Example
39/// ```
40/// # use arrow_ipc::convert::{fb_to_schema, IpcSchemaEncoder};
41/// # use arrow_ipc::root_as_schema;
42/// # use arrow_ipc::writer::DictionaryTracker;
43/// # use arrow_schema::{DataType, Field, Schema};
44/// // given an arrow schema to serialize
45/// let schema = Schema::new(vec![
46///    Field::new("a", DataType::Int32, false),
47/// ]);
48///
49/// // Use a dictionary tracker to track dictionary id if needed
50///  let mut dictionary_tracker = DictionaryTracker::new(true);
51/// // create a FlatBuffersBuilder that contains the encoded bytes
52///  let fb = IpcSchemaEncoder::new()
53///    .with_dictionary_tracker(&mut dictionary_tracker)
54///    .schema_to_fb(&schema);
55///
56/// // the bytes are in `fb.finished_data()`
57/// let ipc_bytes = fb.finished_data();
58///
59///  // convert the IPC bytes back to an Arrow schema
60///  let ipc_schema = root_as_schema(ipc_bytes).unwrap();
61///  let schema2 = fb_to_schema(ipc_schema);
62/// assert_eq!(schema, schema2);
63/// ```
64#[derive(Debug)]
65pub struct IpcSchemaEncoder<'a> {
66    dictionary_tracker: Option<&'a mut DictionaryTracker>,
67}
68
69impl Default for IpcSchemaEncoder<'_> {
70    fn default() -> Self {
71        Self::new()
72    }
73}
74
75impl<'a> IpcSchemaEncoder<'a> {
76    /// Create a new schema encoder
77    pub fn new() -> IpcSchemaEncoder<'a> {
78        IpcSchemaEncoder {
79            dictionary_tracker: None,
80        }
81    }
82
83    /// Specify a dictionary tracker to use
84    pub fn with_dictionary_tracker(
85        mut self,
86        dictionary_tracker: &'a mut DictionaryTracker,
87    ) -> Self {
88        self.dictionary_tracker = Some(dictionary_tracker);
89        self
90    }
91
92    /// Serialize a schema in IPC format, returning a completed [`FlatBufferBuilder`]
93    ///
94    /// Note: Call [`FlatBufferBuilder::finished_data`] to get the serialized bytes
95    pub fn schema_to_fb<'b>(&mut self, schema: &Schema) -> FlatBufferBuilder<'b> {
96        let mut fbb = FlatBufferBuilder::new();
97
98        let root = self.schema_to_fb_offset(&mut fbb, schema);
99
100        fbb.finish(root, None);
101
102        fbb
103    }
104
105    /// Serialize a schema to an in progress [`FlatBufferBuilder`], returning the in progress offset.
106    pub fn schema_to_fb_offset<'b>(
107        &mut self,
108        fbb: &mut FlatBufferBuilder<'b>,
109        schema: &Schema,
110    ) -> WIPOffset<crate::Schema<'b>> {
111        let fields = schema
112            .fields()
113            .iter()
114            .map(|field| build_field(fbb, &mut self.dictionary_tracker, field))
115            .collect::<Vec<_>>();
116        let fb_field_list = fbb.create_vector(&fields);
117
118        let fb_metadata_list =
119            (!schema.metadata().is_empty()).then(|| metadata_to_fb(fbb, schema.metadata()));
120
121        let mut builder = crate::SchemaBuilder::new(fbb);
122        builder.add_fields(fb_field_list);
123        if let Some(fb_metadata_list) = fb_metadata_list {
124            builder.add_custom_metadata(fb_metadata_list);
125        }
126        builder.finish()
127    }
128}
129
130/// Serialize a schema in IPC format
131#[deprecated(since = "54.0.0", note = "Use `IpcSchemaConverter`.")]
132pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder<'_> {
133    IpcSchemaEncoder::new().schema_to_fb(schema)
134}
135
136/// Push a key-value metadata into a FlatBufferBuilder and return [WIPOffset]
137pub fn metadata_to_fb<'a>(
138    fbb: &mut FlatBufferBuilder<'a>,
139    metadata: &HashMap<String, String>,
140) -> WIPOffset<Vector<'a, ForwardsUOffset<KeyValue<'a>>>> {
141    let custom_metadata = metadata
142        .iter()
143        .map(|(k, v)| {
144            let fb_key_name = fbb.create_string(k);
145            let fb_val_name = fbb.create_string(v);
146
147            let mut kv_builder = crate::KeyValueBuilder::new(fbb);
148            kv_builder.add_key(fb_key_name);
149            kv_builder.add_value(fb_val_name);
150            kv_builder.finish()
151        })
152        .collect::<Vec<_>>();
153    fbb.create_vector(&custom_metadata)
154}
155
156/// Adds a [Schema] to a flatbuffer and returns the offset
157pub fn schema_to_fb_offset<'a>(
158    fbb: &mut FlatBufferBuilder<'a>,
159    schema: &Schema,
160) -> WIPOffset<crate::Schema<'a>> {
161    IpcSchemaEncoder::new().schema_to_fb_offset(fbb, schema)
162}
163
164/// Convert an IPC Field to Arrow Field
165impl From<crate::Field<'_>> for Field {
166    fn from(field: crate::Field) -> Field {
167        let arrow_field = if let Some(dictionary) = field.dictionary() {
168            #[allow(deprecated)]
169            Field::new_dict(
170                field.name().unwrap(),
171                get_data_type(field, true),
172                field.nullable(),
173                dictionary.id(),
174                dictionary.isOrdered(),
175            )
176        } else {
177            Field::new(
178                field.name().unwrap(),
179                get_data_type(field, true),
180                field.nullable(),
181            )
182        };
183
184        let mut metadata_map = HashMap::default();
185        if let Some(list) = field.custom_metadata() {
186            for kv in list {
187                if let (Some(k), Some(v)) = (kv.key(), kv.value()) {
188                    metadata_map.insert(k.to_string(), v.to_string());
189                }
190            }
191        }
192
193        arrow_field.with_metadata(metadata_map)
194    }
195}
196
197/// Deserialize an ipc [crate::Schema`] from flat buffers to an arrow [Schema].
198pub fn fb_to_schema(fb: crate::Schema) -> Schema {
199    let mut fields: Vec<Field> = vec![];
200    let c_fields = fb.fields().unwrap();
201    let len = c_fields.len();
202    for i in 0..len {
203        let c_field: crate::Field = c_fields.get(i);
204        match c_field.type_type() {
205            crate::Type::Decimal if fb.endianness() == crate::Endianness::Big => {
206                unimplemented!("Big Endian is not supported for Decimal!")
207            }
208            _ => (),
209        };
210        fields.push(c_field.into());
211    }
212
213    let mut metadata: HashMap<String, String> = HashMap::default();
214    if let Some(md_fields) = fb.custom_metadata() {
215        let len = md_fields.len();
216        for i in 0..len {
217            let kv = md_fields.get(i);
218            let k_str = kv.key();
219            let v_str = kv.value();
220            if let Some(k) = k_str {
221                if let Some(v) = v_str {
222                    metadata.insert(k.to_string(), v.to_string());
223                }
224            }
225        }
226    }
227    Schema::new_with_metadata(fields, metadata)
228}
229
230/// Try deserialize flat buffer format bytes into a schema
231pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema, ArrowError> {
232    if let Ok(ipc) = crate::root_as_message(bytes) {
233        if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
234            Ok(schema)
235        } else {
236            Err(ArrowError::ParseError(
237                "Unable to get head as schema".to_string(),
238            ))
239        }
240    } else {
241        Err(ArrowError::ParseError(
242            "Unable to get root as message".to_string(),
243        ))
244    }
245}
246
247/// Try deserialize the IPC format bytes into a schema
248pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema, ArrowError> {
249    // There are two protocol types: https://issues.apache.org/jira/browse/ARROW-6313
250    // The original protocol is:
251    //   4 bytes - the byte length of the payload
252    //   a flatbuffer Message whose header is the Schema
253    // The latest version of protocol is:
254    // The schema of the dataset in its IPC form:
255    //   4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
256    //   4 bytes - the byte length of the payload
257    //   a flatbuffer Message whose header is the Schema
258    if buffer.len() >= 4 {
259        // check continuation marker
260        let continuation_marker = &buffer[0..4];
261        let begin_offset: usize = if continuation_marker.eq(&CONTINUATION_MARKER) {
262            // 4 bytes: CONTINUATION_MARKER
263            // 4 bytes: length
264            // buffer
265            4
266        } else {
267            // backward compatibility for buffer without the continuation marker
268            // 4 bytes: length
269            // buffer
270            0
271        };
272        let msg = size_prefixed_root_as_message(&buffer[begin_offset..]).map_err(|err| {
273            ArrowError::ParseError(format!("Unable to convert flight info to a message: {err}"))
274        })?;
275        let ipc_schema = msg.header_as_schema().ok_or_else(|| {
276            ArrowError::ParseError("Unable to convert flight info to a schema".to_string())
277        })?;
278        Ok(fb_to_schema(ipc_schema))
279    } else {
280        Err(ArrowError::ParseError(
281            "The buffer length is less than 4 and missing the continuation marker or length of buffer".to_string()
282        ))
283    }
284}
285
286/// Get the Arrow data type from the flatbuffer Field table
287pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> DataType {
288    if let Some(dictionary) = field.dictionary() {
289        if may_be_dictionary {
290            let int = dictionary.indexType().unwrap();
291            let index_type = match (int.bitWidth(), int.is_signed()) {
292                (8, true) => DataType::Int8,
293                (8, false) => DataType::UInt8,
294                (16, true) => DataType::Int16,
295                (16, false) => DataType::UInt16,
296                (32, true) => DataType::Int32,
297                (32, false) => DataType::UInt32,
298                (64, true) => DataType::Int64,
299                (64, false) => DataType::UInt64,
300                _ => panic!("Unexpected bitwidth and signed"),
301            };
302            return DataType::Dictionary(
303                Box::new(index_type),
304                Box::new(get_data_type(field, false)),
305            );
306        }
307    }
308
309    match field.type_type() {
310        crate::Type::Null => DataType::Null,
311        crate::Type::Bool => DataType::Boolean,
312        crate::Type::Int => {
313            let int = field.type_as_int().unwrap();
314            match (int.bitWidth(), int.is_signed()) {
315                (8, true) => DataType::Int8,
316                (8, false) => DataType::UInt8,
317                (16, true) => DataType::Int16,
318                (16, false) => DataType::UInt16,
319                (32, true) => DataType::Int32,
320                (32, false) => DataType::UInt32,
321                (64, true) => DataType::Int64,
322                (64, false) => DataType::UInt64,
323                z => panic!(
324                    "Int type with bit width of {} and signed of {} not supported",
325                    z.0, z.1
326                ),
327            }
328        }
329        crate::Type::Binary => DataType::Binary,
330        crate::Type::BinaryView => DataType::BinaryView,
331        crate::Type::LargeBinary => DataType::LargeBinary,
332        crate::Type::Utf8 => DataType::Utf8,
333        crate::Type::Utf8View => DataType::Utf8View,
334        crate::Type::LargeUtf8 => DataType::LargeUtf8,
335        crate::Type::FixedSizeBinary => {
336            let fsb = field.type_as_fixed_size_binary().unwrap();
337            DataType::FixedSizeBinary(fsb.byteWidth())
338        }
339        crate::Type::FloatingPoint => {
340            let float = field.type_as_floating_point().unwrap();
341            match float.precision() {
342                crate::Precision::HALF => DataType::Float16,
343                crate::Precision::SINGLE => DataType::Float32,
344                crate::Precision::DOUBLE => DataType::Float64,
345                z => panic!("FloatingPoint type with precision of {z:?} not supported"),
346            }
347        }
348        crate::Type::Date => {
349            let date = field.type_as_date().unwrap();
350            match date.unit() {
351                crate::DateUnit::DAY => DataType::Date32,
352                crate::DateUnit::MILLISECOND => DataType::Date64,
353                z => panic!("Date type with unit of {z:?} not supported"),
354            }
355        }
356        crate::Type::Time => {
357            let time = field.type_as_time().unwrap();
358            match (time.bitWidth(), time.unit()) {
359                (32, crate::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second),
360                (32, crate::TimeUnit::MILLISECOND) => DataType::Time32(TimeUnit::Millisecond),
361                (64, crate::TimeUnit::MICROSECOND) => DataType::Time64(TimeUnit::Microsecond),
362                (64, crate::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond),
363                z => panic!(
364                    "Time type with bit width of {} and unit of {:?} not supported",
365                    z.0, z.1
366                ),
367            }
368        }
369        crate::Type::Timestamp => {
370            let timestamp = field.type_as_timestamp().unwrap();
371            let timezone: Option<_> = timestamp.timezone().map(|tz| tz.into());
372            match timestamp.unit() {
373                crate::TimeUnit::SECOND => DataType::Timestamp(TimeUnit::Second, timezone),
374                crate::TimeUnit::MILLISECOND => {
375                    DataType::Timestamp(TimeUnit::Millisecond, timezone)
376                }
377                crate::TimeUnit::MICROSECOND => {
378                    DataType::Timestamp(TimeUnit::Microsecond, timezone)
379                }
380                crate::TimeUnit::NANOSECOND => DataType::Timestamp(TimeUnit::Nanosecond, timezone),
381                z => panic!("Timestamp type with unit of {z:?} not supported"),
382            }
383        }
384        crate::Type::Interval => {
385            let interval = field.type_as_interval().unwrap();
386            match interval.unit() {
387                crate::IntervalUnit::YEAR_MONTH => DataType::Interval(IntervalUnit::YearMonth),
388                crate::IntervalUnit::DAY_TIME => DataType::Interval(IntervalUnit::DayTime),
389                crate::IntervalUnit::MONTH_DAY_NANO => {
390                    DataType::Interval(IntervalUnit::MonthDayNano)
391                }
392                z => panic!("Interval type with unit of {z:?} unsupported"),
393            }
394        }
395        crate::Type::Duration => {
396            let duration = field.type_as_duration().unwrap();
397            match duration.unit() {
398                crate::TimeUnit::SECOND => DataType::Duration(TimeUnit::Second),
399                crate::TimeUnit::MILLISECOND => DataType::Duration(TimeUnit::Millisecond),
400                crate::TimeUnit::MICROSECOND => DataType::Duration(TimeUnit::Microsecond),
401                crate::TimeUnit::NANOSECOND => DataType::Duration(TimeUnit::Nanosecond),
402                z => panic!("Duration type with unit of {z:?} unsupported"),
403            }
404        }
405        crate::Type::List => {
406            let children = field.children().unwrap();
407            if children.len() != 1 {
408                panic!("expect a list to have one child")
409            }
410            DataType::List(Arc::new(children.get(0).into()))
411        }
412        crate::Type::LargeList => {
413            let children = field.children().unwrap();
414            if children.len() != 1 {
415                panic!("expect a large list to have one child")
416            }
417            DataType::LargeList(Arc::new(children.get(0).into()))
418        }
419        crate::Type::FixedSizeList => {
420            let children = field.children().unwrap();
421            if children.len() != 1 {
422                panic!("expect a list to have one child")
423            }
424            let fsl = field.type_as_fixed_size_list().unwrap();
425            DataType::FixedSizeList(Arc::new(children.get(0).into()), fsl.listSize())
426        }
427        crate::Type::Struct_ => {
428            let fields = match field.children() {
429                Some(children) => children.iter().map(Field::from).collect(),
430                None => Fields::empty(),
431            };
432            DataType::Struct(fields)
433        }
434        crate::Type::RunEndEncoded => {
435            let children = field.children().unwrap();
436            if children.len() != 2 {
437                panic!(
438                    "RunEndEncoded type should have exactly two children. Found {}",
439                    children.len()
440                )
441            }
442            let run_ends_field = children.get(0).into();
443            let values_field = children.get(1).into();
444            DataType::RunEndEncoded(Arc::new(run_ends_field), Arc::new(values_field))
445        }
446        crate::Type::Map => {
447            let map = field.type_as_map().unwrap();
448            let children = field.children().unwrap();
449            if children.len() != 1 {
450                panic!("expect a map to have one child")
451            }
452            DataType::Map(Arc::new(children.get(0).into()), map.keysSorted())
453        }
454        crate::Type::Decimal => {
455            let fsb = field.type_as_decimal().unwrap();
456            let bit_width = fsb.bitWidth();
457            let precision: u8 = fsb.precision().try_into().unwrap();
458            let scale: i8 = fsb.scale().try_into().unwrap();
459            match bit_width {
460                128 => DataType::Decimal128(precision, scale),
461                256 => DataType::Decimal256(precision, scale),
462                _ => panic!("Unexpected decimal bit width {bit_width}"),
463            }
464        }
465        crate::Type::Union => {
466            let union = field.type_as_union().unwrap();
467
468            let union_mode = match union.mode() {
469                crate::UnionMode::Dense => UnionMode::Dense,
470                crate::UnionMode::Sparse => UnionMode::Sparse,
471                mode => panic!("Unexpected union mode: {mode:?}"),
472            };
473
474            let mut fields = vec![];
475            if let Some(children) = field.children() {
476                for i in 0..children.len() {
477                    fields.push(Field::from(children.get(i)));
478                }
479            };
480
481            let fields = match union.typeIds() {
482                None => UnionFields::new(0_i8..fields.len() as i8, fields),
483                Some(ids) => UnionFields::new(ids.iter().map(|i| i as i8), fields),
484            };
485
486            DataType::Union(fields, union_mode)
487        }
488        t => unimplemented!("Type {:?} not supported", t),
489    }
490}
491
492pub(crate) struct FBFieldType<'b> {
493    pub(crate) type_type: crate::Type,
494    pub(crate) type_: WIPOffset<UnionWIPOffset>,
495    pub(crate) children: Option<WIPOffset<Vector<'b, ForwardsUOffset<crate::Field<'b>>>>>,
496}
497
498/// Create an IPC Field from an Arrow Field
499pub(crate) fn build_field<'a>(
500    fbb: &mut FlatBufferBuilder<'a>,
501    dictionary_tracker: &mut Option<&mut DictionaryTracker>,
502    field: &Field,
503) -> WIPOffset<crate::Field<'a>> {
504    // Optional custom metadata.
505    let mut fb_metadata = None;
506    if !field.metadata().is_empty() {
507        fb_metadata = Some(metadata_to_fb(fbb, field.metadata()));
508    };
509
510    let fb_field_name = fbb.create_string(field.name().as_str());
511    let field_type = get_fb_field_type(field.data_type(), dictionary_tracker, fbb);
512
513    let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() {
514        match dictionary_tracker {
515            Some(tracker) => Some(get_fb_dictionary(
516                index_type,
517                #[allow(deprecated)]
518                tracker.set_dict_id(field),
519                field
520                    .dict_is_ordered()
521                    .expect("All Dictionary types have `dict_is_ordered`"),
522                fbb,
523            )),
524            None => Some(get_fb_dictionary(
525                index_type,
526                #[allow(deprecated)]
527                field
528                    .dict_id()
529                    .expect("Dictionary type must have a dictionary id"),
530                field
531                    .dict_is_ordered()
532                    .expect("All Dictionary types have `dict_is_ordered`"),
533                fbb,
534            )),
535        }
536    } else {
537        None
538    };
539
540    let mut field_builder = crate::FieldBuilder::new(fbb);
541    field_builder.add_name(fb_field_name);
542    if let Some(dictionary) = fb_dictionary {
543        field_builder.add_dictionary(dictionary)
544    }
545    field_builder.add_type_type(field_type.type_type);
546    field_builder.add_nullable(field.is_nullable());
547    match field_type.children {
548        None => {}
549        Some(children) => field_builder.add_children(children),
550    };
551    field_builder.add_type_(field_type.type_);
552
553    if let Some(fb_metadata) = fb_metadata {
554        field_builder.add_custom_metadata(fb_metadata);
555    }
556
557    field_builder.finish()
558}
559
560/// Get the IPC type of a data type
561pub(crate) fn get_fb_field_type<'a>(
562    data_type: &DataType,
563    dictionary_tracker: &mut Option<&mut DictionaryTracker>,
564    fbb: &mut FlatBufferBuilder<'a>,
565) -> FBFieldType<'a> {
566    // some IPC implementations expect an empty list for child data, instead of a null value.
567    // An empty field list is thus returned for primitive types
568    let empty_fields: Vec<WIPOffset<crate::Field>> = vec![];
569    match data_type {
570        Null => FBFieldType {
571            type_type: crate::Type::Null,
572            type_: crate::NullBuilder::new(fbb).finish().as_union_value(),
573            children: Some(fbb.create_vector(&empty_fields[..])),
574        },
575        Boolean => FBFieldType {
576            type_type: crate::Type::Bool,
577            type_: crate::BoolBuilder::new(fbb).finish().as_union_value(),
578            children: Some(fbb.create_vector(&empty_fields[..])),
579        },
580        UInt8 | UInt16 | UInt32 | UInt64 => {
581            let children = fbb.create_vector(&empty_fields[..]);
582            let mut builder = crate::IntBuilder::new(fbb);
583            builder.add_is_signed(false);
584            match data_type {
585                UInt8 => builder.add_bitWidth(8),
586                UInt16 => builder.add_bitWidth(16),
587                UInt32 => builder.add_bitWidth(32),
588                UInt64 => builder.add_bitWidth(64),
589                _ => {}
590            };
591            FBFieldType {
592                type_type: crate::Type::Int,
593                type_: builder.finish().as_union_value(),
594                children: Some(children),
595            }
596        }
597        Int8 | Int16 | Int32 | Int64 => {
598            let children = fbb.create_vector(&empty_fields[..]);
599            let mut builder = crate::IntBuilder::new(fbb);
600            builder.add_is_signed(true);
601            match data_type {
602                Int8 => builder.add_bitWidth(8),
603                Int16 => builder.add_bitWidth(16),
604                Int32 => builder.add_bitWidth(32),
605                Int64 => builder.add_bitWidth(64),
606                _ => {}
607            };
608            FBFieldType {
609                type_type: crate::Type::Int,
610                type_: builder.finish().as_union_value(),
611                children: Some(children),
612            }
613        }
614        Float16 | Float32 | Float64 => {
615            let children = fbb.create_vector(&empty_fields[..]);
616            let mut builder = crate::FloatingPointBuilder::new(fbb);
617            match data_type {
618                Float16 => builder.add_precision(crate::Precision::HALF),
619                Float32 => builder.add_precision(crate::Precision::SINGLE),
620                Float64 => builder.add_precision(crate::Precision::DOUBLE),
621                _ => {}
622            };
623            FBFieldType {
624                type_type: crate::Type::FloatingPoint,
625                type_: builder.finish().as_union_value(),
626                children: Some(children),
627            }
628        }
629        Binary => FBFieldType {
630            type_type: crate::Type::Binary,
631            type_: crate::BinaryBuilder::new(fbb).finish().as_union_value(),
632            children: Some(fbb.create_vector(&empty_fields[..])),
633        },
634        LargeBinary => FBFieldType {
635            type_type: crate::Type::LargeBinary,
636            type_: crate::LargeBinaryBuilder::new(fbb)
637                .finish()
638                .as_union_value(),
639            children: Some(fbb.create_vector(&empty_fields[..])),
640        },
641        BinaryView => FBFieldType {
642            type_type: crate::Type::BinaryView,
643            type_: crate::BinaryViewBuilder::new(fbb).finish().as_union_value(),
644            children: Some(fbb.create_vector(&empty_fields[..])),
645        },
646        Utf8View => FBFieldType {
647            type_type: crate::Type::Utf8View,
648            type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(),
649            children: Some(fbb.create_vector(&empty_fields[..])),
650        },
651        Utf8 => FBFieldType {
652            type_type: crate::Type::Utf8,
653            type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
654            children: Some(fbb.create_vector(&empty_fields[..])),
655        },
656        LargeUtf8 => FBFieldType {
657            type_type: crate::Type::LargeUtf8,
658            type_: crate::LargeUtf8Builder::new(fbb).finish().as_union_value(),
659            children: Some(fbb.create_vector(&empty_fields[..])),
660        },
661        FixedSizeBinary(len) => {
662            let mut builder = crate::FixedSizeBinaryBuilder::new(fbb);
663            builder.add_byteWidth(*len);
664            FBFieldType {
665                type_type: crate::Type::FixedSizeBinary,
666                type_: builder.finish().as_union_value(),
667                children: Some(fbb.create_vector(&empty_fields[..])),
668            }
669        }
670        Date32 => {
671            let mut builder = crate::DateBuilder::new(fbb);
672            builder.add_unit(crate::DateUnit::DAY);
673            FBFieldType {
674                type_type: crate::Type::Date,
675                type_: builder.finish().as_union_value(),
676                children: Some(fbb.create_vector(&empty_fields[..])),
677            }
678        }
679        Date64 => {
680            let mut builder = crate::DateBuilder::new(fbb);
681            builder.add_unit(crate::DateUnit::MILLISECOND);
682            FBFieldType {
683                type_type: crate::Type::Date,
684                type_: builder.finish().as_union_value(),
685                children: Some(fbb.create_vector(&empty_fields[..])),
686            }
687        }
688        Time32(unit) | Time64(unit) => {
689            let mut builder = crate::TimeBuilder::new(fbb);
690            match unit {
691                TimeUnit::Second => {
692                    builder.add_bitWidth(32);
693                    builder.add_unit(crate::TimeUnit::SECOND);
694                }
695                TimeUnit::Millisecond => {
696                    builder.add_bitWidth(32);
697                    builder.add_unit(crate::TimeUnit::MILLISECOND);
698                }
699                TimeUnit::Microsecond => {
700                    builder.add_bitWidth(64);
701                    builder.add_unit(crate::TimeUnit::MICROSECOND);
702                }
703                TimeUnit::Nanosecond => {
704                    builder.add_bitWidth(64);
705                    builder.add_unit(crate::TimeUnit::NANOSECOND);
706                }
707            }
708            FBFieldType {
709                type_type: crate::Type::Time,
710                type_: builder.finish().as_union_value(),
711                children: Some(fbb.create_vector(&empty_fields[..])),
712            }
713        }
714        Timestamp(unit, tz) => {
715            let tz = tz.as_deref().unwrap_or_default();
716            let tz_str = fbb.create_string(tz);
717            let mut builder = crate::TimestampBuilder::new(fbb);
718            let time_unit = match unit {
719                TimeUnit::Second => crate::TimeUnit::SECOND,
720                TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
721                TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
722                TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
723            };
724            builder.add_unit(time_unit);
725            if !tz.is_empty() {
726                builder.add_timezone(tz_str);
727            }
728            FBFieldType {
729                type_type: crate::Type::Timestamp,
730                type_: builder.finish().as_union_value(),
731                children: Some(fbb.create_vector(&empty_fields[..])),
732            }
733        }
734        Interval(unit) => {
735            let mut builder = crate::IntervalBuilder::new(fbb);
736            let interval_unit = match unit {
737                IntervalUnit::YearMonth => crate::IntervalUnit::YEAR_MONTH,
738                IntervalUnit::DayTime => crate::IntervalUnit::DAY_TIME,
739                IntervalUnit::MonthDayNano => crate::IntervalUnit::MONTH_DAY_NANO,
740            };
741            builder.add_unit(interval_unit);
742            FBFieldType {
743                type_type: crate::Type::Interval,
744                type_: builder.finish().as_union_value(),
745                children: Some(fbb.create_vector(&empty_fields[..])),
746            }
747        }
748        Duration(unit) => {
749            let mut builder = crate::DurationBuilder::new(fbb);
750            let time_unit = match unit {
751                TimeUnit::Second => crate::TimeUnit::SECOND,
752                TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
753                TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
754                TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
755            };
756            builder.add_unit(time_unit);
757            FBFieldType {
758                type_type: crate::Type::Duration,
759                type_: builder.finish().as_union_value(),
760                children: Some(fbb.create_vector(&empty_fields[..])),
761            }
762        }
763        List(ref list_type) => {
764            let child = build_field(fbb, dictionary_tracker, list_type);
765            FBFieldType {
766                type_type: crate::Type::List,
767                type_: crate::ListBuilder::new(fbb).finish().as_union_value(),
768                children: Some(fbb.create_vector(&[child])),
769            }
770        }
771        ListView(_) | LargeListView(_) => unimplemented!("ListView/LargeListView not implemented"),
772        LargeList(ref list_type) => {
773            let child = build_field(fbb, dictionary_tracker, list_type);
774            FBFieldType {
775                type_type: crate::Type::LargeList,
776                type_: crate::LargeListBuilder::new(fbb).finish().as_union_value(),
777                children: Some(fbb.create_vector(&[child])),
778            }
779        }
780        FixedSizeList(ref list_type, len) => {
781            let child = build_field(fbb, dictionary_tracker, list_type);
782            let mut builder = crate::FixedSizeListBuilder::new(fbb);
783            builder.add_listSize(*len);
784            FBFieldType {
785                type_type: crate::Type::FixedSizeList,
786                type_: builder.finish().as_union_value(),
787                children: Some(fbb.create_vector(&[child])),
788            }
789        }
790        Struct(fields) => {
791            // struct's fields are children
792            let mut children = vec![];
793            for field in fields {
794                children.push(build_field(fbb, dictionary_tracker, field));
795            }
796            FBFieldType {
797                type_type: crate::Type::Struct_,
798                type_: crate::Struct_Builder::new(fbb).finish().as_union_value(),
799                children: Some(fbb.create_vector(&children[..])),
800            }
801        }
802        RunEndEncoded(run_ends, values) => {
803            let run_ends_field = build_field(fbb, dictionary_tracker, run_ends);
804            let values_field = build_field(fbb, dictionary_tracker, values);
805            let children = [run_ends_field, values_field];
806            FBFieldType {
807                type_type: crate::Type::RunEndEncoded,
808                type_: crate::RunEndEncodedBuilder::new(fbb)
809                    .finish()
810                    .as_union_value(),
811                children: Some(fbb.create_vector(&children[..])),
812            }
813        }
814        Map(map_field, keys_sorted) => {
815            let child = build_field(fbb, dictionary_tracker, map_field);
816            let mut field_type = crate::MapBuilder::new(fbb);
817            field_type.add_keysSorted(*keys_sorted);
818            FBFieldType {
819                type_type: crate::Type::Map,
820                type_: field_type.finish().as_union_value(),
821                children: Some(fbb.create_vector(&[child])),
822            }
823        }
824        Dictionary(_, value_type) => {
825            // In this library, the dictionary "type" is a logical construct. Here we
826            // pass through to the value type, as we've already captured the index
827            // type in the DictionaryEncoding metadata in the parent field
828            get_fb_field_type(value_type, dictionary_tracker, fbb)
829        }
830        Decimal128(precision, scale) => {
831            let mut builder = crate::DecimalBuilder::new(fbb);
832            builder.add_precision(*precision as i32);
833            builder.add_scale(*scale as i32);
834            builder.add_bitWidth(128);
835            FBFieldType {
836                type_type: crate::Type::Decimal,
837                type_: builder.finish().as_union_value(),
838                children: Some(fbb.create_vector(&empty_fields[..])),
839            }
840        }
841        Decimal256(precision, scale) => {
842            let mut builder = crate::DecimalBuilder::new(fbb);
843            builder.add_precision(*precision as i32);
844            builder.add_scale(*scale as i32);
845            builder.add_bitWidth(256);
846            FBFieldType {
847                type_type: crate::Type::Decimal,
848                type_: builder.finish().as_union_value(),
849                children: Some(fbb.create_vector(&empty_fields[..])),
850            }
851        }
852        Union(fields, mode) => {
853            let mut children = vec![];
854            for (_, field) in fields.iter() {
855                children.push(build_field(fbb, dictionary_tracker, field));
856            }
857
858            let union_mode = match mode {
859                UnionMode::Sparse => crate::UnionMode::Sparse,
860                UnionMode::Dense => crate::UnionMode::Dense,
861            };
862
863            let fbb_type_ids =
864                fbb.create_vector(&fields.iter().map(|(t, _)| t as i32).collect::<Vec<_>>());
865            let mut builder = crate::UnionBuilder::new(fbb);
866            builder.add_mode(union_mode);
867            builder.add_typeIds(fbb_type_ids);
868
869            FBFieldType {
870                type_type: crate::Type::Union,
871                type_: builder.finish().as_union_value(),
872                children: Some(fbb.create_vector(&children[..])),
873            }
874        }
875    }
876}
877
878/// Create an IPC dictionary encoding
879pub(crate) fn get_fb_dictionary<'a>(
880    index_type: &DataType,
881    dict_id: i64,
882    dict_is_ordered: bool,
883    fbb: &mut FlatBufferBuilder<'a>,
884) -> WIPOffset<crate::DictionaryEncoding<'a>> {
885    // We assume that the dictionary index type (as an integer) has already been
886    // validated elsewhere, and can safely assume we are dealing with integers
887    let mut index_builder = crate::IntBuilder::new(fbb);
888
889    match *index_type {
890        Int8 | Int16 | Int32 | Int64 => index_builder.add_is_signed(true),
891        UInt8 | UInt16 | UInt32 | UInt64 => index_builder.add_is_signed(false),
892        _ => {}
893    }
894
895    match *index_type {
896        Int8 | UInt8 => index_builder.add_bitWidth(8),
897        Int16 | UInt16 => index_builder.add_bitWidth(16),
898        Int32 | UInt32 => index_builder.add_bitWidth(32),
899        Int64 | UInt64 => index_builder.add_bitWidth(64),
900        _ => {}
901    }
902
903    let index_builder = index_builder.finish();
904
905    let mut builder = crate::DictionaryEncodingBuilder::new(fbb);
906    builder.add_id(dict_id);
907    builder.add_indexType(index_builder);
908    builder.add_isOrdered(dict_is_ordered);
909
910    builder.finish()
911}
912
913/// An owned container for a validated [`Message`]
914///
915/// Safely decoding a flatbuffer requires validating the various embedded offsets,
916/// see [`Verifier`]. This is a potentially expensive operation, and it is therefore desirable
917/// to only do this once. [`crate::root_as_message`] performs this validation on construction,
918/// however, it returns a [`Message`] borrowing the provided byte slice. This prevents
919/// storing this [`Message`] in the same data structure that owns the buffer, as this
920/// would require self-referential borrows.
921///
922/// [`MessageBuffer`] solves this problem by providing a safe API for a [`Message`]
923/// without a lifetime bound.
924#[derive(Clone)]
925pub struct MessageBuffer(Buffer);
926
927impl Debug for MessageBuffer {
928    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
929        self.as_ref().fmt(f)
930    }
931}
932
933impl MessageBuffer {
934    /// Try to create a [`MessageBuffer`] from the provided [`Buffer`]
935    pub fn try_new(buf: Buffer) -> Result<Self, ArrowError> {
936        let opts = VerifierOptions::default();
937        let mut v = Verifier::new(&opts, &buf);
938        <ForwardsUOffset<Message>>::run_verifier(&mut v, 0).map_err(|err| {
939            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
940        })?;
941        Ok(Self(buf))
942    }
943
944    /// Return the [`Message`]
945    #[inline]
946    pub fn as_ref(&self) -> Message<'_> {
947        // SAFETY: Run verifier on construction
948        unsafe { crate::root_as_message_unchecked(&self.0) }
949    }
950}
951
#[cfg(test)]
mod tests {
    use super::*;

    /// Encode a schema that covers most supported data types (nested, union,
    /// dictionary, decimal, duration, large variants) together with schema-level
    /// and field-level metadata. Then decode it again and check that nothing changed.
    #[test]
    fn convert_schema_round_trip() {
        let md = HashMap::from([("Key".to_string(), "value".to_string())]);
        let field_md = HashMap::from([("k".to_string(), "v".to_string())]);
        let schema = Schema::new_with_metadata(
            vec![
                Field::new("uint8", DataType::UInt8, false).with_metadata(field_md),
                Field::new("uint16", DataType::UInt16, true),
                Field::new("uint32", DataType::UInt32, false),
                Field::new("uint64", DataType::UInt64, true),
                Field::new("int8", DataType::Int8, true),
                Field::new("int16", DataType::Int16, false),
                Field::new("int32", DataType::Int32, true),
                Field::new("int64", DataType::Int64, false),
                Field::new("float16", DataType::Float16, true),
                Field::new("float32", DataType::Float32, false),
                Field::new("float64", DataType::Float64, true),
                Field::new("null", DataType::Null, false),
                Field::new("bool", DataType::Boolean, false),
                Field::new("date32", DataType::Date32, false),
                Field::new("date64", DataType::Date64, true),
                Field::new("time32[s]", DataType::Time32(TimeUnit::Second), true),
                Field::new("time32[ms]", DataType::Time32(TimeUnit::Millisecond), false),
                Field::new("time64[us]", DataType::Time64(TimeUnit::Microsecond), false),
                Field::new("time64[ns]", DataType::Time64(TimeUnit::Nanosecond), true),
                Field::new(
                    "timestamp[s]",
                    DataType::Timestamp(TimeUnit::Second, None),
                    false,
                ),
                Field::new(
                    "timestamp[ms]",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
                Field::new(
                    "timestamp[us]",
                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                    false,
                ),
                Field::new(
                    "timestamp[ns]",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                ),
                Field::new(
                    "interval[ym]",
                    DataType::Interval(IntervalUnit::YearMonth),
                    true,
                ),
                Field::new(
                    "interval[dt]",
                    DataType::Interval(IntervalUnit::DayTime),
                    true,
                ),
                Field::new(
                    "interval[mdn]",
                    DataType::Interval(IntervalUnit::MonthDayNano),
                    true,
                ),
                Field::new("utf8", DataType::Utf8, false),
                Field::new("utf8_view", DataType::Utf8View, false),
                Field::new("binary", DataType::Binary, false),
                Field::new("binary_view", DataType::BinaryView, false),
                Field::new_list(
                    "list[u8]",
                    Field::new_list_field(DataType::UInt8, false),
                    true,
                ),
                Field::new_fixed_size_list(
                    "fixed_size_list[u8]",
                    Field::new_list_field(DataType::UInt8, false),
                    2,
                    true,
                ),
                Field::new_list(
                    "list[struct<float32, int32, bool>]",
                    Field::new_struct(
                        "struct",
                        vec![
                            // type must match the field name (was mistakenly UInt8)
                            Field::new("float32", DataType::Float32, false),
                            Field::new("int32", Int32, true),
                            Field::new("bool", Boolean, true),
                        ],
                        true,
                    ),
                    false,
                ),
                Field::new_struct(
                    "struct<dictionary<int32, utf8>>",
                    vec![Field::new(
                        "dictionary<int32, utf8>",
                        Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                        false,
                    )],
                    false,
                ),
                Field::new_struct(
                    "struct<int64, list[struct<date32, list[struct<>]>]>",
                    vec![
                        Field::new("int64", DataType::Int64, true),
                        Field::new_list(
                            "list[struct<date32, list[struct<>]>]",
                            Field::new_struct(
                                "struct",
                                vec![
                                    Field::new("date32", DataType::Date32, true),
                                    Field::new_list(
                                        "list[struct<>]",
                                        Field::new(
                                            "struct",
                                            DataType::Struct(Fields::empty()),
                                            false,
                                        ),
                                        false,
                                    ),
                                ],
                                false,
                            ),
                            false,
                        ),
                    ],
                    false,
                ),
                Field::new_union(
                    "union<int64, list[union<date32, list[union<>]>]>",
                    vec![0, 1],
                    vec![
                        Field::new("int64", DataType::Int64, true),
                        Field::new_list(
                            "list[union<date32, list[union<>]>]",
                            Field::new_union(
                                "union<date32, list[union<>]>",
                                vec![0, 1],
                                vec![
                                    Field::new("date32", DataType::Date32, true),
                                    Field::new_list(
                                        "list[union<>]",
                                        Field::new(
                                            "union",
                                            DataType::Union(
                                                UnionFields::empty(),
                                                UnionMode::Sparse,
                                            ),
                                            false,
                                        ),
                                        false,
                                    ),
                                ],
                                UnionMode::Dense,
                            ),
                            false,
                        ),
                    ],
                    UnionMode::Sparse,
                ),
                Field::new("struct<>", DataType::Struct(Fields::empty()), true),
                Field::new(
                    "union<>",
                    DataType::Union(UnionFields::empty(), UnionMode::Dense),
                    true,
                ),
                Field::new(
                    "union<>",
                    DataType::Union(UnionFields::empty(), UnionMode::Sparse),
                    true,
                ),
                Field::new(
                    "union<int32, utf8>",
                    DataType::Union(
                        UnionFields::new(
                            vec![2, 3], // non-default type ids
                            vec![
                                Field::new("int32", DataType::Int32, true),
                                Field::new("utf8", DataType::Utf8, true),
                            ],
                        ),
                        UnionMode::Dense,
                    ),
                    true,
                ),
                // `Field::new_dict` is deprecated, but it is the only way to pin explicit dict ids here
                #[allow(deprecated)]
                Field::new_dict(
                    "dictionary<int32, utf8>",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    true,
                    123,
                    true,
                ),
                #[allow(deprecated)]
                Field::new_dict(
                    "dictionary<uint8, uint32>",
                    DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
                    true,
                    123,
                    true,
                ),
                Field::new("decimal<usize, usize>", DataType::Decimal128(10, 6), false),
                // 256-bit decimals share the IPC `Decimal` type with a different bitWidth
                Field::new("decimal256", DataType::Decimal256(60, 10), true),
                Field::new("duration[s]", DataType::Duration(TimeUnit::Second), true),
                Field::new("duration[ms]", DataType::Duration(TimeUnit::Millisecond), false),
                Field::new("duration[us]", DataType::Duration(TimeUnit::Microsecond), true),
                Field::new("duration[ns]", DataType::Duration(TimeUnit::Nanosecond), false),
                Field::new("large_utf8", DataType::LargeUtf8, true),
                Field::new("large_binary", DataType::LargeBinary, true),
                Field::new(
                    "large_list[u8]",
                    DataType::LargeList(Arc::new(Field::new_list_field(DataType::UInt8, false))),
                    true,
                ),
            ],
            md,
        );

        let mut dictionary_tracker = DictionaryTracker::new(true);
        let fb = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut dictionary_tracker)
            .schema_to_fb(&schema);

        // read back fields
        let ipc = crate::root_as_schema(fb.finished_data()).unwrap();
        let schema2 = fb_to_schema(ipc);
        assert_eq!(schema, schema2);
    }

    /// Check that a schema message produced by pyarrow decodes to the same
    /// Arrow schema, and carries the same message-level properties, as the
    /// equivalent message produced by this crate.
    #[test]
    fn schema_from_bytes() {
        // Bytes of a schema generated via following python code, using pyarrow 10.0.1:
        //
        // import pyarrow as pa
        // schema = pa.schema([pa.field('field1', pa.uint32(), nullable=False)])
        // sink = pa.BufferOutputStream()
        // with pa.ipc.new_stream(sink, schema) as writer:
        //     pass
        // # stripping continuation & length prefix & suffix bytes to get only schema bytes
        // [x for x in sink.getvalue().to_pybytes()][8:-8]
        let bytes: Vec<u8> = vec![
            16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 4, 0, 12, 0, 0,
            0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0, 20, 0, 0, 0, 16, 0, 20,
            0, 8, 0, 0, 0, 7, 0, 12, 0, 0, 0, 16, 0, 16, 0, 0, 0, 0, 0, 0, 2, 16, 0, 0, 0, 32, 0,
            0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0, 0, 0, 6,
            0, 8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0,
        ];
        let ipc = crate::root_as_message(&bytes).unwrap();
        let schema = ipc.header_as_schema().unwrap();

        // generate same message with Rust
        let data_gen = crate::writer::IpcDataGenerator::default();
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let arrow_schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
        let bytes = data_gen
            .schema_to_bytes_with_dictionary_tracker(
                &arrow_schema,
                &mut dictionary_tracker,
                &crate::writer::IpcWriteOptions::default(),
            )
            .ipc_message;

        let ipc2 = crate::root_as_message(&bytes).unwrap();
        let schema2 = ipc2.header_as_schema().unwrap();

        // can't compare schema directly as it compares the underlying bytes, which can differ
        assert!(schema.custom_metadata().is_none());
        assert!(schema2.custom_metadata().is_none());
        assert_eq!(schema.endianness(), schema2.endianness());
        assert!(schema.features().is_none());
        assert!(schema2.features().is_none());
        assert_eq!(fb_to_schema(schema), fb_to_schema(schema2));

        assert_eq!(ipc.version(), ipc2.version());
        assert_eq!(ipc.header_type(), ipc2.header_type());
        assert_eq!(ipc.bodyLength(), ipc2.bodyLength());
        assert!(ipc.custom_metadata().is_none());
        assert!(ipc2.custom_metadata().is_none());
    }
}