arrow_buffer/buffer/run.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::buffer::ScalarBuffer;
use crate::ArrowNativeType;
/// A slice-able buffer of monotonically increasing, positive integers used to store run-ends
///
/// # Logical vs Physical
///
/// A [`RunEndBuffer`] is used to encode runs of the same value; the index of each run is
/// called the physical index. The logical index is then the corresponding index in the logical
/// run-encoded array, i.e. a single run of length `3` would have the logical indices `0..3`.
///
/// Each value in [`RunEndBuffer::values`] is the cumulative length of all runs in the
/// logical array, up to that physical index.
///
/// Consider a [`RunEndBuffer`] containing `[3, 4, 6]`. The maximum physical index is `2`,
/// as there are `3` values, and the maximum logical index is `5`, as the maximum run end
/// is `6`. The physical indices are therefore `[0, 0, 0, 1, 2, 2]`
///
/// ```text
///  ┌─────────┐      ┌─────────┐             ┌─────────┐
///  │    3    │      │    0    │  ─┬───────▶ │    0    │
///  ├─────────┤      ├─────────┤   │         ├─────────┤
///  │    4    │      │    1    │  ─┤   ┌───▶ │    1    │
///  ├─────────┤      ├─────────┤   │   │     ├─────────┤
///  │    6    │      │    2    │  ─┘   │ ┌─▶ │    2    │
///  └─────────┘      ├─────────┤       │ │   └─────────┘
///   run ends        │    3    │  ─────┘ │  physical indices
///                   ├─────────┤         │
///                   │    4    │  ───────┤
///                   ├─────────┤         │
///                   │    5    │  ───────┘
///                   └─────────┘
///                 logical indices
/// ```
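///
/// For example (an illustrative sketch of the mapping above, assuming the
/// `arrow_buffer::buffer` module path is publicly accessible, as in this crate's tests):
///
/// ```
/// # use arrow_buffer::buffer::RunEndBuffer;
/// let ends = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
/// assert_eq!(ends.get_physical_index(0), 0); // logical 0..3 belong to the first run
/// assert_eq!(ends.get_physical_index(3), 1); // logical 3 belongs to the second run
/// assert_eq!(ends.get_physical_index(5), 2); // logical 4..6 belong to the third run
/// ```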
///
/// # Slicing
///
/// In order to provide zero-copy slicing, this container stores a separate offset and length
///
/// For example, a [`RunEndBuffer`] containing values `[3, 6, 8]` with offset `4` and length `4`
/// would describe the physical indices `1, 1, 2, 2`
///
/// Similarly, a [`RunEndBuffer`] containing values `[6, 8, 9]` with offset `2` and length `5`
/// would describe the physical indices `0, 0, 0, 0, 1`
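///
/// An illustrative sketch of the first slicing example above (same assumption on the
/// `arrow_buffer::buffer` path):
///
/// ```
/// # use arrow_buffer::buffer::RunEndBuffer;
/// let ends = RunEndBuffer::new(vec![3_i32, 6, 8].into(), 0, 8);
/// let sliced = ends.slice(4, 4); // logical indices 4..8 of the original buffer
/// // The physical indices of the slice are `1, 1, 2, 2`
/// assert_eq!(sliced.get_physical_index(0), 1);
/// assert_eq!(sliced.get_physical_index(3), 2);
/// assert_eq!(sliced.get_start_physical_index(), 1);
/// assert_eq!(sliced.get_end_physical_index(), 2);
/// ```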
///
/// See the Arrow [Run-End encoded layout] documentation for more details
///
/// [Run-End encoded layout]: https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout
#[derive(Debug, Clone)]
pub struct RunEndBuffer<E: ArrowNativeType> {
    run_ends: ScalarBuffer<E>,
    len: usize,
    offset: usize,
}

impl<E> RunEndBuffer<E>
where
    E: ArrowNativeType,
{
    /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], an `offset` and a `len`
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - `run_ends` does not contain strictly increasing values greater than zero
    /// - the last value of `run_ends` is less than `offset + len`
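    ///
    /// An illustrative sketch (assuming the `arrow_buffer::buffer` module path is
    /// publicly accessible):
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// // Run-ends `[2, 5]` are strictly increasing and cover `offset + len = 5`
    /// let ends = RunEndBuffer::new(vec![2_i32, 5].into(), 0, 5);
    /// assert_eq!(ends.len(), 5);
    /// assert_eq!(ends.values(), &[2, 5]);
    /// ```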
    pub fn new(run_ends: ScalarBuffer<E>, offset: usize, len: usize) -> Self {
        assert!(
            run_ends.windows(2).all(|w| w[0] < w[1]),
            "run-ends not strictly increasing"
        );

        if len != 0 {
            assert!(!run_ends.is_empty(), "non-empty slice but empty run-ends");
            let end = E::from_usize(offset.saturating_add(len)).unwrap();
            assert!(
                *run_ends.first().unwrap() > E::usize_as(0),
                "run-ends not greater than 0"
            );
            assert!(
                *run_ends.last().unwrap() >= end,
                "slice beyond bounds of run-ends"
            );
        }

        Self {
            run_ends,
            offset,
            len,
        }
    }

    /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], an `offset` and a `len`,
    /// without validating the invariants
    ///
    /// # Safety
    ///
    /// - `run_ends` must contain strictly increasing values greater than zero
    /// - The last value of `run_ends` must be greater than or equal to `offset + len`
    pub unsafe fn new_unchecked(run_ends: ScalarBuffer<E>, offset: usize, len: usize) -> Self {
        Self {
            run_ends,
            offset,
            len,
        }
    }

    /// Returns the logical offset into the run-ends stored by this buffer
    #[inline]
    pub fn offset(&self) -> usize {
        self.offset
    }

    /// Returns the logical length of the run-ends stored by this buffer
    #[inline]
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns true if this buffer is empty
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Free up unused memory.
    pub fn shrink_to_fit(&mut self) {
        // TODO(emilk): we could shrink even more in the case where we are a small sub-slice of the full buffer
        self.run_ends.shrink_to_fit();
    }

    /// Returns the values of this [`RunEndBuffer`] not including any offset
    #[inline]
    pub fn values(&self) -> &[E] {
        &self.run_ends
    }

    /// Returns the maximum run-end encoded in the underlying buffer, or `0` if the buffer is empty
    #[inline]
    pub fn max_value(&self) -> usize {
        self.values().last().copied().unwrap_or_default().as_usize()
    }

    /// Performs a binary search to find the physical index for the given logical index
    ///
    /// The result is arbitrary if `logical_index >= self.len()`
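    ///
    /// An illustrative sketch for a sliced buffer (assuming the `arrow_buffer::buffer`
    /// module path is publicly accessible):
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// let ends = RunEndBuffer::new(vec![3_i32, 6, 8].into(), 0, 8).slice(2, 5);
    /// // Logical index 0 of the slice is logical index 2 of the parent, still in the first run
    /// assert_eq!(ends.get_physical_index(0), 0);
    /// // Logical index 3 of the slice is logical index 5 of the parent, in the second run
    /// assert_eq!(ends.get_physical_index(3), 1);
    /// ```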
    pub fn get_physical_index(&self, logical_index: usize) -> usize {
        // Convert the logical index in the sliced array to a logical index in the
        // underlying (unsliced) run-encoded array
        let logical_index = E::usize_as(self.offset + logical_index);
        let cmp = |p: &E| p.partial_cmp(&logical_index).unwrap();

        match self.run_ends.binary_search_by(cmp) {
            // An exact match on a run end means the logical index is the first element
            // of the *next* run, as run ends are exclusive
            Ok(idx) => idx + 1,
            // Otherwise the insertion point is the run containing the logical index
            Err(idx) => idx,
        }
    }

    /// Returns the physical index at which the logical array starts
    pub fn get_start_physical_index(&self) -> usize {
        if self.offset == 0 || self.len == 0 {
            return 0;
        }
        // Fallback to binary search
        self.get_physical_index(0)
    }

    /// Returns the physical index at which the logical array ends
    pub fn get_end_physical_index(&self) -> usize {
        if self.len == 0 {
            return 0;
        }
        // Fast path: the logical array ends exactly at the last run end,
        // so the last physical index is the final run
        if self.max_value() == self.offset + self.len {
            return self.values().len() - 1;
        }
        // Fallback to binary search
        self.get_physical_index(self.len - 1)
    }

    /// Slices this [`RunEndBuffer`] by the provided `offset` and `length`
    pub fn slice(&self, offset: usize, len: usize) -> Self {
        assert!(
            offset.saturating_add(len) <= self.len,
            "the length + offset of the sliced RunEndBuffer cannot exceed the existing length"
        );
        Self {
            run_ends: self.run_ends.clone(),
            offset: self.offset + offset,
            len,
        }
    }

    /// Returns the inner [`ScalarBuffer`]
    pub fn inner(&self) -> &ScalarBuffer<E> {
        &self.run_ends
    }

    /// Returns the inner [`ScalarBuffer`], consuming self
    pub fn into_inner(self) -> ScalarBuffer<E> {
        self.run_ends
    }
}

#[cfg(test)]
mod tests {
    use crate::buffer::RunEndBuffer;

    #[test]
    fn test_zero_length_slice() {
        let buffer = RunEndBuffer::new(vec![1_i32, 4_i32].into(), 0, 4);
        assert_eq!(buffer.get_start_physical_index(), 0);
        assert_eq!(buffer.get_end_physical_index(), 1);
        assert_eq!(buffer.get_physical_index(3), 1);

        for offset in 0..4 {
            let sliced = buffer.slice(offset, 0);
            assert_eq!(sliced.get_start_physical_index(), 0);
            assert_eq!(sliced.get_end_physical_index(), 0);
        }

        let buffer = RunEndBuffer::new(Vec::<i32>::new().into(), 0, 0);
        assert_eq!(buffer.get_start_physical_index(), 0);
        assert_eq!(buffer.get_end_physical_index(), 0);
    }
236}