1use std::ops::Range;
21
22use arrow_array::{Array, ArrayRef};
23use arrow_buffer::BooleanBuffer;
24use arrow_schema::ArrowError;
25
26use crate::cmp::distinct;
27
28#[derive(Debug, Clone)]
30pub struct Partitions(Option<BooleanBuffer>);
31
32impl Partitions {
33 pub fn ranges(&self) -> Vec<Range<usize>> {
38 let boundaries = match &self.0 {
39 Some(boundaries) => boundaries,
40 None => return vec![],
41 };
42
43 let mut out = vec![];
44 let mut current = 0;
45 for idx in boundaries.set_indices() {
46 let t = current;
47 current = idx + 1;
48 out.push(t..current)
49 }
50 let last = boundaries.len() + 1;
51 if current != last {
52 out.push(current..last)
53 }
54 out
55 }
56
57 pub fn len(&self) -> usize {
59 match &self.0 {
60 Some(b) => b.count_set_bits() + 1,
61 None => 0,
62 }
63 }
64
65 pub fn is_empty(&self) -> bool {
67 self.0.is_none()
68 }
69}
70
71pub fn partition(columns: &[ArrayRef]) -> Result<Partitions, ArrowError> {
127 if columns.is_empty() {
128 return Err(ArrowError::InvalidArgumentError(
129 "Partition requires at least one column".to_string(),
130 ));
131 }
132 let num_rows = columns[0].len();
133 if columns.iter().any(|item| item.len() != num_rows) {
134 return Err(ArrowError::InvalidArgumentError(
135 "Partition columns have different row counts".to_string(),
136 ));
137 };
138
139 match num_rows {
140 0 => return Ok(Partitions(None)),
141 1 => return Ok(Partitions(Some(BooleanBuffer::new_unset(0)))),
142 _ => {}
143 }
144
145 let acc = find_boundaries(&columns[0])?;
146 let acc = columns
147 .iter()
148 .skip(1)
149 .try_fold(acc, |acc, c| find_boundaries(c.as_ref()).map(|b| &acc | &b))?;
150
151 Ok(Partitions(Some(acc)))
152}
153
154fn find_boundaries(v: &dyn Array) -> Result<BooleanBuffer, ArrowError> {
156 let slice_len = v.len() - 1;
157 let v1 = v.slice(0, slice_len);
158 let v2 = v.slice(1, slice_len);
159 Ok(distinct(&v1, &v2)?.values().clone())
160}
161
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::*;
    use arrow_schema::DataType;

    use super::*;

    /// Partitions `columns` and returns the resulting ranges, panicking if
    /// `partition` reports an error.
    fn partition_ranges(columns: &[ArrayRef]) -> Vec<std::ops::Range<usize>> {
        partition(columns).unwrap().ranges()
    }

    #[test]
    fn test_partition_empty() {
        // No columns at all is rejected.
        let message = partition(&[]).unwrap_err().to_string();
        assert_eq!(
            message,
            "Invalid argument error: Partition requires at least one column"
        );
    }

    #[test]
    fn test_partition_unaligned_rows() {
        // Columns of differing lengths are rejected.
        let two_rows = Arc::new(Int64Array::from(vec![None, Some(-1)])) as ArrayRef;
        let one_row = Arc::new(StringArray::from(vec![Some("foo")])) as ArrayRef;
        let message = partition(&[two_rows, one_row]).unwrap_err().to_string();
        assert_eq!(
            message,
            "Invalid argument error: Partition columns have different row counts"
        );
    }

    #[test]
    fn test_partition_small() {
        // Zero rows: no partitions.
        let empty_column = || Arc::new(Int32Array::new(vec![].into(), None)) as ArrayRef;
        let partitions = partition(&[empty_column(), empty_column(), empty_column()]).unwrap();
        assert_eq!(partitions.len(), 0);
        assert!(partitions.is_empty());

        // One row: a single partition covering that row.
        let single_row = || Arc::new(Int32Array::from(vec![1])) as ArrayRef;
        let ranges = partition_ranges(&[single_row(), single_row()]);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges.first(), Some(&(0..1)));
    }

    #[test]
    fn test_partition_single_column() {
        let values = Int64Array::from(vec![1, 2, 2, 2, 2, 2, 2, 2, 9]);
        let columns = [Arc::new(values) as ArrayRef];
        assert_eq!(partition_ranges(&columns), vec![0..1, 1..8, 8..9]);
    }

    #[test]
    fn test_partition_all_equal_values() {
        let values = Int64Array::from_value(1, 1000);
        let columns = [Arc::new(values) as ArrayRef];
        assert_eq!(partition_ranges(&columns), vec![0..1000]);
    }

    #[test]
    fn test_partition_all_null_values() {
        // Columns consisting solely of nulls form one partition.
        let columns = [
            new_null_array(&DataType::Int8, 1000),
            new_null_array(&DataType::UInt16, 1000),
        ];
        assert_eq!(partition_ranges(&columns), vec![0..1000]);
    }

    #[test]
    fn test_partition_unique_column_1() {
        let ints = Int64Array::from(vec![None, Some(-1)]);
        let strings = StringArray::from(vec![Some("foo"), Some("bar")]);
        let columns = [Arc::new(ints) as ArrayRef, Arc::new(strings) as ArrayRef];
        assert_eq!(partition_ranges(&columns), vec![0..1, 1..2]);
    }

    #[test]
    fn test_partition_unique_column_2() {
        let ints = Int64Array::from(vec![None, Some(-1), Some(-1)]);
        let strings = StringArray::from(vec![Some("foo"), Some("bar"), Some("apple")]);
        let columns = [Arc::new(ints) as ArrayRef, Arc::new(strings) as ArrayRef];
        assert_eq!(partition_ranges(&columns), vec![0..1, 1..2, 2..3]);
    }

    #[test]
    fn test_partition_non_unique_column_1() {
        let ints = Int64Array::from(vec![None, Some(-1), Some(-1), Some(1)]);
        let strings = StringArray::from(vec![
            Some("foo"),
            Some("bar"),
            Some("bar"),
            Some("bar"),
        ]);
        let columns = [Arc::new(ints) as ArrayRef, Arc::new(strings) as ArrayRef];
        assert_eq!(partition_ranges(&columns), vec![0..1, 1..3, 3..4]);
    }

    #[test]
    fn test_partition_masked_nulls() {
        // The second and third columns carry validity masks, so some of the
        // underlying values sit in null slots.
        let constant = Int64Array::new(vec![1; 9].into(), None);
        let second = Int64Array::new(
            vec![1, 1, 2, 2, 2, 3, 3, 3, 3].into(),
            Some(vec![false, true, true, true, true, false, false, true, false].into()),
        );
        let third = Int64Array::new(
            vec![1, 1, 2, 2, 2, 2, 2, 3, 7].into(),
            Some(vec![true, true, true, true, false, true, true, true, false].into()),
        );
        let columns = [
            Arc::new(constant) as ArrayRef,
            Arc::new(second) as ArrayRef,
            Arc::new(third) as ArrayRef,
        ];

        assert_eq!(
            partition_ranges(&columns),
            vec![0..1, 1..2, 2..4, 4..5, 5..7, 7..8, 8..9],
        );
    }
}