1use std::ops::Deref;
19use std::sync::Arc;
20
21use crate::{ArrowError, DataType, Field, FieldRef, SchemaBuilder};
22
/// An ordered collection of [`FieldRef`] stored behind a single reference-counted slice.
///
/// The fields live in an `Arc<[FieldRef]>`, so cloning a [`Fields`] only bumps a
/// reference count rather than copying the individual fields.
///
/// When the `serde` feature is enabled the type (de)serializes transparently as the
/// underlying sequence of fields.
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(transparent))]
pub struct Fields(Arc<[FieldRef]>);
61
62impl std::fmt::Debug for Fields {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 self.0.as_ref().fmt(f)
65 }
66}
67
68impl Fields {
69 pub fn empty() -> Self {
71 Self(Arc::new([]))
72 }
73
74 pub fn size(&self) -> usize {
76 self.iter()
77 .map(|field| field.size() + std::mem::size_of::<FieldRef>())
78 .sum()
79 }
80
81 pub fn find(&self, name: &str) -> Option<(usize, &FieldRef)> {
83 self.0.iter().enumerate().find(|(_, b)| b.name() == name)
84 }
85
86 pub fn contains(&self, other: &Fields) -> bool {
93 if Arc::ptr_eq(&self.0, &other.0) {
94 return true;
95 }
96 self.len() == other.len()
97 && self
98 .iter()
99 .zip(other.iter())
100 .all(|(a, b)| Arc::ptr_eq(a, b) || a.contains(b))
101 }
102
103 pub fn filter_leaves<F: FnMut(usize, &FieldRef) -> bool>(&self, mut filter: F) -> Self {
140 self.try_filter_leaves(|idx, field| Ok(filter(idx, field)))
141 .unwrap()
142 }
143
144 pub fn try_filter_leaves<F: FnMut(usize, &FieldRef) -> Result<bool, ArrowError>>(
149 &self,
150 mut filter: F,
151 ) -> Result<Self, ArrowError> {
152 fn filter_field<F: FnMut(&FieldRef) -> Result<bool, ArrowError>>(
153 f: &FieldRef,
154 filter: &mut F,
155 ) -> Result<Option<FieldRef>, ArrowError> {
156 use DataType::*;
157
158 let v = match f.data_type() {
159 Dictionary(_, v) => v.as_ref(), RunEndEncoded(_, v) => v.data_type(), d => d,
162 };
163 let d = match v {
164 List(child) => {
165 let fields = filter_field(child, filter)?;
166 if let Some(fields) = fields {
167 List(fields)
168 } else {
169 return Ok(None);
170 }
171 }
172 LargeList(child) => {
173 let fields = filter_field(child, filter)?;
174 if let Some(fields) = fields {
175 LargeList(fields)
176 } else {
177 return Ok(None);
178 }
179 }
180 Map(child, ordered) => {
181 let fields = filter_field(child, filter)?;
182 if let Some(fields) = fields {
183 Map(fields, *ordered)
184 } else {
185 return Ok(None);
186 }
187 }
188 FixedSizeList(child, size) => {
189 let fields = filter_field(child, filter)?;
190 if let Some(fields) = fields {
191 FixedSizeList(fields, *size)
192 } else {
193 return Ok(None);
194 }
195 }
196 Struct(fields) => {
197 let filtered: Result<Vec<_>, _> =
198 fields.iter().map(|f| filter_field(f, filter)).collect();
199 let filtered: Fields = filtered?
200 .iter()
201 .filter_map(|f| f.as_ref().cloned())
202 .collect();
203
204 if filtered.is_empty() {
205 return Ok(None);
206 }
207
208 Struct(filtered)
209 }
210 Union(fields, mode) => {
211 let filtered: Result<Vec<_>, _> = fields
212 .iter()
213 .map(|(id, f)| filter_field(f, filter).map(|f| f.map(|f| (id, f))))
214 .collect();
215 let filtered: UnionFields = filtered?
216 .iter()
217 .filter_map(|f| f.as_ref().cloned())
218 .collect();
219
220 if filtered.is_empty() {
221 return Ok(None);
222 }
223
224 Union(filtered, *mode)
225 }
226 _ => {
227 let filtered = filter(f)?;
228 return Ok(filtered.then(|| f.clone()));
229 }
230 };
231 let d = match f.data_type() {
232 Dictionary(k, _) => Dictionary(k.clone(), Box::new(d)),
233 RunEndEncoded(v, f) => {
234 RunEndEncoded(v.clone(), Arc::new(f.as_ref().clone().with_data_type(d)))
235 }
236 _ => d,
237 };
238 Ok(Some(Arc::new(f.as_ref().clone().with_data_type(d))))
239 }
240
241 let mut leaf_idx = 0;
242 let mut filter = |f: &FieldRef| {
243 let t = filter(leaf_idx, f)?;
244 leaf_idx += 1;
245 Ok(t)
246 };
247
248 let filtered: Result<Vec<_>, _> = self
249 .0
250 .iter()
251 .map(|f| filter_field(f, &mut filter))
252 .collect();
253 let filtered = filtered?
254 .iter()
255 .filter_map(|f| f.as_ref().cloned())
256 .collect();
257 Ok(filtered)
258 }
259
260 #[deprecated(note = "Use SchemaBuilder::remove")]
279 #[doc(hidden)]
280 pub fn remove(&mut self, index: usize) -> FieldRef {
281 let mut builder = SchemaBuilder::from(Fields::from(&*self.0));
282 let field = builder.remove(index);
283 *self = builder.finish().fields;
284 field
285 }
286}
287
288impl Default for Fields {
289 fn default() -> Self {
290 Self::empty()
291 }
292}
293
294impl FromIterator<Field> for Fields {
295 fn from_iter<T: IntoIterator<Item = Field>>(iter: T) -> Self {
296 iter.into_iter().map(Arc::new).collect()
297 }
298}
299
300impl FromIterator<FieldRef> for Fields {
301 fn from_iter<T: IntoIterator<Item = FieldRef>>(iter: T) -> Self {
302 Self(iter.into_iter().collect())
303 }
304}
305
306impl From<Vec<Field>> for Fields {
307 fn from(value: Vec<Field>) -> Self {
308 value.into_iter().collect()
309 }
310}
311
312impl From<Vec<FieldRef>> for Fields {
313 fn from(value: Vec<FieldRef>) -> Self {
314 Self(value.into())
315 }
316}
317
318impl From<&[FieldRef]> for Fields {
319 fn from(value: &[FieldRef]) -> Self {
320 Self(value.into())
321 }
322}
323
324impl<const N: usize> From<[FieldRef; N]> for Fields {
325 fn from(value: [FieldRef; N]) -> Self {
326 Self(Arc::new(value))
327 }
328}
329
330impl Deref for Fields {
331 type Target = [FieldRef];
332
333 fn deref(&self) -> &Self::Target {
334 self.0.as_ref()
335 }
336}
337
338impl<'a> IntoIterator for &'a Fields {
339 type Item = &'a FieldRef;
340 type IntoIter = std::slice::Iter<'a, FieldRef>;
341
342 fn into_iter(self) -> Self::IntoIter {
343 self.0.iter()
344 }
345}
346
/// An ordered collection of `(type id, field)` pairs stored behind a single
/// reference-counted slice.
///
/// The pairs live in an `Arc<[(i8, FieldRef)]>`, so cloning a [`UnionFields`] only
/// bumps a reference count rather than copying the individual entries.
///
/// When the `serde` feature is enabled the type (de)serializes transparently as the
/// underlying sequence of pairs.
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(transparent))]
pub struct UnionFields(Arc<[(i8, FieldRef)]>);
352
353impl std::fmt::Debug for UnionFields {
354 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
355 self.0.as_ref().fmt(f)
356 }
357}
358
359impl UnionFields {
360 pub fn empty() -> Self {
362 Self(Arc::from([]))
363 }
364
365 pub fn new<F, T>(type_ids: T, fields: F) -> Self
383 where
384 F: IntoIterator,
385 F::Item: Into<FieldRef>,
386 T: IntoIterator<Item = i8>,
387 {
388 let fields = fields.into_iter().map(Into::into);
389 let mut set = 0_u128;
390 type_ids
391 .into_iter()
392 .inspect(|&idx| {
393 let mask = 1_u128 << idx;
394 if (set & mask) != 0 {
395 panic!("duplicate type id: {}", idx);
396 } else {
397 set |= mask;
398 }
399 })
400 .zip(fields)
401 .collect()
402 }
403
404 pub fn size(&self) -> usize {
406 self.iter()
407 .map(|(_, field)| field.size() + std::mem::size_of::<(i8, FieldRef)>())
408 .sum()
409 }
410
411 pub fn len(&self) -> usize {
413 self.0.len()
414 }
415
416 pub fn is_empty(&self) -> bool {
418 self.0.is_empty()
419 }
420
421 pub fn iter(&self) -> impl Iterator<Item = (i8, &FieldRef)> + '_ {
423 self.0.iter().map(|(id, f)| (*id, f))
424 }
425
426 pub(crate) fn try_merge(&mut self, other: &Self) -> Result<(), ArrowError> {
430 let mut output: Vec<_> = self.iter().map(|(id, f)| (id, f.clone())).collect();
432 for (field_type_id, from_field) in other.iter() {
433 let mut is_new_field = true;
434 for (self_type_id, self_field) in output.iter_mut() {
435 if from_field == self_field {
436 if *self_type_id != field_type_id {
439 return Err(ArrowError::SchemaError(
440 format!("Fail to merge schema field '{}' because the self_type_id = {} does not equal field_type_id = {}",
441 self_field.name(), self_type_id, field_type_id)
442 ));
443 }
444
445 is_new_field = false;
446 break;
447 }
448 }
449
450 if is_new_field {
451 output.push((field_type_id, from_field.clone()))
452 }
453 }
454 *self = output.into_iter().collect();
455 Ok(())
456 }
457}
458
459impl FromIterator<(i8, FieldRef)> for UnionFields {
460 fn from_iter<T: IntoIterator<Item = (i8, FieldRef)>>(iter: T) -> Self {
461 Self(iter.into_iter().collect())
463 }
464}
465
#[cfg(test)]
mod tests {
    use super::*;
    use crate::UnionMode;

    /// Exercises `filter_leaves` / `try_filter_leaves` on a schema that covers every
    /// nested type the filter descends into. The leaf indices quoted in the comments
    /// below follow the depth-first numbering the assertions rely on.
    #[test]
    fn test_filter() {
        let floats = Fields::from(vec![
            Field::new("a", DataType::Float32, false),
            Field::new("b", DataType::Float32, false),
        ]);
        let fields = Fields::from(vec![
            // leaf 0
            Field::new("a", DataType::Int32, true),
            // leaves 1 ("a") and 2 ("b")
            Field::new("floats", DataType::Struct(floats.clone()), true),
            // leaf 3
            Field::new("b", DataType::Int16, true),
            // leaf 4: a dictionary with a non-nested value type is itself a leaf
            Field::new(
                "c",
                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                false,
            ),
            // leaves 5 and 6: the struct inside the dictionary is descended into
            Field::new(
                "d",
                DataType::Dictionary(
                    Box::new(DataType::Int32),
                    Box::new(DataType::Struct(floats.clone())),
                ),
                false,
            ),
            // leaves 7 and 8
            Field::new_list(
                "e",
                Field::new("floats", DataType::Struct(floats.clone()), true),
                true,
            ),
            // leaf 9
            Field::new_fixed_size_list("f", Field::new("item", DataType::Int32, false), 3, false),
            // leaves 10 and 11 (keys and values), as relied on by the assertion below
            Field::new_map(
                "g",
                "entries",
                Field::new("keys", DataType::LargeUtf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
            // leaves 12 ("field1") and 13 ("field3")
            Field::new(
                "h",
                DataType::Union(
                    UnionFields::new(
                        vec![1, 3],
                        vec![
                            Field::new("field1", DataType::UInt8, false),
                            Field::new("field3", DataType::Utf8, false),
                        ],
                    ),
                    UnionMode::Dense,
                ),
                true,
            ),
            // leaves 14 and 15: only the values side of the run-end encoding is visited
            Field::new(
                "i",
                DataType::RunEndEncoded(
                    Arc::new(Field::new("run_ends", DataType::Int32, false)),
                    Arc::new(Field::new("values", DataType::Struct(floats.clone()), true)),
                ),
                false,
            ),
        ]);

        // The `floats` struct narrowed down to just its "a" member.
        let floats_a = DataType::Struct(vec![floats[0].clone()].into());

        // Selecting by index: top-level "a" plus the first member of "floats".
        let r = fields.filter_leaves(|idx, _| idx == 0 || idx == 1);
        assert_eq!(r.len(), 2);
        assert_eq!(r[0], fields[0]);
        assert_eq!(r[1].data_type(), &floats_a);

        // Selecting by leaf name: every nested struct keeps only its "a" member and
        // the enclosing dictionary / list / run-end wrappers are preserved.
        let r = fields.filter_leaves(|_, f| f.name() == "a");
        assert_eq!(r.len(), 5);
        assert_eq!(r[0], fields[0]);
        assert_eq!(r[1].data_type(), &floats_a);
        assert_eq!(
            r[2].data_type(),
            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(floats_a.clone()))
        );
        assert_eq!(
            r[3].as_ref(),
            &Field::new_list("e", Field::new("floats", floats_a.clone(), true), true)
        );
        assert_eq!(
            r[4].as_ref(),
            &Field::new(
                "i",
                DataType::RunEndEncoded(
                    Arc::new(Field::new("run_ends", DataType::Int32, false)),
                    Arc::new(Field::new("values", floats_a.clone(), true)),
                ),
                false,
            )
        );

        // "floats" only names nested (non-leaf) fields, so the filter never sees it.
        let r = fields.filter_leaves(|_, f| f.name() == "floats");
        assert_eq!(r.len(), 0);

        // The single leaf of the fixed-size list keeps the whole field intact.
        let r = fields.filter_leaves(|idx, _| idx == 9);
        assert_eq!(r.len(), 1);
        assert_eq!(r[0], fields[6]);

        // Keeping both map leaves keeps the whole map field intact.
        let r = fields.filter_leaves(|idx, _| idx == 10 || idx == 11);
        assert_eq!(r.len(), 1);
        assert_eq!(r[0], fields[7]);

        let union = DataType::Union(
            UnionFields::new(vec![1], vec![Field::new("field1", DataType::UInt8, false)]),
            UnionMode::Dense,
        );

        // Keeping one union member narrows the union while retaining its type id.
        let r = fields.filter_leaves(|idx, _| idx == 12);
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].data_type(), &union);

        // Keeping both run-end-encoded value leaves keeps the whole field intact.
        let r = fields.filter_leaves(|idx, _| idx == 14 || idx == 15);
        assert_eq!(r.len(), 1);
        assert_eq!(r[0], fields[9]);

        // An error returned by the filter is propagated to the caller.
        let r = fields.try_filter_leaves(|_, _| Err(ArrowError::SchemaError("error".to_string())));
        assert!(r.is_err());
    }
}