1use crate::{array::ArrayAccessor, types::RunEndIndexType, Array, TypedRunArray};
21use arrow_buffer::ArrowNativeType;
22
23#[derive(Debug)]
36pub struct RunArrayIter<'a, R, V>
37where
38 R: RunEndIndexType,
39 V: Sync + Send,
40 &'a V: ArrayAccessor,
41 <&'a V as ArrayAccessor>::Item: Default,
42{
43 array: TypedRunArray<'a, R, V>,
44 current_front_logical: usize,
45 current_front_physical: usize,
46 current_back_logical: usize,
47 current_back_physical: usize,
48}
49
50impl<'a, R, V> RunArrayIter<'a, R, V>
51where
52 R: RunEndIndexType,
53 V: Sync + Send,
54 &'a V: ArrayAccessor,
55 <&'a V as ArrayAccessor>::Item: Default,
56{
57 pub fn new(array: TypedRunArray<'a, R, V>) -> Self {
59 let current_front_physical = array.run_array().get_start_physical_index();
60 let current_back_physical = array.run_array().get_end_physical_index() + 1;
61 RunArrayIter {
62 array,
63 current_front_logical: array.offset(),
64 current_front_physical,
65 current_back_logical: array.offset() + array.len(),
66 current_back_physical,
67 }
68 }
69}
70
71impl<'a, R, V> Iterator for RunArrayIter<'a, R, V>
72where
73 R: RunEndIndexType,
74 V: Sync + Send,
75 &'a V: ArrayAccessor,
76 <&'a V as ArrayAccessor>::Item: Default,
77{
78 type Item = Option<<&'a V as ArrayAccessor>::Item>;
79
80 #[inline]
81 fn next(&mut self) -> Option<Self::Item> {
82 if self.current_front_logical == self.current_back_logical {
83 return None;
84 }
85
86 let run_ends = self.array.run_ends().values();
89 if self.current_front_logical >= run_ends[self.current_front_physical].as_usize() {
90 self.current_front_physical += 1;
94 }
95 if self.array.values().is_null(self.current_front_physical) {
96 self.current_front_logical += 1;
97 Some(None)
98 } else {
99 self.current_front_logical += 1;
100 unsafe {
105 Some(Some(
106 self.array
107 .values()
108 .value_unchecked(self.current_front_physical),
109 ))
110 }
111 }
112 }
113
114 fn size_hint(&self) -> (usize, Option<usize>) {
115 (
116 self.current_back_logical - self.current_front_logical,
117 Some(self.current_back_logical - self.current_front_logical),
118 )
119 }
120}
121
122impl<'a, R, V> DoubleEndedIterator for RunArrayIter<'a, R, V>
123where
124 R: RunEndIndexType,
125 V: Sync + Send,
126 &'a V: ArrayAccessor,
127 <&'a V as ArrayAccessor>::Item: Default,
128{
129 fn next_back(&mut self) -> Option<Self::Item> {
130 if self.current_back_logical == self.current_front_logical {
131 return None;
132 }
133
134 self.current_back_logical -= 1;
135
136 let run_ends = self.array.run_ends().values();
137 if self.current_back_physical > 0
138 && self.current_back_logical < run_ends[self.current_back_physical - 1].as_usize()
139 {
140 self.current_back_physical -= 1;
144 }
145 Some(if self.array.values().is_null(self.current_back_physical) {
146 None
147 } else {
148 unsafe {
153 Some(
154 self.array
155 .values()
156 .value_unchecked(self.current_back_physical),
157 )
158 }
159 })
160 }
161}
162
163impl<'a, R, V> ExactSizeIterator for RunArrayIter<'a, R, V>
165where
166 R: RunEndIndexType,
167 V: Sync + Send,
168 &'a V: ArrayAccessor,
169 <&'a V as ArrayAccessor>::Item: Default,
170{
171}
172
#[cfg(test)]
mod tests {
    use rand::{seq::SliceRandom, thread_rng, Rng};

    use crate::{
        array::{Int32Array, StringArray},
        builder::PrimitiveRunBuilder,
        types::{Int16Type, Int32Type},
        Array, Int64RunArray, PrimitiveArray, RunArray,
    };

    /// Builds `size` values made of random-length runs drawn from a shuffled
    /// seed of three nulls and the integers 1 through 9.
    fn build_input_array(size: usize) -> Vec<Option<i32>> {
        let mut seed: Vec<Option<i32>> = [None; 3]
            .into_iter()
            .chain((1..=9).map(Some))
            .collect();
        let mut rng = thread_rng();
        // Runs are at most 8 long, capped at `size / 2` (minimum 1) for small inputs.
        let max_run_length = (size / 2).max(1).min(8);

        let mut result: Vec<Option<i32>> = Vec::with_capacity(size);
        let mut ix = 0;
        while result.len() < size {
            // Reshuffle each time the seed has been consumed completely.
            if ix == 0 {
                seed.shuffle(&mut rng);
            }
            let run_length = rng.gen_range(1..=max_run_length);
            result.extend(std::iter::repeat(seed[ix]).take(run_length));
            ix = (ix + 1) % seed.len();
        }
        // The final run may overshoot; trim back to the requested size.
        result.truncate(size);
        result
    }

    /// Forward and reverse collection both reproduce the input values.
    #[test]
    fn test_primitive_array_iter_round_trip() {
        let input_vec = vec![
            Some(32),
            Some(32),
            None,
            Some(64),
            Some(64),
            Some(64),
            Some(72),
        ];
        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend(input_vec.iter().copied());
        let run_array = builder.finish();
        let typed = run_array.downcast::<Int32Array>().unwrap();

        let forward: Vec<Option<i32>> = typed.into_iter().collect();
        assert_eq!(input_vec, forward);

        let reversed_input: Vec<Option<i32>> = input_vec.iter().rev().copied().collect();
        let backward: Vec<Option<i32>> = typed.into_iter().rev().collect();
        assert_eq!(reversed_input, backward);
    }

    /// Alternating front and back steps meet in the middle without overlap.
    #[test]
    fn test_double_ended() {
        let input_vec = vec![
            Some(32),
            Some(32),
            None,
            Some(64),
            Some(64),
            Some(64),
            Some(72),
        ];
        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend(input_vec);
        let run_array = builder.finish();
        let typed = run_array.downcast::<Int32Array>().unwrap();

        // (take from the front?, expected item), in call order.
        let steps = [
            (true, Some(Some(32))),
            (false, Some(Some(72))),
            (true, Some(Some(32))),
            (false, Some(Some(64))),
            (true, Some(None)),
            (false, Some(Some(64))),
            (true, Some(Some(64))),
            (false, None),
            (true, None),
        ];

        let mut iter = typed.into_iter();
        for (from_front, expected) in steps {
            let actual = if from_front {
                iter.next()
            } else {
                iter.next_back()
            };
            assert_eq!(expected, actual);
        }
    }

    /// Randomised inputs of several lengths iterate correctly in both directions.
    #[test]
    fn test_run_iterator_comprehensive() {
        for logical_len in [1_usize, 2, 3, 4, 15, 16, 17, 63, 64, 65] {
            let input_array = build_input_array(logical_len);

            let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
            builder.extend(input_array.iter().copied());
            let run_array = builder.finish();
            let typed_array = run_array.downcast::<Int32Array>().unwrap();

            let mut forward = typed_array.into_iter();
            for expected in input_array.iter().copied() {
                assert_eq!(Some(expected), forward.next());
            }
            assert_eq!(None, forward.next());

            let mut backward = typed_array.into_iter().rev();
            for expected in input_array.iter().rev().copied() {
                assert_eq!(Some(expected), backward.next());
            }
            assert_eq!(None, backward.next());
        }
    }

    /// String values can be mapped through the iterator.
    #[test]
    fn test_string_array_iter_round_trip() {
        let input_vec = vec!["ab", "ab", "ba", "cc", "cc"];
        let input_ree_array: Int64RunArray = input_vec.into_iter().collect();
        let string_ree_array = input_ree_array.downcast::<StringArray>().unwrap();

        let result: Vec<Option<String>> = string_ree_array
            .into_iter()
            .map(|item| item.map(|s| [s, "b"].concat()))
            .collect();

        let result_asref: Vec<Option<&str>> =
            result.iter().map(|item| item.as_deref()).collect();

        let expected_vec = vec![
            Some("abb"),
            Some("abb"),
            Some("bab"),
            Some("ccb"),
            Some("ccb"),
        ];

        assert_eq!(expected_vec, result_asref);
    }

    /// Every prefix and suffix slice of a run array iterates to the matching
    /// part of the input.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_sliced_run_array_iterator() {
        let total_len = 80;
        let input_array = build_input_array(total_len);

        let mut builder =
            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
        builder.extend(input_array.iter().copied());
        let run_array = builder.finish();

        // Slices the run array and collects the slice through the iterator.
        let collect_slice = |offset: usize, len: usize| -> Vec<Option<i32>> {
            let sliced: RunArray<Int16Type> = run_array.slice(offset, len).into_data().into();
            let typed = sliced.downcast::<PrimitiveArray<Int32Type>>().unwrap();
            typed.into_iter().collect()
        };

        for slice_len in 1..=total_len {
            // Slice anchored at the start of the array.
            let expected_head = input_array[..slice_len].to_vec();
            assert_eq!(expected_head, collect_slice(0, slice_len));

            // Slice anchored at the end of the array.
            let tail_start = total_len - slice_len;
            let expected_tail = input_array[tail_start..].to_vec();
            assert_eq!(expected_tail, collect_slice(tail_start, slice_len));
        }
    }
}