polars_arrow/array/growable/
binview.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
use std::ops::Deref;
use std::sync::Arc;

use polars_utils::aliases::{InitHashMaps, PlHashSet};
use polars_utils::itertools::Itertools;

use super::Growable;
use crate::array::binview::{BinaryViewArrayGeneric, ViewType};
use crate::array::growable::utils::{extend_validity, extend_validity_copies, prepare_validity};
use crate::array::{Array, MutableBinaryViewArray, View};
use crate::bitmap::{Bitmap, MutableBitmap};
use crate::buffer::Buffer;
use crate::datatypes::ArrowDataType;

/// Concrete [`Growable`] for the [`BinaryViewArrayGeneric`].
///
/// Accumulates views (and, when enabled, validity bits) copied from a fixed list of source
/// arrays and turns them into a new [`BinaryViewArrayGeneric`].
pub struct GrowableBinaryViewArray<'a, T: ViewType + ?Sized> {
    /// Source arrays; the `index` argument of the `extend*` methods selects from this list.
    arrays: Vec<&'a BinaryViewArrayGeneric<T>>,
    /// Data type used for the output array, cloned from the first source array.
    dtype: ArrowDataType,
    /// Validity bitmap under construction, as created by `prepare_validity` in `new`;
    /// may be `None`.
    validity: Option<MutableBitmap>,
    /// Builder that collects the output views.
    inner: MutableBinaryViewArray<T>,
    /// `Some` when the data buffers of every source array start at the same address; the
    /// output array then reuses these buffers as they are.
    same_buffers: Option<&'a Arc<[Buffer<u8>]>>,
    /// `total_buffer_len()` of the first source array when `same_buffers` is set, `0` otherwise.
    total_same_buffers_len: usize, // Only valid if same_buffers is Some.
    /// `true` when at least two source arrays have data buffers starting at the same address;
    /// `extend` then uses the de-duplicating append (unless `same_buffers` is set).
    has_duplicate_buffers: bool,
}

impl<'a, T: ViewType + ?Sized> GrowableBinaryViewArray<'a, T> {
    /// Builds a [`GrowableBinaryViewArray`] that copies from `arrays`, with room reserved for
    /// `capacity` views.
    ///
    /// Validity tracking is turned on whenever any source array reports a non-zero null count,
    /// even if `use_validity` is `false`.
    ///
    /// # Panics
    /// If `arrays` is empty.
    pub fn new(
        arrays: Vec<&'a BinaryViewArrayGeneric<T>>,
        mut use_validity: bool,
        capacity: usize,
    ) -> Self {
        // Indexing panics on an empty list; everything below may assume at least one array.
        let first = arrays[0];
        let dtype = first.dtype().clone();

        // A single source array with nulls means every insertion has to write validity bits.
        use_validity |= arrays.iter().any(|arr| arr.null_count() > 0);

        // Fast path (seen in group-by, see #15615): when every source array points at the same
        // set of data buffers, the output reuses that set unchanged instead of having buffers
        // pushed for each source array.
        let shares_one_buffer_set = arrays
            .iter()
            .map(|arr| arr.data_buffers().as_ptr())
            .all_equal();
        let (same_buffers, total_same_buffers_len) = if shares_one_buffer_set {
            (Some(first.data_buffers()), first.total_buffer_len())
        } else {
            (None, 0)
        };

        // Detect whether any buffer set occurs more than once; `insert` returns `false` for an
        // address that was already recorded, and `any` stops at the first such hit.
        let mut seen_buffer_ptrs = PlHashSet::new();
        let has_duplicate_buffers = arrays
            .iter()
            .any(|arr| !seen_buffer_ptrs.insert(arr.data_buffers().as_ptr()));

        let validity = prepare_validity(use_validity, capacity);
        let inner = MutableBinaryViewArray::<T>::with_capacity(capacity);

        Self {
            arrays,
            dtype,
            validity,
            inner,
            same_buffers,
            total_same_buffers_len,
            has_duplicate_buffers,
        }
    }

    /// Moves the accumulated views and validity out of `self` and assembles the finished array.
    ///
    /// The builder is replaced by its `Default` value and the validity is left as `None`.
    fn to(&mut self) -> BinaryViewArrayGeneric<T> {
        let finished = std::mem::take(&mut self.inner);
        let validity = self.validity.take().map(Bitmap::from);
        let dtype = self.dtype.clone();

        match self.same_buffers {
            // All sources share one buffer set: hand it to the output as is.
            Some(shared) => {
                // SAFETY (assumed -- confirm against `new_unchecked`'s contract): in this mode
                // `extend` copies views unchanged from arrays that all point at `shared`, so
                // the views are expected to remain valid for these buffers.
                unsafe {
                    BinaryViewArrayGeneric::<T>::new_unchecked(
                        dtype,
                        finished.views.into(),
                        shared.clone(),
                        validity,
                        finished.total_bytes_len,
                        self.total_same_buffers_len,
                    )
                }
            },
            // Otherwise the builder owns its buffers and can be frozen directly.
            None => finished.freeze_with_dtype(dtype).with_validity(validity),
        }
    }
}

impl<'a, T: ViewType + ?Sized> Growable<'a> for GrowableBinaryViewArray<'a, T> {
    /// Appends `len` views, starting at `start`, from the source array at position `index`.
    ///
    /// # Safety
    /// `index` must be in bounds of the source array list and `start..start + len` must be in
    /// bounds of that array's views; neither is checked here.
    unsafe fn extend(&mut self, index: usize, start: usize, len: usize) {
        let source = *self.arrays.get_unchecked(index);
        extend_validity(&mut self.validity, source, start, len);

        let picked_views = source
            .views()
            .get_unchecked(start..start + len)
            .iter()
            .cloned();

        if self.same_buffers.is_none() {
            // The builder owns its buffers here, so the views go through its own append
            // routines together with the source array's buffers.
            let source_buffers = source.data_buffers().deref();
            if self.has_duplicate_buffers {
                self.inner
                    .extend_non_null_views_unchecked_dedupe(picked_views, source_buffers);
            } else {
                self.inner
                    .extend_non_null_views_unchecked(picked_views, source_buffers);
            }
            return;
        }

        // Shared-buffer mode: views are copied unchanged; only the byte total is updated.
        let mut appended_bytes = 0;
        self.inner
            .views
            .extend(picked_views.inspect(|view| appended_bytes += view.length as usize));
        self.inner.total_bytes_len += appended_bytes;
    }

    /// Appends the same slice of the source array at `index` `copies` times in a row.
    ///
    /// # Safety
    /// Same requirements as [`Growable::extend`] for `index`, `start` and `len`.
    unsafe fn extend_copies(&mut self, index: usize, start: usize, len: usize, copies: usize) {
        if copies == 0 {
            return;
        }

        // The first copy takes the regular path; remember where it begins and how many bytes
        // it adds.
        let first_copy_start = self.inner.views.len();
        let bytes_before = self.inner.total_bytes_len;
        self.extend(index, start, len);

        let remaining = copies - 1;
        if remaining == 0 {
            return;
        }

        let source = *self.arrays.get_unchecked(index);
        extend_validity_copies(&mut self.validity, source, start, len, remaining);

        // Every further copy duplicates the views the first copy just wrote.
        let first_copy_end = self.inner.views.len();
        let bytes_after = self.inner.total_bytes_len;
        for _ in 0..remaining {
            self.inner
                .views
                .extend_from_within(first_copy_start..first_copy_end);
            self.inner.total_bytes_len += bytes_after - bytes_before;
        }
    }

    /// Appends `additional` default views and, when validity is tracked, marks them as null.
    fn extend_validity(&mut self, additional: usize) {
        let padded_len = self.inner.views.len() + additional;
        self.inner.views.resize(padded_len, View::default());

        if let Some(validity) = self.validity.as_mut() {
            validity.extend_constant(additional, false);
        }
    }

    /// Length reported by the underlying builder.
    #[inline]
    fn len(&self) -> usize {
        self.inner.len()
    }

    /// Finishes the array and returns it behind an [`Arc`]; the accumulated views and validity
    /// are moved out of `self`.
    fn as_arc(&mut self) -> Arc<dyn Array> {
        let finished = self.to();
        finished.arced()
    }

    /// Finishes the array and returns it boxed; the accumulated views and validity are moved
    /// out of `self`.
    fn as_box(&mut self) -> Box<dyn Array> {
        let finished = self.to();
        finished.boxed()
    }
}

impl<'a, T: ViewType + ?Sized> From<GrowableBinaryViewArray<'a, T>> for BinaryViewArrayGeneric<T> {
    /// Consumes the growable and returns the array assembled from everything appended so far.
    fn from(mut growable: GrowableBinaryViewArray<'a, T>) -> Self {
        growable.to()
    }
}