1use std::sync::Arc;
5
6use arrow_array::{Array, BooleanArray, GenericListArray, OffsetSizeTrait};
7use arrow_buffer::{BooleanBufferBuilder, OffsetBuffer, ScalarBuffer};
8use arrow_schema::Field;
9
/// Extension methods for Arrow list arrays that deal with child values which
/// are not referenced by any valid list slot ("garbage" values).
pub trait ListArrayExt {
    /// Returns a copy of the list whose child values contain only the items
    /// referenced by valid (non-null) list slots.
    ///
    /// In the `GenericListArray` implementation, offsets are rebased to start
    /// at zero and null slots collapse to zero length. An array that is empty
    /// or has no validity buffer is returned unchanged.
    fn filter_garbage_nulls(&self) -> Self;
    /// Returns the slice of the child values array that lies between the
    /// first and the last offset of this list.
    fn trimmed_values(&self) -> Arc<dyn Array>;
}
27
28impl<OffsetSize: OffsetSizeTrait> ListArrayExt for GenericListArray<OffsetSize> {
29 fn filter_garbage_nulls(&self) -> Self {
30 if self.is_empty() {
31 return self.clone();
32 }
33 let Some(validity) = self.nulls().cloned() else {
34 return self.clone();
35 };
36
37 let mut should_keep = BooleanBufferBuilder::new(self.values().len());
38
39 let preamble_len = self.offsets().first().unwrap().to_usize().unwrap();
41 should_keep.append_n(preamble_len, false);
42
43 let mut new_offsets: Vec<OffsetSize> = Vec::with_capacity(self.len() + 1);
44 new_offsets.push(OffsetSize::zero());
45 let mut cur_len = OffsetSize::zero();
46 for (offset, is_valid) in self.offsets().windows(2).zip(validity.iter()) {
47 let len = offset[1] - offset[0];
48 if is_valid {
49 cur_len += len;
50 should_keep.append_n(len.to_usize().unwrap(), true);
51 new_offsets.push(cur_len);
52 } else {
53 should_keep.append_n(len.to_usize().unwrap(), false);
54 new_offsets.push(cur_len);
55 }
56 }
57
58 let trailer = self.values().len() - should_keep.len();
60 should_keep.append_n(trailer, false);
61
62 let should_keep = should_keep.finish();
63 let should_keep = BooleanArray::new(should_keep, None);
64 let new_values = arrow_select::filter::filter(self.values(), &should_keep).unwrap();
65 let new_offsets = ScalarBuffer::from(new_offsets);
66 let new_offsets = OffsetBuffer::new(new_offsets);
67
68 Self::new(
69 Arc::new(Field::new(
70 "item",
71 self.value_type(),
72 self.values().is_nullable(),
73 )),
74 new_offsets,
75 new_values,
76 Some(validity),
77 )
78 }
79
80 fn trimmed_values(&self) -> Arc<dyn Array> {
81 let first_value = self
82 .offsets()
83 .first()
84 .map(|v| v.to_usize().unwrap())
85 .unwrap_or(0);
86 let last_value = self
87 .offsets()
88 .last()
89 .map(|v| v.to_usize().unwrap())
90 .unwrap_or(0);
91 self.values().slice(first_value, last_value - first_value)
92 }
93}
94
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{Array, ListArray, UInt64Array};
    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field};

    use super::ListArrayExt;

    /// Garbage before the first offset (0, 1), under the null slot (5, 6, 7)
    /// and after the last offset (9) must all be removed.
    #[test]
    fn test_filter_garbage_nulls() {
        let items = UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        let offsets = ScalarBuffer::<i32>::from(vec![2, 5, 8, 9]);
        let offsets = OffsetBuffer::new(offsets);
        let list_validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
        let list_arr = ListArray::new(
            Arc::new(Field::new("item", DataType::UInt64, true)),
            offsets,
            Arc::new(items),
            Some(list_validity.clone()),
        );

        let filtered = list_arr.filter_garbage_nulls();

        let expected_items = UInt64Array::from(vec![2, 3, 4, 8]);
        let offsets = ScalarBuffer::<i32>::from(vec![0, 3, 3, 4]);
        // The rebuilt field's nullability follows the values array, which has
        // no nulls here, hence `false`.
        let expected = ListArray::new(
            Arc::new(Field::new("item", DataType::UInt64, false)),
            OffsetBuffer::new(offsets),
            Arc::new(expected_items),
            Some(list_validity),
        );

        assert_eq!(filtered, expected);
    }

    /// A sliced list keeps the full values array, so the preamble comes from
    /// the slice's first offset.
    #[test]
    fn test_filter_garbage_nulls_sliced() {
        let items = UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![2, 5, 8, 9]));
        let list_validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
        let list_arr = ListArray::new(
            Arc::new(Field::new("item", DataType::UInt64, true)),
            offsets,
            Arc::new(items),
            Some(list_validity),
        );
        // Keeps [null, [8]].
        let list_arr = list_arr.slice(1, 2);

        let filtered = list_arr.filter_garbage_nulls();

        let expected = ListArray::new(
            Arc::new(Field::new("item", DataType::UInt64, false)),
            OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 0, 1])),
            Arc::new(UInt64Array::from(vec![8])),
            Some(NullBuffer::new(BooleanBuffer::from(vec![false, true]))),
        );

        assert_eq!(filtered, expected);
    }

    /// Without a validity buffer the array is returned untouched, including
    /// the values outside the offset range.
    #[test]
    fn test_filter_garbage_nulls_without_validity_is_noop() {
        let items = UInt64Array::from(vec![0, 1, 2, 3, 4]);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![1, 3, 4]));
        let list_arr = ListArray::new(
            Arc::new(Field::new("item", DataType::UInt64, true)),
            offsets,
            Arc::new(items),
            None,
        );

        let filtered = list_arr.filter_garbage_nulls();

        assert_eq!(filtered, list_arr);
        assert_eq!(filtered.values().len(), 5);
    }

    /// Only the values between the sliced list's first and last offsets
    /// (5..8) are returned.
    #[test]
    fn test_trim_values() {
        let items = UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        let offsets = ScalarBuffer::<i32>::from(vec![2, 5, 6, 8, 9]);
        let offsets = OffsetBuffer::new(offsets);
        let list_arr = ListArray::new(
            Arc::new(Field::new("item", DataType::UInt64, true)),
            offsets,
            Arc::new(items),
            None,
        );
        let list_arr = list_arr.slice(1, 2);

        let trimmed = list_arr.trimmed_values();

        let expected_items = UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        let expected_items = expected_items.slice(5, 3);

        assert_eq!(trimmed.as_ref(), &expected_items);
    }
}