1use arrow_buffer::Buffer;
21use arrow_schema::*;
22use flatbuffers::{
23 FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, Verifiable, Verifier,
24 VerifierOptions, WIPOffset,
25};
26use std::collections::HashMap;
27use std::fmt::{Debug, Formatter};
28use std::sync::Arc;
29
30use crate::writer::DictionaryTracker;
31use crate::{size_prefixed_root_as_message, KeyValue, Message, CONTINUATION_MARKER};
32use DataType::*;
33
/// Encoder that turns an Arrow [`Schema`] into its flatbuffer representation.
///
/// It can optionally hold a mutable borrow of a [`DictionaryTracker`]; when
/// present, the tracker is forwarded to `build_field` for every encoded field.
#[derive(Debug)]
pub struct IpcSchemaEncoder<'a> {
    /// Tracker forwarded to `build_field`; `None` unless one was supplied via
    /// `with_dictionary_tracker`.
    dictionary_tracker: Option<&'a mut DictionaryTracker>,
}
68
69impl Default for IpcSchemaEncoder<'_> {
70 fn default() -> Self {
71 Self::new()
72 }
73}
74
75impl<'a> IpcSchemaEncoder<'a> {
76 pub fn new() -> IpcSchemaEncoder<'a> {
78 IpcSchemaEncoder {
79 dictionary_tracker: None,
80 }
81 }
82
83 pub fn with_dictionary_tracker(
85 mut self,
86 dictionary_tracker: &'a mut DictionaryTracker,
87 ) -> Self {
88 self.dictionary_tracker = Some(dictionary_tracker);
89 self
90 }
91
92 pub fn schema_to_fb<'b>(&mut self, schema: &Schema) -> FlatBufferBuilder<'b> {
96 let mut fbb = FlatBufferBuilder::new();
97
98 let root = self.schema_to_fb_offset(&mut fbb, schema);
99
100 fbb.finish(root, None);
101
102 fbb
103 }
104
105 pub fn schema_to_fb_offset<'b>(
107 &mut self,
108 fbb: &mut FlatBufferBuilder<'b>,
109 schema: &Schema,
110 ) -> WIPOffset<crate::Schema<'b>> {
111 let fields = schema
112 .fields()
113 .iter()
114 .map(|field| build_field(fbb, &mut self.dictionary_tracker, field))
115 .collect::<Vec<_>>();
116 let fb_field_list = fbb.create_vector(&fields);
117
118 let fb_metadata_list =
119 (!schema.metadata().is_empty()).then(|| metadata_to_fb(fbb, schema.metadata()));
120
121 let mut builder = crate::SchemaBuilder::new(fbb);
122 builder.add_fields(fb_field_list);
123 if let Some(fb_metadata_list) = fb_metadata_list {
124 builder.add_custom_metadata(fb_metadata_list);
125 }
126 builder.finish()
127 }
128}
129
130#[deprecated(since = "54.0.0", note = "Use `IpcSchemaConverter`.")]
132pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder<'_> {
133 IpcSchemaEncoder::new().schema_to_fb(schema)
134}
135
136pub fn metadata_to_fb<'a>(
138 fbb: &mut FlatBufferBuilder<'a>,
139 metadata: &HashMap<String, String>,
140) -> WIPOffset<Vector<'a, ForwardsUOffset<KeyValue<'a>>>> {
141 let custom_metadata = metadata
142 .iter()
143 .map(|(k, v)| {
144 let fb_key_name = fbb.create_string(k);
145 let fb_val_name = fbb.create_string(v);
146
147 let mut kv_builder = crate::KeyValueBuilder::new(fbb);
148 kv_builder.add_key(fb_key_name);
149 kv_builder.add_value(fb_val_name);
150 kv_builder.finish()
151 })
152 .collect::<Vec<_>>();
153 fbb.create_vector(&custom_metadata)
154}
155
156pub fn schema_to_fb_offset<'a>(
158 fbb: &mut FlatBufferBuilder<'a>,
159 schema: &Schema,
160) -> WIPOffset<crate::Schema<'a>> {
161 IpcSchemaEncoder::new().schema_to_fb_offset(fbb, schema)
162}
163
164impl From<crate::Field<'_>> for Field {
166 fn from(field: crate::Field) -> Field {
167 let arrow_field = if let Some(dictionary) = field.dictionary() {
168 #[allow(deprecated)]
169 Field::new_dict(
170 field.name().unwrap(),
171 get_data_type(field, true),
172 field.nullable(),
173 dictionary.id(),
174 dictionary.isOrdered(),
175 )
176 } else {
177 Field::new(
178 field.name().unwrap(),
179 get_data_type(field, true),
180 field.nullable(),
181 )
182 };
183
184 let mut metadata_map = HashMap::default();
185 if let Some(list) = field.custom_metadata() {
186 for kv in list {
187 if let (Some(k), Some(v)) = (kv.key(), kv.value()) {
188 metadata_map.insert(k.to_string(), v.to_string());
189 }
190 }
191 }
192
193 arrow_field.with_metadata(metadata_map)
194 }
195}
196
197pub fn fb_to_schema(fb: crate::Schema) -> Schema {
199 let mut fields: Vec<Field> = vec![];
200 let c_fields = fb.fields().unwrap();
201 let len = c_fields.len();
202 for i in 0..len {
203 let c_field: crate::Field = c_fields.get(i);
204 match c_field.type_type() {
205 crate::Type::Decimal if fb.endianness() == crate::Endianness::Big => {
206 unimplemented!("Big Endian is not supported for Decimal!")
207 }
208 _ => (),
209 };
210 fields.push(c_field.into());
211 }
212
213 let mut metadata: HashMap<String, String> = HashMap::default();
214 if let Some(md_fields) = fb.custom_metadata() {
215 let len = md_fields.len();
216 for i in 0..len {
217 let kv = md_fields.get(i);
218 let k_str = kv.key();
219 let v_str = kv.value();
220 if let Some(k) = k_str {
221 if let Some(v) = v_str {
222 metadata.insert(k.to_string(), v.to_string());
223 }
224 }
225 }
226 }
227 Schema::new_with_metadata(fields, metadata)
228}
229
230pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema, ArrowError> {
232 if let Ok(ipc) = crate::root_as_message(bytes) {
233 if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
234 Ok(schema)
235 } else {
236 Err(ArrowError::ParseError(
237 "Unable to get head as schema".to_string(),
238 ))
239 }
240 } else {
241 Err(ArrowError::ParseError(
242 "Unable to get root as message".to_string(),
243 ))
244 }
245}
246
247pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema, ArrowError> {
249 if buffer.len() >= 4 {
259 let continuation_marker = &buffer[0..4];
261 let begin_offset: usize = if continuation_marker.eq(&CONTINUATION_MARKER) {
262 4
266 } else {
267 0
271 };
272 let msg = size_prefixed_root_as_message(&buffer[begin_offset..]).map_err(|err| {
273 ArrowError::ParseError(format!("Unable to convert flight info to a message: {err}"))
274 })?;
275 let ipc_schema = msg.header_as_schema().ok_or_else(|| {
276 ArrowError::ParseError("Unable to convert flight info to a schema".to_string())
277 })?;
278 Ok(fb_to_schema(ipc_schema))
279 } else {
280 Err(ArrowError::ParseError(
281 "The buffer length is less than 4 and missing the continuation marker or length of buffer".to_string()
282 ))
283 }
284}
285
286pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> DataType {
288 if let Some(dictionary) = field.dictionary() {
289 if may_be_dictionary {
290 let int = dictionary.indexType().unwrap();
291 let index_type = match (int.bitWidth(), int.is_signed()) {
292 (8, true) => DataType::Int8,
293 (8, false) => DataType::UInt8,
294 (16, true) => DataType::Int16,
295 (16, false) => DataType::UInt16,
296 (32, true) => DataType::Int32,
297 (32, false) => DataType::UInt32,
298 (64, true) => DataType::Int64,
299 (64, false) => DataType::UInt64,
300 _ => panic!("Unexpected bitwidth and signed"),
301 };
302 return DataType::Dictionary(
303 Box::new(index_type),
304 Box::new(get_data_type(field, false)),
305 );
306 }
307 }
308
309 match field.type_type() {
310 crate::Type::Null => DataType::Null,
311 crate::Type::Bool => DataType::Boolean,
312 crate::Type::Int => {
313 let int = field.type_as_int().unwrap();
314 match (int.bitWidth(), int.is_signed()) {
315 (8, true) => DataType::Int8,
316 (8, false) => DataType::UInt8,
317 (16, true) => DataType::Int16,
318 (16, false) => DataType::UInt16,
319 (32, true) => DataType::Int32,
320 (32, false) => DataType::UInt32,
321 (64, true) => DataType::Int64,
322 (64, false) => DataType::UInt64,
323 z => panic!(
324 "Int type with bit width of {} and signed of {} not supported",
325 z.0, z.1
326 ),
327 }
328 }
329 crate::Type::Binary => DataType::Binary,
330 crate::Type::BinaryView => DataType::BinaryView,
331 crate::Type::LargeBinary => DataType::LargeBinary,
332 crate::Type::Utf8 => DataType::Utf8,
333 crate::Type::Utf8View => DataType::Utf8View,
334 crate::Type::LargeUtf8 => DataType::LargeUtf8,
335 crate::Type::FixedSizeBinary => {
336 let fsb = field.type_as_fixed_size_binary().unwrap();
337 DataType::FixedSizeBinary(fsb.byteWidth())
338 }
339 crate::Type::FloatingPoint => {
340 let float = field.type_as_floating_point().unwrap();
341 match float.precision() {
342 crate::Precision::HALF => DataType::Float16,
343 crate::Precision::SINGLE => DataType::Float32,
344 crate::Precision::DOUBLE => DataType::Float64,
345 z => panic!("FloatingPoint type with precision of {z:?} not supported"),
346 }
347 }
348 crate::Type::Date => {
349 let date = field.type_as_date().unwrap();
350 match date.unit() {
351 crate::DateUnit::DAY => DataType::Date32,
352 crate::DateUnit::MILLISECOND => DataType::Date64,
353 z => panic!("Date type with unit of {z:?} not supported"),
354 }
355 }
356 crate::Type::Time => {
357 let time = field.type_as_time().unwrap();
358 match (time.bitWidth(), time.unit()) {
359 (32, crate::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second),
360 (32, crate::TimeUnit::MILLISECOND) => DataType::Time32(TimeUnit::Millisecond),
361 (64, crate::TimeUnit::MICROSECOND) => DataType::Time64(TimeUnit::Microsecond),
362 (64, crate::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond),
363 z => panic!(
364 "Time type with bit width of {} and unit of {:?} not supported",
365 z.0, z.1
366 ),
367 }
368 }
369 crate::Type::Timestamp => {
370 let timestamp = field.type_as_timestamp().unwrap();
371 let timezone: Option<_> = timestamp.timezone().map(|tz| tz.into());
372 match timestamp.unit() {
373 crate::TimeUnit::SECOND => DataType::Timestamp(TimeUnit::Second, timezone),
374 crate::TimeUnit::MILLISECOND => {
375 DataType::Timestamp(TimeUnit::Millisecond, timezone)
376 }
377 crate::TimeUnit::MICROSECOND => {
378 DataType::Timestamp(TimeUnit::Microsecond, timezone)
379 }
380 crate::TimeUnit::NANOSECOND => DataType::Timestamp(TimeUnit::Nanosecond, timezone),
381 z => panic!("Timestamp type with unit of {z:?} not supported"),
382 }
383 }
384 crate::Type::Interval => {
385 let interval = field.type_as_interval().unwrap();
386 match interval.unit() {
387 crate::IntervalUnit::YEAR_MONTH => DataType::Interval(IntervalUnit::YearMonth),
388 crate::IntervalUnit::DAY_TIME => DataType::Interval(IntervalUnit::DayTime),
389 crate::IntervalUnit::MONTH_DAY_NANO => {
390 DataType::Interval(IntervalUnit::MonthDayNano)
391 }
392 z => panic!("Interval type with unit of {z:?} unsupported"),
393 }
394 }
395 crate::Type::Duration => {
396 let duration = field.type_as_duration().unwrap();
397 match duration.unit() {
398 crate::TimeUnit::SECOND => DataType::Duration(TimeUnit::Second),
399 crate::TimeUnit::MILLISECOND => DataType::Duration(TimeUnit::Millisecond),
400 crate::TimeUnit::MICROSECOND => DataType::Duration(TimeUnit::Microsecond),
401 crate::TimeUnit::NANOSECOND => DataType::Duration(TimeUnit::Nanosecond),
402 z => panic!("Duration type with unit of {z:?} unsupported"),
403 }
404 }
405 crate::Type::List => {
406 let children = field.children().unwrap();
407 if children.len() != 1 {
408 panic!("expect a list to have one child")
409 }
410 DataType::List(Arc::new(children.get(0).into()))
411 }
412 crate::Type::LargeList => {
413 let children = field.children().unwrap();
414 if children.len() != 1 {
415 panic!("expect a large list to have one child")
416 }
417 DataType::LargeList(Arc::new(children.get(0).into()))
418 }
419 crate::Type::FixedSizeList => {
420 let children = field.children().unwrap();
421 if children.len() != 1 {
422 panic!("expect a list to have one child")
423 }
424 let fsl = field.type_as_fixed_size_list().unwrap();
425 DataType::FixedSizeList(Arc::new(children.get(0).into()), fsl.listSize())
426 }
427 crate::Type::Struct_ => {
428 let fields = match field.children() {
429 Some(children) => children.iter().map(Field::from).collect(),
430 None => Fields::empty(),
431 };
432 DataType::Struct(fields)
433 }
434 crate::Type::RunEndEncoded => {
435 let children = field.children().unwrap();
436 if children.len() != 2 {
437 panic!(
438 "RunEndEncoded type should have exactly two children. Found {}",
439 children.len()
440 )
441 }
442 let run_ends_field = children.get(0).into();
443 let values_field = children.get(1).into();
444 DataType::RunEndEncoded(Arc::new(run_ends_field), Arc::new(values_field))
445 }
446 crate::Type::Map => {
447 let map = field.type_as_map().unwrap();
448 let children = field.children().unwrap();
449 if children.len() != 1 {
450 panic!("expect a map to have one child")
451 }
452 DataType::Map(Arc::new(children.get(0).into()), map.keysSorted())
453 }
454 crate::Type::Decimal => {
455 let fsb = field.type_as_decimal().unwrap();
456 let bit_width = fsb.bitWidth();
457 let precision: u8 = fsb.precision().try_into().unwrap();
458 let scale: i8 = fsb.scale().try_into().unwrap();
459 match bit_width {
460 128 => DataType::Decimal128(precision, scale),
461 256 => DataType::Decimal256(precision, scale),
462 _ => panic!("Unexpected decimal bit width {bit_width}"),
463 }
464 }
465 crate::Type::Union => {
466 let union = field.type_as_union().unwrap();
467
468 let union_mode = match union.mode() {
469 crate::UnionMode::Dense => UnionMode::Dense,
470 crate::UnionMode::Sparse => UnionMode::Sparse,
471 mode => panic!("Unexpected union mode: {mode:?}"),
472 };
473
474 let mut fields = vec![];
475 if let Some(children) = field.children() {
476 for i in 0..children.len() {
477 fields.push(Field::from(children.get(i)));
478 }
479 };
480
481 let fields = match union.typeIds() {
482 None => UnionFields::new(0_i8..fields.len() as i8, fields),
483 Some(ids) => UnionFields::new(ids.iter().map(|i| i as i8), fields),
484 };
485
486 DataType::Union(fields, union_mode)
487 }
488 t => unimplemented!("Type {:?} not supported", t),
489 }
490}
491
/// The flatbuffer pieces describing one field's type, as produced by
/// `get_fb_field_type` and consumed by `build_field`.
pub(crate) struct FBFieldType<'b> {
    /// Tag identifying which type table `type_` points at.
    pub(crate) type_type: crate::Type,
    /// Offset of the already-finished type table, as a union value.
    pub(crate) type_: WIPOffset<UnionWIPOffset>,
    /// Offset of the vector of child fields, if one was written.
    pub(crate) children: Option<WIPOffset<Vector<'b, ForwardsUOffset<crate::Field<'b>>>>>,
}
497
498pub(crate) fn build_field<'a>(
500 fbb: &mut FlatBufferBuilder<'a>,
501 dictionary_tracker: &mut Option<&mut DictionaryTracker>,
502 field: &Field,
503) -> WIPOffset<crate::Field<'a>> {
504 let mut fb_metadata = None;
506 if !field.metadata().is_empty() {
507 fb_metadata = Some(metadata_to_fb(fbb, field.metadata()));
508 };
509
510 let fb_field_name = fbb.create_string(field.name().as_str());
511 let field_type = get_fb_field_type(field.data_type(), dictionary_tracker, fbb);
512
513 let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() {
514 match dictionary_tracker {
515 Some(tracker) => Some(get_fb_dictionary(
516 index_type,
517 #[allow(deprecated)]
518 tracker.set_dict_id(field),
519 field
520 .dict_is_ordered()
521 .expect("All Dictionary types have `dict_is_ordered`"),
522 fbb,
523 )),
524 None => Some(get_fb_dictionary(
525 index_type,
526 #[allow(deprecated)]
527 field
528 .dict_id()
529 .expect("Dictionary type must have a dictionary id"),
530 field
531 .dict_is_ordered()
532 .expect("All Dictionary types have `dict_is_ordered`"),
533 fbb,
534 )),
535 }
536 } else {
537 None
538 };
539
540 let mut field_builder = crate::FieldBuilder::new(fbb);
541 field_builder.add_name(fb_field_name);
542 if let Some(dictionary) = fb_dictionary {
543 field_builder.add_dictionary(dictionary)
544 }
545 field_builder.add_type_type(field_type.type_type);
546 field_builder.add_nullable(field.is_nullable());
547 match field_type.children {
548 None => {}
549 Some(children) => field_builder.add_children(children),
550 };
551 field_builder.add_type_(field_type.type_);
552
553 if let Some(fb_metadata) = fb_metadata {
554 field_builder.add_custom_metadata(fb_metadata);
555 }
556
557 field_builder.finish()
558}
559
/// Writes the flatbuffer type description for `data_type` into `fbb`.
///
/// Returns an [`FBFieldType`] holding the type tag, the offset of the finished
/// type table and the offset of the children vector. Nested types write their
/// child fields through `build_field`, passing `dictionary_tracker` along.
///
/// # Panics
///
/// Panics with `unimplemented!` for `ListView` and `LargeListView`.
pub(crate) fn get_fb_field_type<'a>(
    data_type: &DataType,
    dictionary_tracker: &mut Option<&mut DictionaryTracker>,
    fbb: &mut FlatBufferBuilder<'a>,
) -> FBFieldType<'a> {
    // Types without children still emit a `children` vector; it is created
    // from this empty list rather than being left as `None`.
    let empty_fields: Vec<WIPOffset<crate::Field>> = vec![];
    match data_type {
        Null => FBFieldType {
            type_type: crate::Type::Null,
            type_: crate::NullBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        Boolean => FBFieldType {
            type_type: crate::Type::Bool,
            type_: crate::BoolBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        // Unsigned integers: an Int table with is_signed = false and the matching width.
        UInt8 | UInt16 | UInt32 | UInt64 => {
            let children = fbb.create_vector(&empty_fields[..]);
            let mut builder = crate::IntBuilder::new(fbb);
            builder.add_is_signed(false);
            match data_type {
                UInt8 => builder.add_bitWidth(8),
                UInt16 => builder.add_bitWidth(16),
                UInt32 => builder.add_bitWidth(32),
                UInt64 => builder.add_bitWidth(64),
                _ => {}
            };
            FBFieldType {
                type_type: crate::Type::Int,
                type_: builder.finish().as_union_value(),
                children: Some(children),
            }
        }
        // Signed integers: an Int table with is_signed = true and the matching width.
        Int8 | Int16 | Int32 | Int64 => {
            let children = fbb.create_vector(&empty_fields[..]);
            let mut builder = crate::IntBuilder::new(fbb);
            builder.add_is_signed(true);
            match data_type {
                Int8 => builder.add_bitWidth(8),
                Int16 => builder.add_bitWidth(16),
                Int32 => builder.add_bitWidth(32),
                Int64 => builder.add_bitWidth(64),
                _ => {}
            };
            FBFieldType {
                type_type: crate::Type::Int,
                type_: builder.finish().as_union_value(),
                children: Some(children),
            }
        }
        Float16 | Float32 | Float64 => {
            let children = fbb.create_vector(&empty_fields[..]);
            let mut builder = crate::FloatingPointBuilder::new(fbb);
            match data_type {
                Float16 => builder.add_precision(crate::Precision::HALF),
                Float32 => builder.add_precision(crate::Precision::SINGLE),
                Float64 => builder.add_precision(crate::Precision::DOUBLE),
                _ => {}
            };
            FBFieldType {
                type_type: crate::Type::FloatingPoint,
                type_: builder.finish().as_union_value(),
                children: Some(children),
            }
        }
        Binary => FBFieldType {
            type_type: crate::Type::Binary,
            type_: crate::BinaryBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        LargeBinary => FBFieldType {
            type_type: crate::Type::LargeBinary,
            type_: crate::LargeBinaryBuilder::new(fbb)
                .finish()
                .as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        BinaryView => FBFieldType {
            type_type: crate::Type::BinaryView,
            type_: crate::BinaryViewBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        Utf8View => FBFieldType {
            type_type: crate::Type::Utf8View,
            type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        Utf8 => FBFieldType {
            type_type: crate::Type::Utf8,
            type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        LargeUtf8 => FBFieldType {
            type_type: crate::Type::LargeUtf8,
            type_: crate::LargeUtf8Builder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        FixedSizeBinary(len) => {
            let mut builder = crate::FixedSizeBinaryBuilder::new(fbb);
            builder.add_byteWidth(*len);
            FBFieldType {
                type_type: crate::Type::FixedSizeBinary,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Date32 => {
            let mut builder = crate::DateBuilder::new(fbb);
            builder.add_unit(crate::DateUnit::DAY);
            FBFieldType {
                type_type: crate::Type::Date,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Date64 => {
            let mut builder = crate::DateBuilder::new(fbb);
            builder.add_unit(crate::DateUnit::MILLISECOND);
            FBFieldType {
                type_type: crate::Type::Date,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        // NOTE(review): the bit width written here is chosen from the unit
        // alone (second/millisecond -> 32, microsecond/nanosecond -> 64), not
        // from whether the variant is Time32 or Time64.
        Time32(unit) | Time64(unit) => {
            let mut builder = crate::TimeBuilder::new(fbb);
            match unit {
                TimeUnit::Second => {
                    builder.add_bitWidth(32);
                    builder.add_unit(crate::TimeUnit::SECOND);
                }
                TimeUnit::Millisecond => {
                    builder.add_bitWidth(32);
                    builder.add_unit(crate::TimeUnit::MILLISECOND);
                }
                TimeUnit::Microsecond => {
                    builder.add_bitWidth(64);
                    builder.add_unit(crate::TimeUnit::MICROSECOND);
                }
                TimeUnit::Nanosecond => {
                    builder.add_bitWidth(64);
                    builder.add_unit(crate::TimeUnit::NANOSECOND);
                }
            }
            FBFieldType {
                type_type: crate::Type::Time,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Timestamp(unit, tz) => {
            // A missing timezone is treated as the empty string. The string is
            // always created (before the table is started) but only attached
            // to the table when it is non-empty.
            let tz = tz.as_deref().unwrap_or_default();
            let tz_str = fbb.create_string(tz);
            let mut builder = crate::TimestampBuilder::new(fbb);
            let time_unit = match unit {
                TimeUnit::Second => crate::TimeUnit::SECOND,
                TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
                TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
                TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
            };
            builder.add_unit(time_unit);
            if !tz.is_empty() {
                builder.add_timezone(tz_str);
            }
            FBFieldType {
                type_type: crate::Type::Timestamp,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Interval(unit) => {
            let mut builder = crate::IntervalBuilder::new(fbb);
            let interval_unit = match unit {
                IntervalUnit::YearMonth => crate::IntervalUnit::YEAR_MONTH,
                IntervalUnit::DayTime => crate::IntervalUnit::DAY_TIME,
                IntervalUnit::MonthDayNano => crate::IntervalUnit::MONTH_DAY_NANO,
            };
            builder.add_unit(interval_unit);
            FBFieldType {
                type_type: crate::Type::Interval,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Duration(unit) => {
            let mut builder = crate::DurationBuilder::new(fbb);
            let time_unit = match unit {
                TimeUnit::Second => crate::TimeUnit::SECOND,
                TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
                TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
                TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
            };
            builder.add_unit(time_unit);
            FBFieldType {
                type_type: crate::Type::Duration,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        // List-like types: the single child field is written first, then the
        // type table, then the one-element children vector.
        List(ref list_type) => {
            let child = build_field(fbb, dictionary_tracker, list_type);
            FBFieldType {
                type_type: crate::Type::List,
                type_: crate::ListBuilder::new(fbb).finish().as_union_value(),
                children: Some(fbb.create_vector(&[child])),
            }
        }
        ListView(_) | LargeListView(_) => unimplemented!("ListView/LargeListView not implemented"),
        LargeList(ref list_type) => {
            let child = build_field(fbb, dictionary_tracker, list_type);
            FBFieldType {
                type_type: crate::Type::LargeList,
                type_: crate::LargeListBuilder::new(fbb).finish().as_union_value(),
                children: Some(fbb.create_vector(&[child])),
            }
        }
        FixedSizeList(ref list_type, len) => {
            let child = build_field(fbb, dictionary_tracker, list_type);
            let mut builder = crate::FixedSizeListBuilder::new(fbb);
            builder.add_listSize(*len);
            FBFieldType {
                type_type: crate::Type::FixedSizeList,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&[child])),
            }
        }
        Struct(fields) => {
            // All child fields are written, in order, before the struct table.
            let mut children = vec![];
            for field in fields {
                children.push(build_field(fbb, dictionary_tracker, field));
            }
            FBFieldType {
                type_type: crate::Type::Struct_,
                type_: crate::Struct_Builder::new(fbb).finish().as_union_value(),
                children: Some(fbb.create_vector(&children[..])),
            }
        }
        RunEndEncoded(run_ends, values) => {
            // Children are always [run_ends, values], in that order.
            let run_ends_field = build_field(fbb, dictionary_tracker, run_ends);
            let values_field = build_field(fbb, dictionary_tracker, values);
            let children = [run_ends_field, values_field];
            FBFieldType {
                type_type: crate::Type::RunEndEncoded,
                type_: crate::RunEndEncodedBuilder::new(fbb)
                    .finish()
                    .as_union_value(),
                children: Some(fbb.create_vector(&children[..])),
            }
        }
        Map(map_field, keys_sorted) => {
            let child = build_field(fbb, dictionary_tracker, map_field);
            let mut field_type = crate::MapBuilder::new(fbb);
            field_type.add_keysSorted(*keys_sorted);
            FBFieldType {
                type_type: crate::Type::Map,
                type_: field_type.finish().as_union_value(),
                children: Some(fbb.create_vector(&[child])),
            }
        }
        Dictionary(_, value_type) => {
            // The dictionary wrapper is not encoded here: the type written is
            // that of the dictionary's values. The index type is written
            // separately (see `get_fb_dictionary`, called from `build_field`).
            get_fb_field_type(value_type, dictionary_tracker, fbb)
        }
        Decimal128(precision, scale) => {
            let mut builder = crate::DecimalBuilder::new(fbb);
            builder.add_precision(*precision as i32);
            builder.add_scale(*scale as i32);
            builder.add_bitWidth(128);
            FBFieldType {
                type_type: crate::Type::Decimal,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Decimal256(precision, scale) => {
            let mut builder = crate::DecimalBuilder::new(fbb);
            builder.add_precision(*precision as i32);
            builder.add_scale(*scale as i32);
            builder.add_bitWidth(256);
            FBFieldType {
                type_type: crate::Type::Decimal,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Union(fields, mode) => {
            // Member fields are written first, in the order `fields` yields them.
            let mut children = vec![];
            for (_, field) in fields.iter() {
                children.push(build_field(fbb, dictionary_tracker, field));
            }

            let union_mode = match mode {
                UnionMode::Sparse => crate::UnionMode::Sparse,
                UnionMode::Dense => crate::UnionMode::Dense,
            };

            // Type ids are widened to i32 and written in the same order as the children.
            let fbb_type_ids =
                fbb.create_vector(&fields.iter().map(|(t, _)| t as i32).collect::<Vec<_>>());
            let mut builder = crate::UnionBuilder::new(fbb);
            builder.add_mode(union_mode);
            builder.add_typeIds(fbb_type_ids);

            FBFieldType {
                type_type: crate::Type::Union,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&children[..])),
            }
        }
    }
}
877
878pub(crate) fn get_fb_dictionary<'a>(
880 index_type: &DataType,
881 dict_id: i64,
882 dict_is_ordered: bool,
883 fbb: &mut FlatBufferBuilder<'a>,
884) -> WIPOffset<crate::DictionaryEncoding<'a>> {
885 let mut index_builder = crate::IntBuilder::new(fbb);
888
889 match *index_type {
890 Int8 | Int16 | Int32 | Int64 => index_builder.add_is_signed(true),
891 UInt8 | UInt16 | UInt32 | UInt64 => index_builder.add_is_signed(false),
892 _ => {}
893 }
894
895 match *index_type {
896 Int8 | UInt8 => index_builder.add_bitWidth(8),
897 Int16 | UInt16 => index_builder.add_bitWidth(16),
898 Int32 | UInt32 => index_builder.add_bitWidth(32),
899 Int64 | UInt64 => index_builder.add_bitWidth(64),
900 _ => {}
901 }
902
903 let index_builder = index_builder.finish();
904
905 let mut builder = crate::DictionaryEncodingBuilder::new(fbb);
906 builder.add_id(dict_id);
907 builder.add_indexType(index_builder);
908 builder.add_isOrdered(dict_is_ordered);
909
910 builder.finish()
911}
912
/// An owned [`Buffer`] holding a flatbuffer-encoded [`Message`].
///
/// The inner buffer is private; `try_new` runs the flatbuffers verifier on it
/// before wrapping, and `as_ref` relies on that check to skip re-verification.
#[derive(Clone)]
pub struct MessageBuffer(Buffer);
926
927impl Debug for MessageBuffer {
928 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
929 self.as_ref().fmt(f)
930 }
931}
932
933impl MessageBuffer {
934 pub fn try_new(buf: Buffer) -> Result<Self, ArrowError> {
936 let opts = VerifierOptions::default();
937 let mut v = Verifier::new(&opts, &buf);
938 <ForwardsUOffset<Message>>::run_verifier(&mut v, 0).map_err(|err| {
939 ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
940 })?;
941 Ok(Self(buf))
942 }
943
944 #[inline]
946 pub fn as_ref(&self) -> Message<'_> {
947 unsafe { crate::root_as_message_unchecked(&self.0) }
949 }
950}
951
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes a schema covering many data types to flatbuffers and checks
    /// that decoding it yields an equal schema.
    #[test]
    fn convert_schema_round_trip() {
        // Schema-level metadata.
        let md: HashMap<String, String> = [("Key".to_string(), "value".to_string())]
            .iter()
            .cloned()
            .collect();
        // Field-level metadata, attached to the first field only.
        let field_md: HashMap<String, String> = [("k".to_string(), "v".to_string())]
            .iter()
            .cloned()
            .collect();
        let schema = Schema::new_with_metadata(
            vec![
                Field::new("uint8", DataType::UInt8, false).with_metadata(field_md),
                Field::new("uint16", DataType::UInt16, true),
                Field::new("uint32", DataType::UInt32, false),
                Field::new("uint64", DataType::UInt64, true),
                Field::new("int8", DataType::Int8, true),
                Field::new("int16", DataType::Int16, false),
                Field::new("int32", DataType::Int32, true),
                Field::new("int64", DataType::Int64, false),
                Field::new("float16", DataType::Float16, true),
                Field::new("float32", DataType::Float32, false),
                Field::new("float64", DataType::Float64, true),
                Field::new("null", DataType::Null, false),
                Field::new("bool", DataType::Boolean, false),
                Field::new("date32", DataType::Date32, false),
                Field::new("date64", DataType::Date64, true),
                Field::new("time32[s]", DataType::Time32(TimeUnit::Second), true),
                Field::new("time32[ms]", DataType::Time32(TimeUnit::Millisecond), false),
                Field::new("time64[us]", DataType::Time64(TimeUnit::Microsecond), false),
                Field::new("time64[ns]", DataType::Time64(TimeUnit::Nanosecond), true),
                Field::new(
                    "timestamp[s]",
                    DataType::Timestamp(TimeUnit::Second, None),
                    false,
                ),
                Field::new(
                    "timestamp[ms]",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
                // The only timestamp with a timezone set.
                Field::new(
                    "timestamp[us]",
                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                    false,
                ),
                Field::new(
                    "timestamp[ns]",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                ),
                Field::new(
                    "interval[ym]",
                    DataType::Interval(IntervalUnit::YearMonth),
                    true,
                ),
                Field::new(
                    "interval[dt]",
                    DataType::Interval(IntervalUnit::DayTime),
                    true,
                ),
                Field::new(
                    "interval[mdn]",
                    DataType::Interval(IntervalUnit::MonthDayNano),
                    true,
                ),
                Field::new("utf8", DataType::Utf8, false),
                Field::new("utf8_view", DataType::Utf8View, false),
                Field::new("binary", DataType::Binary, false),
                Field::new("binary_view", DataType::BinaryView, false),
                Field::new_list(
                    "list[u8]",
                    Field::new_list_field(DataType::UInt8, false),
                    true,
                ),
                Field::new_fixed_size_list(
                    "fixed_size_list[u8]",
                    Field::new_list_field(DataType::UInt8, false),
                    2,
                    true,
                ),
                Field::new_list(
                    "list[struct<float32, int32, bool>]",
                    Field::new_struct(
                        "struct",
                        vec![
                            // NOTE(review): named "float32" but typed UInt8 — confirm intended.
                            Field::new("float32", UInt8, false),
                            Field::new("int32", Int32, true),
                            Field::new("bool", Boolean, true),
                        ],
                        true,
                    ),
                    false,
                ),
                // Dictionary nested inside a struct, built without an explicit dictionary id.
                Field::new_struct(
                    "struct<dictionary<int32, utf8>>",
                    vec![Field::new(
                        "dictionary<int32, utf8>",
                        Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                        false,
                    )],
                    false,
                ),
                // Deeply nested struct / list combination ending in an empty struct.
                Field::new_struct(
                    "struct<int64, list[struct<date32, list[struct<>]>]>",
                    vec![
                        Field::new("int64", DataType::Int64, true),
                        Field::new_list(
                            "list[struct<date32, list[struct<>]>]",
                            Field::new_struct(
                                "struct",
                                vec![
                                    Field::new("date32", DataType::Date32, true),
                                    Field::new_list(
                                        "list[struct<>]",
                                        Field::new(
                                            "struct",
                                            DataType::Struct(Fields::empty()),
                                            false,
                                        ),
                                        false,
                                    ),
                                ],
                                false,
                            ),
                            false,
                        ),
                    ],
                    false,
                ),
                // Deeply nested union / list combination ending in an empty union.
                Field::new_union(
                    "union<int64, list[union<date32, list[union<>]>]>",
                    vec![0, 1],
                    vec![
                        Field::new("int64", DataType::Int64, true),
                        Field::new_list(
                            "list[union<date32, list[union<>]>]",
                            Field::new_union(
                                "union<date32, list[union<>]>",
                                vec![0, 1],
                                vec![
                                    Field::new("date32", DataType::Date32, true),
                                    Field::new_list(
                                        "list[union<>]",
                                        Field::new(
                                            "union",
                                            DataType::Union(
                                                UnionFields::empty(),
                                                UnionMode::Sparse,
                                            ),
                                            false,
                                        ),
                                        false,
                                    ),
                                ],
                                UnionMode::Dense,
                            ),
                            false,
                        ),
                    ],
                    UnionMode::Sparse,
                ),
                Field::new("struct<>", DataType::Struct(Fields::empty()), true),
                Field::new(
                    "union<>",
                    DataType::Union(UnionFields::empty(), UnionMode::Dense),
                    true,
                ),
                Field::new(
                    "union<>",
                    DataType::Union(UnionFields::empty(), UnionMode::Sparse),
                    true,
                ),
                Field::new(
                    "union<int32, utf8>",
                    DataType::Union(
                        UnionFields::new(
                            // Explicit type ids that do not start at 0.
                            vec![2, 3],
                            vec![
                                Field::new("int32", DataType::Int32, true),
                                Field::new("utf8", DataType::Utf8, true),
                            ],
                        ),
                        UnionMode::Dense,
                    ),
                    true,
                ),
                #[allow(deprecated)]
                Field::new_dict(
                    "dictionary<int32, utf8>",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    true,
                    123,
                    true,
                ),
                #[allow(deprecated)]
                Field::new_dict(
                    "dictionary<uint8, uint32>",
                    DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
                    true,
                    123,
                    true,
                ),
                Field::new("decimal<usize, usize>", DataType::Decimal128(10, 6), false),
            ],
            md,
        );

        let mut dictionary_tracker = DictionaryTracker::new(true);
        let fb = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut dictionary_tracker)
            .schema_to_fb(&schema);

        // Decode the finished buffer and compare against the input schema.
        let ipc = crate::root_as_schema(fb.finished_data()).unwrap();
        let schema2 = fb_to_schema(ipc);
        assert_eq!(schema, schema2);
    }

    /// Compares a pre-serialized schema message with one produced by this
    /// crate's writer for a matching Arrow schema.
    #[test]
    fn schema_from_bytes() {
        // Pre-serialized message; the assertions below expect it to decode to
        // the same schema as `arrow_schema` (one non-null UInt32 field, "field1").
        let bytes: Vec<u8> = vec![
            16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 4, 0, 12, 0, 0,
            0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0, 20, 0, 0, 0, 16, 0, 20,
            0, 8, 0, 0, 0, 7, 0, 12, 0, 0, 0, 16, 0, 16, 0, 0, 0, 0, 0, 0, 2, 16, 0, 0, 0, 32, 0,
            0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0, 0, 0, 6,
            0, 8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0,
        ];
        let ipc = crate::root_as_message(&bytes).unwrap();
        let schema = ipc.header_as_schema().unwrap();

        // Produce the same message with this crate's writer.
        let data_gen = crate::writer::IpcDataGenerator::default();
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let arrow_schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
        let bytes = data_gen
            .schema_to_bytes_with_dictionary_tracker(
                &arrow_schema,
                &mut dictionary_tracker,
                &crate::writer::IpcWriteOptions::default(),
            )
            .ipc_message;

        let ipc2 = crate::root_as_message(&bytes).unwrap();
        let schema2 = ipc2.header_as_schema().unwrap();

        // Schema-level comparison.
        assert!(schema.custom_metadata().is_none());
        assert!(schema2.custom_metadata().is_none());
        assert_eq!(schema.endianness(), schema2.endianness());
        assert!(schema.features().is_none());
        assert!(schema2.features().is_none());
        assert_eq!(fb_to_schema(schema), fb_to_schema(schema2));

        // Message-level comparison.
        assert_eq!(ipc.version(), ipc2.version());
        assert_eq!(ipc.header_type(), ipc2.header_type());
        assert_eq!(ipc.bodyLength(), ipc2.bodyLength());
        assert!(ipc.custom_metadata().is_none());
        assert!(ipc2.custom_metadata().is_none());
    }
}