arrow_array/builder/
union_builder.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::builder::buffer_builder::{Int32BufferBuilder, Int8BufferBuilder};
19use crate::builder::BufferBuilder;
20use crate::{make_array, ArrowPrimitiveType, UnionArray};
21use arrow_buffer::NullBufferBuilder;
22use arrow_buffer::{ArrowNativeType, Buffer};
23use arrow_data::ArrayDataBuilder;
24use arrow_schema::{ArrowError, DataType, Field};
25use std::any::Any;
26use std::collections::BTreeMap;
27use std::sync::Arc;
28
29/// `FieldData` is a helper struct to track the state of the fields in the `UnionBuilder`.
30#[derive(Debug)]
31struct FieldData {
32    /// The type id for this field
33    type_id: i8,
34    /// The Arrow data type represented in the `values_buffer`, which is untyped
35    data_type: DataType,
36    /// A buffer containing the values for this field in raw bytes
37    values_buffer: Box<dyn FieldDataValues>,
38    ///  The number of array slots represented by the buffer
39    slots: usize,
40    /// A builder for the null bitmap
41    null_buffer_builder: NullBufferBuilder,
42}
43
44/// A type-erased [`BufferBuilder`] used by [`FieldData`]
45trait FieldDataValues: std::fmt::Debug {
46    fn as_mut_any(&mut self) -> &mut dyn Any;
47
48    fn append_null(&mut self);
49
50    fn finish(&mut self) -> Buffer;
51}
52
53impl<T: ArrowNativeType> FieldDataValues for BufferBuilder<T> {
54    fn as_mut_any(&mut self) -> &mut dyn Any {
55        self
56    }
57
58    fn append_null(&mut self) {
59        self.advance(1)
60    }
61
62    fn finish(&mut self) -> Buffer {
63        self.finish()
64    }
65}
66
67impl FieldData {
68    /// Creates a new `FieldData`.
69    fn new<T: ArrowPrimitiveType>(type_id: i8, data_type: DataType, capacity: usize) -> Self {
70        Self {
71            type_id,
72            data_type,
73            slots: 0,
74            values_buffer: Box::new(BufferBuilder::<T::Native>::new(capacity)),
75            null_buffer_builder: NullBufferBuilder::new(capacity),
76        }
77    }
78
79    /// Appends a single value to this `FieldData`'s `values_buffer`.
80    fn append_value<T: ArrowPrimitiveType>(&mut self, v: T::Native) {
81        self.values_buffer
82            .as_mut_any()
83            .downcast_mut::<BufferBuilder<T::Native>>()
84            .expect("Tried to append unexpected type")
85            .append(v);
86
87        self.null_buffer_builder.append(true);
88        self.slots += 1;
89    }
90
91    /// Appends a null to this `FieldData`.
92    fn append_null(&mut self) {
93        self.values_buffer.append_null();
94        self.null_buffer_builder.append(false);
95        self.slots += 1;
96    }
97}
98
99/// Builder for [`UnionArray`]
100///
101/// Example: **Dense Memory Layout**
102///
103/// ```
104/// # use arrow_array::builder::UnionBuilder;
105/// # use arrow_array::types::{Float64Type, Int32Type};
106///
107/// let mut builder = UnionBuilder::new_dense();
108/// builder.append::<Int32Type>("a", 1).unwrap();
109/// builder.append::<Float64Type>("b", 3.0).unwrap();
110/// builder.append::<Int32Type>("a", 4).unwrap();
111/// let union = builder.build().unwrap();
112///
113/// assert_eq!(union.type_id(0), 0);
114/// assert_eq!(union.type_id(1), 1);
115/// assert_eq!(union.type_id(2), 0);
116///
117/// assert_eq!(union.value_offset(0), 0);
118/// assert_eq!(union.value_offset(1), 0);
119/// assert_eq!(union.value_offset(2), 1);
120/// ```
121///
122/// Example: **Sparse Memory Layout**
123/// ```
124/// # use arrow_array::builder::UnionBuilder;
125/// # use arrow_array::types::{Float64Type, Int32Type};
126///
127/// let mut builder = UnionBuilder::new_sparse();
128/// builder.append::<Int32Type>("a", 1).unwrap();
129/// builder.append::<Float64Type>("b", 3.0).unwrap();
130/// builder.append::<Int32Type>("a", 4).unwrap();
131/// let union = builder.build().unwrap();
132///
133/// assert_eq!(union.type_id(0), 0);
134/// assert_eq!(union.type_id(1), 1);
135/// assert_eq!(union.type_id(2), 0);
136///
137/// assert_eq!(union.value_offset(0), 0);
138/// assert_eq!(union.value_offset(1), 1);
139/// assert_eq!(union.value_offset(2), 2);
140/// ```
141#[derive(Debug)]
142pub struct UnionBuilder {
143    /// The current number of slots in the array
144    len: usize,
145    /// Maps field names to `FieldData` instances which track the builders for that field
146    fields: BTreeMap<String, FieldData>,
147    /// Builder to keep track of type ids
148    type_id_builder: Int8BufferBuilder,
149    /// Builder to keep track of offsets (`None` for sparse unions)
150    value_offset_builder: Option<Int32BufferBuilder>,
151    initial_capacity: usize,
152}
153
154impl UnionBuilder {
155    /// Creates a new dense array builder.
156    pub fn new_dense() -> Self {
157        Self::with_capacity_dense(1024)
158    }
159
160    /// Creates a new sparse array builder.
161    pub fn new_sparse() -> Self {
162        Self::with_capacity_sparse(1024)
163    }
164
165    /// Creates a new dense array builder with capacity.
166    pub fn with_capacity_dense(capacity: usize) -> Self {
167        Self {
168            len: 0,
169            fields: Default::default(),
170            type_id_builder: Int8BufferBuilder::new(capacity),
171            value_offset_builder: Some(Int32BufferBuilder::new(capacity)),
172            initial_capacity: capacity,
173        }
174    }
175
176    /// Creates a new sparse array builder  with capacity.
177    pub fn with_capacity_sparse(capacity: usize) -> Self {
178        Self {
179            len: 0,
180            fields: Default::default(),
181            type_id_builder: Int8BufferBuilder::new(capacity),
182            value_offset_builder: None,
183            initial_capacity: capacity,
184        }
185    }
186
187    /// Appends a null to this builder, encoding the null in the array
188    /// of the `type_name` child / field.
189    ///
190    /// Since `UnionArray` encodes nulls as an entry in its children
191    /// (it doesn't have a validity bitmap itself), and where the null
192    /// is part of the final array, appending a NULL requires
193    /// specifying which field (child) to use.
194    #[inline]
195    pub fn append_null<T: ArrowPrimitiveType>(
196        &mut self,
197        type_name: &str,
198    ) -> Result<(), ArrowError> {
199        self.append_option::<T>(type_name, None)
200    }
201
202    /// Appends a value to this builder.
203    #[inline]
204    pub fn append<T: ArrowPrimitiveType>(
205        &mut self,
206        type_name: &str,
207        v: T::Native,
208    ) -> Result<(), ArrowError> {
209        self.append_option::<T>(type_name, Some(v))
210    }
211
212    fn append_option<T: ArrowPrimitiveType>(
213        &mut self,
214        type_name: &str,
215        v: Option<T::Native>,
216    ) -> Result<(), ArrowError> {
217        let type_name = type_name.to_string();
218
219        let mut field_data = match self.fields.remove(&type_name) {
220            Some(data) => {
221                if data.data_type != T::DATA_TYPE {
222                    return Err(ArrowError::InvalidArgumentError(format!(
223                        "Attempt to write col \"{}\" with type {} doesn't match existing type {}",
224                        type_name,
225                        T::DATA_TYPE,
226                        data.data_type
227                    )));
228                }
229                data
230            }
231            None => match self.value_offset_builder {
232                Some(_) => FieldData::new::<T>(
233                    self.fields.len() as i8,
234                    T::DATA_TYPE,
235                    self.initial_capacity,
236                ),
237                // In the case of a sparse union, we should pass the maximum of the currently length and the capacity.
238                None => {
239                    let mut fd = FieldData::new::<T>(
240                        self.fields.len() as i8,
241                        T::DATA_TYPE,
242                        self.len.max(self.initial_capacity),
243                    );
244                    for _ in 0..self.len {
245                        fd.append_null();
246                    }
247                    fd
248                }
249            },
250        };
251        self.type_id_builder.append(field_data.type_id);
252
253        match &mut self.value_offset_builder {
254            // Dense Union
255            Some(offset_builder) => {
256                offset_builder.append(field_data.slots as i32);
257            }
258            // Sparse Union
259            None => {
260                for (_, fd) in self.fields.iter_mut() {
261                    // Append to all bar the FieldData currently being appended to
262                    fd.append_null();
263                }
264            }
265        }
266
267        match v {
268            Some(v) => field_data.append_value::<T>(v),
269            None => field_data.append_null(),
270        }
271
272        self.fields.insert(type_name, field_data);
273        self.len += 1;
274        Ok(())
275    }
276
277    /// Builds this builder creating a new `UnionArray`.
278    pub fn build(self) -> Result<UnionArray, ArrowError> {
279        let mut children = Vec::with_capacity(self.fields.len());
280        let union_fields = self
281            .fields
282            .into_iter()
283            .map(
284                |(
285                    name,
286                    FieldData {
287                        type_id,
288                        data_type,
289                        mut values_buffer,
290                        slots,
291                        mut null_buffer_builder,
292                    },
293                )| {
294                    let array_ref = make_array(unsafe {
295                        ArrayDataBuilder::new(data_type.clone())
296                            .add_buffer(values_buffer.finish())
297                            .len(slots)
298                            .nulls(null_buffer_builder.finish())
299                            .build_unchecked()
300                    });
301                    children.push(array_ref);
302                    (type_id, Arc::new(Field::new(name, data_type, false)))
303                },
304            )
305            .collect();
306        UnionArray::try_new(
307            union_fields,
308            self.type_id_builder.into(),
309            self.value_offset_builder.map(Into::into),
310            children,
311        )
312    }
313}