arrow_array/
run_iterator.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Idiomatic iterator for [`RunArray`](crate::RunArray)
19
20use crate::{array::ArrayAccessor, types::RunEndIndexType, Array, TypedRunArray};
21use arrow_buffer::ArrowNativeType;
22
23/// The [`RunArrayIter`] provides an idiomatic way to iterate over the run array.
24/// It returns Some(T) if there is a value or None if the value is null.
25///
26/// The iterator comes with a cost as it has to iterate over three arrays to determine
27/// the value to be returned. The run_ends array is used to determine the index of the value.
28/// The nulls array is used to determine if the value is null and the values array is used to
29/// get the value.
30///
31/// Unlike other iterators in this crate, [`RunArrayIter`] does not use [`ArrayAccessor`]
32/// because the run array accessor does binary search to access each value which is too slow.
33/// The run array iterator can determine the next value in constant time.
34///
35#[derive(Debug)]
36pub struct RunArrayIter<'a, R, V>
37where
38    R: RunEndIndexType,
39    V: Sync + Send,
40    &'a V: ArrayAccessor,
41    <&'a V as ArrayAccessor>::Item: Default,
42{
43    array: TypedRunArray<'a, R, V>,
44    current_front_logical: usize,
45    current_front_physical: usize,
46    current_back_logical: usize,
47    current_back_physical: usize,
48}
49
50impl<'a, R, V> RunArrayIter<'a, R, V>
51where
52    R: RunEndIndexType,
53    V: Sync + Send,
54    &'a V: ArrayAccessor,
55    <&'a V as ArrayAccessor>::Item: Default,
56{
57    /// create a new iterator
58    pub fn new(array: TypedRunArray<'a, R, V>) -> Self {
59        let current_front_physical = array.run_array().get_start_physical_index();
60        let current_back_physical = array.run_array().get_end_physical_index() + 1;
61        RunArrayIter {
62            array,
63            current_front_logical: array.offset(),
64            current_front_physical,
65            current_back_logical: array.offset() + array.len(),
66            current_back_physical,
67        }
68    }
69}
70
71impl<'a, R, V> Iterator for RunArrayIter<'a, R, V>
72where
73    R: RunEndIndexType,
74    V: Sync + Send,
75    &'a V: ArrayAccessor,
76    <&'a V as ArrayAccessor>::Item: Default,
77{
78    type Item = Option<<&'a V as ArrayAccessor>::Item>;
79
80    #[inline]
81    fn next(&mut self) -> Option<Self::Item> {
82        if self.current_front_logical == self.current_back_logical {
83            return None;
84        }
85
86        // If current logical index is greater than current run end index then increment
87        // the physical index.
88        let run_ends = self.array.run_ends().values();
89        if self.current_front_logical >= run_ends[self.current_front_physical].as_usize() {
90            // As the run_ends is expected to be strictly increasing, there
91            // should be at least one logical entry in one physical entry. Because of this
92            // reason the next value can be accessed by incrementing physical index once.
93            self.current_front_physical += 1;
94        }
95        if self.array.values().is_null(self.current_front_physical) {
96            self.current_front_logical += 1;
97            Some(None)
98        } else {
99            self.current_front_logical += 1;
100            // Safety:
101            // The self.current_physical is kept within bounds of self.current_logical.
102            // The self.current_logical will not go out of bounds because of the check
103            // `self.current_logical = self.current_end_logical` above.
104            unsafe {
105                Some(Some(
106                    self.array
107                        .values()
108                        .value_unchecked(self.current_front_physical),
109                ))
110            }
111        }
112    }
113
114    fn size_hint(&self) -> (usize, Option<usize>) {
115        (
116            self.current_back_logical - self.current_front_logical,
117            Some(self.current_back_logical - self.current_front_logical),
118        )
119    }
120}
121
122impl<'a, R, V> DoubleEndedIterator for RunArrayIter<'a, R, V>
123where
124    R: RunEndIndexType,
125    V: Sync + Send,
126    &'a V: ArrayAccessor,
127    <&'a V as ArrayAccessor>::Item: Default,
128{
129    fn next_back(&mut self) -> Option<Self::Item> {
130        if self.current_back_logical == self.current_front_logical {
131            return None;
132        }
133
134        self.current_back_logical -= 1;
135
136        let run_ends = self.array.run_ends().values();
137        if self.current_back_physical > 0
138            && self.current_back_logical < run_ends[self.current_back_physical - 1].as_usize()
139        {
140            // As the run_ends is expected to be strictly increasing, there
141            // should be at least one logical entry in one physical entry. Because of this
142            // reason the next value can be accessed by decrementing physical index once.
143            self.current_back_physical -= 1;
144        }
145        Some(if self.array.values().is_null(self.current_back_physical) {
146            None
147        } else {
148            // Safety:
149            // The check `self.current_end_physical > 0` ensures the value will not underflow.
150            // Also self.current_end_physical starts with array.len() and
151            // decrements based on the bounds of self.current_end_logical.
152            unsafe {
153                Some(
154                    self.array
155                        .values()
156                        .value_unchecked(self.current_back_physical),
157                )
158            }
159        })
160    }
161}
162
163/// all arrays have known size.
164impl<'a, R, V> ExactSizeIterator for RunArrayIter<'a, R, V>
165where
166    R: RunEndIndexType,
167    V: Sync + Send,
168    &'a V: ArrayAccessor,
169    <&'a V as ArrayAccessor>::Item: Default,
170{
171}
172
#[cfg(test)]
mod tests {
    use rand::{seq::SliceRandom, thread_rng, Rng};

    use crate::{
        array::{Int32Array, StringArray},
        builder::PrimitiveRunBuilder,
        types::{Int16Type, Int32Type},
        Array, Int64RunArray, PrimitiveArray, RunArray,
    };

    /// Produces exactly `size` optional values made of randomly long runs.
    ///
    /// A seed of three nulls and the numbers 1 to 9 is shuffled, and each seed entry
    /// is repeated a random number of times; the seed is reshuffled every time it has
    /// been walked completely.
    fn build_input_array(size: usize) -> Vec<Option<i32>> {
        let mut seed: Vec<Option<i32>> = vec![None; 3];
        seed.extend((1..=9).map(Some));

        let mut rng = thread_rng();
        // A run may be up to 8 items long; small arrays cap it at `size / 2`, but
        // never below 1.
        let max_run_length = (size / 2).clamp(1, 8);

        let mut result: Vec<Option<i32>> = Vec::with_capacity(size);
        let mut ix = 0;
        while result.len() < size {
            // A fresh pass over the seed starts with a fresh shuffle.
            if ix == 0 {
                seed.shuffle(&mut rng);
            }
            let run_length = rng.gen_range(1..=max_run_length);
            result.extend(std::iter::repeat(seed[ix]).take(run_length));
            ix = (ix + 1) % seed.len();
        }
        // The last run may have overshot the requested size.
        result.truncate(size);
        result
    }

    /// Small fixed input with repeated values and one null, shared by several tests.
    fn sample_values() -> Vec<Option<i32>> {
        vec![
            Some(32),
            Some(32),
            None,
            Some(64),
            Some(64),
            Some(64),
            Some(72),
        ]
    }

    #[test]
    fn test_primitive_array_iter_round_trip() {
        let input_vec = sample_values();
        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend(input_vec.iter().copied());
        let run_array = builder.finish();
        let typed_array = run_array.downcast::<Int32Array>().unwrap();

        let forward: Vec<Option<i32>> = typed_array.into_iter().collect();
        assert_eq!(input_vec, forward);

        let backward: Vec<Option<i32>> = typed_array.into_iter().rev().collect();
        let reversed_input: Vec<Option<i32>> = input_vec.into_iter().rev().collect();
        assert_eq!(reversed_input, backward);
    }

    #[test]
    fn test_double_ended() {
        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend(sample_values());
        let run_array = builder.finish();
        let typed_array = run_array.downcast::<Int32Array>().unwrap();

        // Each step is (take from the front?, expected item); the two ends alternate
        // until the iterator is exhausted from both sides.
        let steps = [
            (true, Some(Some(32))),
            (false, Some(Some(72))),
            (true, Some(Some(32))),
            (false, Some(Some(64))),
            (true, Some(None)),
            (false, Some(Some(64))),
            (true, Some(Some(64))),
            (false, None),
            (true, None),
        ];

        let mut iter = typed_array.into_iter();
        for (from_front, expected) in steps {
            let actual = if from_front {
                iter.next()
            } else {
                iter.next_back()
            };
            assert_eq!(expected, actual);
        }
    }

    #[test]
    fn test_run_iterator_comprehensive() {
        // Walk arrays of several logical lengths in both directions.
        for logical_len in [1_usize, 2, 3, 4, 15, 16, 17, 63, 64, 65] {
            let input_array = build_input_array(logical_len);

            let mut run_array_builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
            run_array_builder.extend(input_array.iter().copied());
            let run_array = run_array_builder.finish();
            let typed_array = run_array.downcast::<Int32Array>().unwrap();

            // front to back
            let mut run_array_iter = typed_array.into_iter();
            for expected in input_array.iter().copied() {
                assert_eq!(Some(expected), run_array_iter.next());
            }
            assert_eq!(None, run_array_iter.next());

            // back to front
            let mut run_array_iter = typed_array.into_iter().rev();
            for expected in input_array.iter().rev().copied() {
                assert_eq!(Some(expected), run_array_iter.next());
            }
            assert_eq!(None, run_array_iter.next());
        }
    }

    #[test]
    fn test_string_array_iter_round_trip() {
        let input_vec = vec!["ab", "ab", "ba", "cc", "cc"];
        let input_ree_array: Int64RunArray = input_vec.into_iter().collect();
        let string_ree_array = input_ree_array.downcast::<StringArray>().unwrap();

        // Pull every value out of the iterator and append a 'b' to it.
        let result: Vec<Option<String>> = string_ree_array
            .into_iter()
            .map(|item| item.map(|text| text.to_owned() + "b"))
            .collect();

        let result_asref: Vec<Option<&str>> = result.iter().map(Option::as_deref).collect();

        let expected_vec = vec![
            Some("abb"),
            Some("abb"),
            Some("bab"),
            Some("ccb"),
            Some("ccb"),
        ];

        assert_eq!(expected_vec, result_asref);
    }

    #[test]
    #[cfg_attr(miri, ignore)] // Takes too long
    fn test_sliced_run_array_iterator() {
        let total_len = 80;
        let input_array = build_input_array(total_len);

        // Run-end encode the whole input once; every slice below is cut from it.
        let mut builder =
            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
        builder.extend(input_array.iter().copied());
        let run_array = builder.finish();

        for slice_len in 1..=total_len {
            // Every slice length is checked twice: once anchored at the start of the
            // array and once anchored at its end.
            for offset in [0, total_len - slice_len] {
                let sliced_run_array: RunArray<Int16Type> =
                    run_array.slice(offset, slice_len).into_data().into();
                let sliced_typed_run_array = sliced_run_array
                    .downcast::<PrimitiveArray<Int32Type>>()
                    .unwrap();

                let actual: Vec<Option<i32>> = sliced_typed_run_array.into_iter().collect();
                let expected: Vec<Option<i32>> = input_array[offset..offset + slice_len].to_vec();
                assert_eq!(expected, actual);
            }
        }
    }
}