arrow_schema/
fields.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::Deref;
19use std::sync::Arc;
20
21use crate::{ArrowError, DataType, Field, FieldRef, SchemaBuilder};
22
/// A cheaply cloneable, owned slice of [`FieldRef`]
///
/// Similar to `Arc<Vec<FieldRef>>` or `Arc<[FieldRef]>`
///
/// [`Fields`] dereferences to `[FieldRef]`, so slice operations such as
/// `len`, `iter` and indexing can be used on it directly.
///
/// Can be constructed in a number of ways
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{DataType, Field, Fields, SchemaBuilder};
/// // Can be constructed from Vec<Field>
/// Fields::from(vec![Field::new("a", DataType::Boolean, false)]);
/// // Can be constructed from Vec<FieldRef>
/// Fields::from(vec![Arc::new(Field::new("a", DataType::Boolean, false))]);
/// // Can be constructed from an iterator of Field
/// std::iter::once(Field::new("a", DataType::Boolean, false)).collect::<Fields>();
/// // Can be constructed from an iterator of FieldRef
/// std::iter::once(Arc::new(Field::new("a", DataType::Boolean, false))).collect::<Fields>();
/// ```
///
/// See [`SchemaBuilder`] for mutating or updating [`Fields`]
///
/// ```
/// # use arrow_schema::{DataType, Field, SchemaBuilder};
/// let mut builder = SchemaBuilder::new();
/// builder.push(Field::new("a", DataType::Boolean, false));
/// builder.push(Field::new("b", DataType::Boolean, false));
/// let fields = builder.finish().fields;
///
/// let mut builder = SchemaBuilder::from(&fields);
/// builder.remove(0);
/// let new = builder.finish().fields;
/// ```
///
/// [`SchemaBuilder`]: crate::SchemaBuilder
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(transparent))]
pub struct Fields(Arc<[FieldRef]>);
61
62impl std::fmt::Debug for Fields {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        self.0.as_ref().fmt(f)
65    }
66}
67
68impl Fields {
69    /// Returns a new empty [`Fields`]
70    pub fn empty() -> Self {
71        Self(Arc::new([]))
72    }
73
74    /// Return size of this instance in bytes.
75    pub fn size(&self) -> usize {
76        self.iter()
77            .map(|field| field.size() + std::mem::size_of::<FieldRef>())
78            .sum()
79    }
80
81    /// Searches for a field by name, returning it along with its index if found
82    pub fn find(&self, name: &str) -> Option<(usize, &FieldRef)> {
83        self.0.iter().enumerate().find(|(_, b)| b.name() == name)
84    }
85
86    /// Check to see if `self` is a superset of `other`
87    ///
88    /// In particular returns true if both have the same number of fields, and [`Field::contains`]
89    /// for each field across self and other
90    ///
91    /// In other words, any record that conforms to `other` should also conform to `self`
92    pub fn contains(&self, other: &Fields) -> bool {
93        if Arc::ptr_eq(&self.0, &other.0) {
94            return true;
95        }
96        self.len() == other.len()
97            && self
98                .iter()
99                .zip(other.iter())
100                .all(|(a, b)| Arc::ptr_eq(a, b) || a.contains(b))
101    }
102
103    /// Returns a copy of this [`Fields`] containing only those [`FieldRef`] passing a predicate
104    ///
105    /// Performs a depth-first scan of [`Fields`] invoking `filter` for each [`FieldRef`]
106    /// containing no child [`FieldRef`], a leaf field, along with a count of the number
107    /// of such leaves encountered so far. Only [`FieldRef`] for which `filter`
108    /// returned `true` will be included in the result.
109    ///
110    /// This can therefore be used to select a subset of fields from nested types
111    /// such as [`DataType::Struct`] or [`DataType::List`].
112    ///
113    /// ```
114    /// # use arrow_schema::{DataType, Field, Fields};
115    /// let fields = Fields::from(vec![
116    ///     Field::new("a", DataType::Int32, true), // Leaf 0
117    ///     Field::new("b", DataType::Struct(Fields::from(vec![
118    ///         Field::new("c", DataType::Float32, false), // Leaf 1
119    ///         Field::new("d", DataType::Float64, false), // Leaf 2
120    ///         Field::new("e", DataType::Struct(Fields::from(vec![
121    ///             Field::new("f", DataType::Int32, false),   // Leaf 3
122    ///             Field::new("g", DataType::Float16, false), // Leaf 4
123    ///         ])), true),
124    ///     ])), false)
125    /// ]);
126    /// let filtered = fields.filter_leaves(|idx, _| [0, 2, 3, 4].contains(&idx));
127    /// let expected = Fields::from(vec![
128    ///     Field::new("a", DataType::Int32, true),
129    ///     Field::new("b", DataType::Struct(Fields::from(vec![
130    ///         Field::new("d", DataType::Float64, false),
131    ///         Field::new("e", DataType::Struct(Fields::from(vec![
132    ///             Field::new("f", DataType::Int32, false),
133    ///             Field::new("g", DataType::Float16, false),
134    ///         ])), true),
135    ///     ])), false)
136    /// ]);
137    /// assert_eq!(filtered, expected);
138    /// ```
139    pub fn filter_leaves<F: FnMut(usize, &FieldRef) -> bool>(&self, mut filter: F) -> Self {
140        self.try_filter_leaves(|idx, field| Ok(filter(idx, field)))
141            .unwrap()
142    }
143
144    /// Returns a copy of this [`Fields`] containing only those [`FieldRef`] passing a predicate
145    /// or an error if the predicate fails.
146    ///
147    /// See [`Fields::filter_leaves`] for more information.
148    pub fn try_filter_leaves<F: FnMut(usize, &FieldRef) -> Result<bool, ArrowError>>(
149        &self,
150        mut filter: F,
151    ) -> Result<Self, ArrowError> {
152        fn filter_field<F: FnMut(&FieldRef) -> Result<bool, ArrowError>>(
153            f: &FieldRef,
154            filter: &mut F,
155        ) -> Result<Option<FieldRef>, ArrowError> {
156            use DataType::*;
157
158            let v = match f.data_type() {
159                Dictionary(_, v) => v.as_ref(),       // Key must be integer
160                RunEndEncoded(_, v) => v.data_type(), // Run-ends must be integer
161                d => d,
162            };
163            let d = match v {
164                List(child) => {
165                    let fields = filter_field(child, filter)?;
166                    if let Some(fields) = fields {
167                        List(fields)
168                    } else {
169                        return Ok(None);
170                    }
171                }
172                LargeList(child) => {
173                    let fields = filter_field(child, filter)?;
174                    if let Some(fields) = fields {
175                        LargeList(fields)
176                    } else {
177                        return Ok(None);
178                    }
179                }
180                Map(child, ordered) => {
181                    let fields = filter_field(child, filter)?;
182                    if let Some(fields) = fields {
183                        Map(fields, *ordered)
184                    } else {
185                        return Ok(None);
186                    }
187                }
188                FixedSizeList(child, size) => {
189                    let fields = filter_field(child, filter)?;
190                    if let Some(fields) = fields {
191                        FixedSizeList(fields, *size)
192                    } else {
193                        return Ok(None);
194                    }
195                }
196                Struct(fields) => {
197                    let filtered: Result<Vec<_>, _> =
198                        fields.iter().map(|f| filter_field(f, filter)).collect();
199                    let filtered: Fields = filtered?
200                        .iter()
201                        .filter_map(|f| f.as_ref().cloned())
202                        .collect();
203
204                    if filtered.is_empty() {
205                        return Ok(None);
206                    }
207
208                    Struct(filtered)
209                }
210                Union(fields, mode) => {
211                    let filtered: Result<Vec<_>, _> = fields
212                        .iter()
213                        .map(|(id, f)| filter_field(f, filter).map(|f| f.map(|f| (id, f))))
214                        .collect();
215                    let filtered: UnionFields = filtered?
216                        .iter()
217                        .filter_map(|f| f.as_ref().cloned())
218                        .collect();
219
220                    if filtered.is_empty() {
221                        return Ok(None);
222                    }
223
224                    Union(filtered, *mode)
225                }
226                _ => {
227                    let filtered = filter(f)?;
228                    return Ok(filtered.then(|| f.clone()));
229                }
230            };
231            let d = match f.data_type() {
232                Dictionary(k, _) => Dictionary(k.clone(), Box::new(d)),
233                RunEndEncoded(v, f) => {
234                    RunEndEncoded(v.clone(), Arc::new(f.as_ref().clone().with_data_type(d)))
235                }
236                _ => d,
237            };
238            Ok(Some(Arc::new(f.as_ref().clone().with_data_type(d))))
239        }
240
241        let mut leaf_idx = 0;
242        let mut filter = |f: &FieldRef| {
243            let t = filter(leaf_idx, f)?;
244            leaf_idx += 1;
245            Ok(t)
246        };
247
248        let filtered: Result<Vec<_>, _> = self
249            .0
250            .iter()
251            .map(|f| filter_field(f, &mut filter))
252            .collect();
253        let filtered = filtered?
254            .iter()
255            .filter_map(|f| f.as_ref().cloned())
256            .collect();
257        Ok(filtered)
258    }
259
260    /// Remove a field by index and return it.
261    ///
262    /// # Panic
263    ///
264    /// Panics if `index` is out of bounds.
265    ///
266    /// # Example
267    /// ```
268    /// use arrow_schema::{DataType, Field, Fields};
269    /// let mut fields = Fields::from(vec![
270    ///   Field::new("a", DataType::Boolean, false),
271    ///   Field::new("b", DataType::Int8, false),
272    ///   Field::new("c", DataType::Utf8, false),
273    /// ]);
274    /// assert_eq!(fields.len(), 3);
275    /// assert_eq!(fields.remove(1), Field::new("b", DataType::Int8, false).into());
276    /// assert_eq!(fields.len(), 2);
277    /// ```
278    #[deprecated(note = "Use SchemaBuilder::remove")]
279    #[doc(hidden)]
280    pub fn remove(&mut self, index: usize) -> FieldRef {
281        let mut builder = SchemaBuilder::from(Fields::from(&*self.0));
282        let field = builder.remove(index);
283        *self = builder.finish().fields;
284        field
285    }
286}
287
288impl Default for Fields {
289    fn default() -> Self {
290        Self::empty()
291    }
292}
293
294impl FromIterator<Field> for Fields {
295    fn from_iter<T: IntoIterator<Item = Field>>(iter: T) -> Self {
296        iter.into_iter().map(Arc::new).collect()
297    }
298}
299
300impl FromIterator<FieldRef> for Fields {
301    fn from_iter<T: IntoIterator<Item = FieldRef>>(iter: T) -> Self {
302        Self(iter.into_iter().collect())
303    }
304}
305
306impl From<Vec<Field>> for Fields {
307    fn from(value: Vec<Field>) -> Self {
308        value.into_iter().collect()
309    }
310}
311
312impl From<Vec<FieldRef>> for Fields {
313    fn from(value: Vec<FieldRef>) -> Self {
314        Self(value.into())
315    }
316}
317
318impl From<&[FieldRef]> for Fields {
319    fn from(value: &[FieldRef]) -> Self {
320        Self(value.into())
321    }
322}
323
324impl<const N: usize> From<[FieldRef; N]> for Fields {
325    fn from(value: [FieldRef; N]) -> Self {
326        Self(Arc::new(value))
327    }
328}
329
330impl Deref for Fields {
331    type Target = [FieldRef];
332
333    fn deref(&self) -> &Self::Target {
334        self.0.as_ref()
335    }
336}
337
338impl<'a> IntoIterator for &'a Fields {
339    type Item = &'a FieldRef;
340    type IntoIter = std::slice::Iter<'a, FieldRef>;
341
342    fn into_iter(self) -> Self::IntoIter {
343        self.0.iter()
344    }
345}
346
/// A cheaply cloneable, owned collection of [`FieldRef`] and their corresponding type ids
///
/// Each entry pairs an `i8` type id with the [`FieldRef`] it identifies.
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(transparent))]
pub struct UnionFields(Arc<[(i8, FieldRef)]>);
352
353impl std::fmt::Debug for UnionFields {
354    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
355        self.0.as_ref().fmt(f)
356    }
357}
358
359impl UnionFields {
360    /// Create a new [`UnionFields`] with no fields
361    pub fn empty() -> Self {
362        Self(Arc::from([]))
363    }
364
365    /// Create a new [`UnionFields`] from a [`Fields`] and array of type_ids
366    ///
367    /// See <https://arrow.apache.org/docs/format/Columnar.html#union-layout>
368    ///
369    /// ```
370    /// use arrow_schema::{DataType, Field, UnionFields};
371    /// // Create a new UnionFields with type id mapping
372    /// // 1 -> DataType::UInt8
373    /// // 3 -> DataType::Utf8
374    /// UnionFields::new(
375    ///     vec![1, 3],
376    ///     vec![
377    ///         Field::new("field1", DataType::UInt8, false),
378    ///         Field::new("field3", DataType::Utf8, false),
379    ///     ],
380    /// );
381    /// ```
382    pub fn new<F, T>(type_ids: T, fields: F) -> Self
383    where
384        F: IntoIterator,
385        F::Item: Into<FieldRef>,
386        T: IntoIterator<Item = i8>,
387    {
388        let fields = fields.into_iter().map(Into::into);
389        let mut set = 0_u128;
390        type_ids
391            .into_iter()
392            .inspect(|&idx| {
393                let mask = 1_u128 << idx;
394                if (set & mask) != 0 {
395                    panic!("duplicate type id: {}", idx);
396                } else {
397                    set |= mask;
398                }
399            })
400            .zip(fields)
401            .collect()
402    }
403
404    /// Return size of this instance in bytes.
405    pub fn size(&self) -> usize {
406        self.iter()
407            .map(|(_, field)| field.size() + std::mem::size_of::<(i8, FieldRef)>())
408            .sum()
409    }
410
411    /// Returns the number of fields in this [`UnionFields`]
412    pub fn len(&self) -> usize {
413        self.0.len()
414    }
415
416    /// Returns `true` if this is empty
417    pub fn is_empty(&self) -> bool {
418        self.0.is_empty()
419    }
420
421    /// Returns an iterator over the fields and type ids in this [`UnionFields`]
422    pub fn iter(&self) -> impl Iterator<Item = (i8, &FieldRef)> + '_ {
423        self.0.iter().map(|(id, f)| (*id, f))
424    }
425
426    /// Merge this field into self if it is compatible.
427    ///
428    /// See [`Field::try_merge`]
429    pub(crate) fn try_merge(&mut self, other: &Self) -> Result<(), ArrowError> {
430        // TODO: This currently may produce duplicate type IDs (#3982)
431        let mut output: Vec<_> = self.iter().map(|(id, f)| (id, f.clone())).collect();
432        for (field_type_id, from_field) in other.iter() {
433            let mut is_new_field = true;
434            for (self_type_id, self_field) in output.iter_mut() {
435                if from_field == self_field {
436                    // If the nested fields in two unions are the same, they must have same
437                    // type id.
438                    if *self_type_id != field_type_id {
439                        return Err(ArrowError::SchemaError(
440                            format!("Fail to merge schema field '{}' because the self_type_id = {} does not equal field_type_id = {}",
441                                    self_field.name(), self_type_id, field_type_id)
442                        ));
443                    }
444
445                    is_new_field = false;
446                    break;
447                }
448            }
449
450            if is_new_field {
451                output.push((field_type_id, from_field.clone()))
452            }
453        }
454        *self = output.into_iter().collect();
455        Ok(())
456    }
457}
458
459impl FromIterator<(i8, FieldRef)> for UnionFields {
460    fn from_iter<T: IntoIterator<Item = (i8, FieldRef)>>(iter: T) -> Self {
461        // TODO: Should this validate type IDs are unique (#3982)
462        Self(iter.into_iter().collect())
463    }
464}
465
#[cfg(test)]
mod tests {
    use super::*;
    use crate::UnionMode;

    /// Exercises `filter_leaves` / `try_filter_leaves` over every nested type the
    /// filter descends into. The leaf indices noted below follow the depth-first
    /// order in which the predicate is invoked; the assertions depend on them.
    #[test]
    fn test_filter() {
        let floats = Fields::from(vec![
            Field::new("a", DataType::Float32, false),
            Field::new("b", DataType::Float32, false),
        ]);
        let fields = Fields::from(vec![
            // Leaf 0
            Field::new("a", DataType::Int32, true),
            // Leaves 1 and 2
            Field::new("floats", DataType::Struct(floats.clone()), true),
            // Leaf 3
            Field::new("b", DataType::Int16, true),
            // Leaf 4 (a dictionary of a non-nested type is itself a leaf)
            Field::new(
                "c",
                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                false,
            ),
            // Leaves 5 and 6
            Field::new(
                "d",
                DataType::Dictionary(
                    Box::new(DataType::Int32),
                    Box::new(DataType::Struct(floats.clone())),
                ),
                false,
            ),
            // Leaves 7 and 8
            Field::new_list(
                "e",
                Field::new("floats", DataType::Struct(floats.clone()), true),
                true,
            ),
            // Leaf 9
            Field::new_fixed_size_list("f", Field::new("item", DataType::Int32, false), 3, false),
            // Leaves 10 and 11
            Field::new_map(
                "g",
                "entries",
                Field::new("keys", DataType::LargeUtf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
            // Leaves 12 and 13
            Field::new(
                "h",
                DataType::Union(
                    UnionFields::new(
                        vec![1, 3],
                        vec![
                            Field::new("field1", DataType::UInt8, false),
                            Field::new("field3", DataType::Utf8, false),
                        ],
                    ),
                    UnionMode::Dense,
                ),
                true,
            ),
            // Leaves 14 and 15
            Field::new(
                "i",
                DataType::RunEndEncoded(
                    Arc::new(Field::new("run_ends", DataType::Int32, false)),
                    Arc::new(Field::new("values", DataType::Struct(floats.clone()), true)),
                ),
                false,
            ),
        ]);

        // The `floats` struct reduced to its first child only
        let floats_a = DataType::Struct(vec![floats[0].clone()].into());

        // Selecting by leaf index prunes the struct down to the chosen child
        let r = fields.filter_leaves(|idx, _| idx == 0 || idx == 1);
        assert_eq!(r.len(), 2);
        assert_eq!(r[0], fields[0]);
        assert_eq!(r[1].data_type(), &floats_a);

        // Selecting by leaf name keeps every parent that still has a surviving child
        let r = fields.filter_leaves(|_, f| f.name() == "a");
        assert_eq!(r.len(), 5);
        assert_eq!(r[0], fields[0]);
        assert_eq!(r[1].data_type(), &floats_a);
        assert_eq!(
            r[2].data_type(),
            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(floats_a.clone()))
        );
        assert_eq!(
            r[3].as_ref(),
            &Field::new_list("e", Field::new("floats", floats_a.clone(), true), true)
        );
        assert_eq!(
            r[4].as_ref(),
            &Field::new(
                "i",
                DataType::RunEndEncoded(
                    Arc::new(Field::new("run_ends", DataType::Int32, false)),
                    Arc::new(Field::new("values", floats_a.clone(), true)),
                ),
                false,
            )
        );

        // "floats" only names struct fields, which are never passed to the predicate
        let r = fields.filter_leaves(|_, f| f.name() == "floats");
        assert_eq!(r.len(), 0);

        // Fixed size list item
        let r = fields.filter_leaves(|idx, _| idx == 9);
        assert_eq!(r.len(), 1);
        assert_eq!(r[0], fields[6]);

        // Both map leaves kept: the map field is returned unchanged
        let r = fields.filter_leaves(|idx, _| idx == 10 || idx == 11);
        assert_eq!(r.len(), 1);
        assert_eq!(r[0], fields[7]);

        let union = DataType::Union(
            UnionFields::new(vec![1], vec![Field::new("field1", DataType::UInt8, false)]),
            UnionMode::Dense,
        );

        // Only the first union child kept; its type id is preserved
        let r = fields.filter_leaves(|idx, _| idx == 12);
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].data_type(), &union);

        // Both run-end encoded value leaves kept: the field is returned unchanged
        let r = fields.filter_leaves(|idx, _| idx == 14 || idx == 15);
        assert_eq!(r.len(), 1);
        assert_eq!(r[0], fields[9]);

        // Propagate error
        let r = fields.try_filter_leaves(|_, _| Err(ArrowError::SchemaError("error".to_string())));
        assert!(r.is_err());
    }
}