arrow_ord/
partition.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines partition kernel for `ArrayRef`
19
20use std::ops::Range;
21
22use arrow_array::{Array, ArrayRef};
23use arrow_buffer::BooleanBuffer;
24use arrow_schema::ArrowError;
25
26use crate::cmp::distinct;
27
28/// A computed set of partitions, see [`partition`]
29#[derive(Debug, Clone)]
30pub struct Partitions(Option<BooleanBuffer>);
31
32impl Partitions {
33    /// Returns the range of each partition
34    ///
35    /// Consecutive ranges will be contiguous: i.e [`(a, b)` and `(b, c)`], and
36    /// `start = 0` and `end = self.len()` for the first and last range respectively
37    pub fn ranges(&self) -> Vec<Range<usize>> {
38        let boundaries = match &self.0 {
39            Some(boundaries) => boundaries,
40            None => return vec![],
41        };
42
43        let mut out = vec![];
44        let mut current = 0;
45        for idx in boundaries.set_indices() {
46            let t = current;
47            current = idx + 1;
48            out.push(t..current)
49        }
50        let last = boundaries.len() + 1;
51        if current != last {
52            out.push(current..last)
53        }
54        out
55    }
56
57    /// Returns the number of partitions
58    pub fn len(&self) -> usize {
59        match &self.0 {
60            Some(b) => b.count_set_bits() + 1,
61            None => 0,
62        }
63    }
64
65    /// Returns true if this contains no partitions
66    pub fn is_empty(&self) -> bool {
67        self.0.is_none()
68    }
69}
70
71/// Given a list of lexicographically sorted columns, computes the [`Partitions`],
72/// where a partition consists of the set of consecutive rows with equal values
73///
74/// Returns an error if no columns are specified or all columns do not
75/// have the same number of rows.
76///
77/// # Example:
78///
79/// For example, given columns `x`, `y` and `z`, calling
80/// [`partition`]`(values, (x, y))` will divide the
81/// rows into ranges where the values of `(x, y)` are equal:
82///
83/// ```text
84/// ┌ ─ ┬───┬ ─ ─┌───┐─ ─ ┬───┬ ─ ─ ┐
85///     │ 1 │    │ 1 │    │ A │        Range: 0..1 (x=1, y=1)
86/// ├ ─ ┼───┼ ─ ─├───┤─ ─ ┼───┼ ─ ─ ┤
87///     │ 1 │    │ 2 │    │ B │
88/// │   ├───┤    ├───┤    ├───┤     │
89///     │ 1 │    │ 2 │    │ C │        Range: 1..4 (x=1, y=2)
90/// │   ├───┤    ├───┤    ├───┤     │
91///     │ 1 │    │ 2 │    │ D │
92/// ├ ─ ┼───┼ ─ ─├───┤─ ─ ┼───┼ ─ ─ ┤
93///     │ 2 │    │ 1 │    │ E │        Range: 4..5 (x=2, y=1)
94/// ├ ─ ┼───┼ ─ ─├───┤─ ─ ┼───┼ ─ ─ ┤
95///     │ 3 │    │ 1 │    │ F │        Range: 5..6 (x=3, y=1)
96/// └ ─ ┴───┴ ─ ─└───┘─ ─ ┴───┴ ─ ─ ┘
97///
98///       x        y        z     partition(&[x, y])
99/// ```
100///
101/// # Example Code
102///
103/// ```
104/// # use std::{sync::Arc, ops::Range};
105/// # use arrow_array::{RecordBatch, Int64Array, StringArray, ArrayRef};
106/// # use arrow_ord::sort::{SortColumn, SortOptions};
107/// # use arrow_ord::partition::partition;
108/// let batch = RecordBatch::try_from_iter(vec![
109///     ("x", Arc::new(Int64Array::from(vec![1, 1, 1, 1, 2, 3])) as ArrayRef),
110///     ("y", Arc::new(Int64Array::from(vec![1, 2, 2, 2, 1, 1])) as ArrayRef),
111///     ("z", Arc::new(StringArray::from(vec!["A", "B", "C", "D", "E", "F"])) as ArrayRef),
112/// ]).unwrap();
113///
114/// // Partition on first two columns
115/// let ranges = partition(&batch.columns()[..2]).unwrap().ranges();
116///
117/// let expected = vec![
118///     (0..1),
119///     (1..4),
120///     (4..5),
121///     (5..6),
122/// ];
123///
124/// assert_eq!(ranges, expected);
125/// ```
126pub fn partition(columns: &[ArrayRef]) -> Result<Partitions, ArrowError> {
127    if columns.is_empty() {
128        return Err(ArrowError::InvalidArgumentError(
129            "Partition requires at least one column".to_string(),
130        ));
131    }
132    let num_rows = columns[0].len();
133    if columns.iter().any(|item| item.len() != num_rows) {
134        return Err(ArrowError::InvalidArgumentError(
135            "Partition columns have different row counts".to_string(),
136        ));
137    };
138
139    match num_rows {
140        0 => return Ok(Partitions(None)),
141        1 => return Ok(Partitions(Some(BooleanBuffer::new_unset(0)))),
142        _ => {}
143    }
144
145    let acc = find_boundaries(&columns[0])?;
146    let acc = columns
147        .iter()
148        .skip(1)
149        .try_fold(acc, |acc, c| find_boundaries(c.as_ref()).map(|b| &acc | &b))?;
150
151    Ok(Partitions(Some(acc)))
152}
153
154/// Returns a mask with bits set whenever the value or nullability changes
155fn find_boundaries(v: &dyn Array) -> Result<BooleanBuffer, ArrowError> {
156    let slice_len = v.len() - 1;
157    let v1 = v.slice(0, slice_len);
158    let v2 = v.slice(1, slice_len);
159    Ok(distinct(&v1, &v2)?.values().clone())
160}
161
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::*;
    use arrow_schema::DataType;

    use super::*;

    /// Partitions `columns` and returns the resulting row ranges, panicking on error
    fn ranges_of(columns: &[ArrayRef]) -> Vec<Range<usize>> {
        partition(columns).unwrap().ranges()
    }

    #[test]
    fn test_partition_empty() {
        // No columns at all is rejected
        let message = partition(&[]).unwrap_err().to_string();
        assert_eq!(
            message,
            "Invalid argument error: Partition requires at least one column"
        );
    }

    #[test]
    fn test_partition_unaligned_rows() {
        // Two rows in the first column, one row in the second
        let two_rows: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(-1)]));
        let one_row: ArrayRef = Arc::new(StringArray::from(vec![Some("foo")]));

        let message = partition(&[two_rows, one_row]).unwrap_err().to_string();
        assert_eq!(
            message,
            "Invalid argument error: Partition columns have different row counts"
        )
    }

    #[test]
    fn test_partition_small() {
        // Zero rows: no partitions
        let no_rows = || -> ArrayRef { Arc::new(Int32Array::new(vec![].into(), None)) };
        let partitions = partition(&[no_rows(), no_rows(), no_rows()]).unwrap();
        assert_eq!(partitions.len(), 0);
        assert!(partitions.is_empty());

        // One row: a single partition covering it
        let one_row = || -> ArrayRef { Arc::new(Int32Array::from(vec![1])) };
        let ranges = ranges_of(&[one_row(), one_row()]);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], 0..1);
    }

    #[test]
    fn test_partition_single_column() {
        let column: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 2, 2, 2, 2, 2, 2, 9]));
        assert_eq!(ranges_of(&[column]), vec![0..1, 1..8, 8..9]);
    }

    #[test]
    fn test_partition_all_equal_values() {
        let column: ArrayRef = Arc::new(Int64Array::from_value(1, 1000));
        assert_eq!(ranges_of(&[column]), vec![0..1000]);
    }

    #[test]
    fn test_partition_all_null_values() {
        // Columns consisting only of nulls form a single partition
        let columns = [
            new_null_array(&DataType::Int8, 1000),
            new_null_array(&DataType::UInt16, 1000),
        ];
        assert_eq!(ranges_of(&columns), vec![0..1000]);
    }

    #[test]
    fn test_partition_unique_column_1() {
        let numbers: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(-1)]));
        let strings: ArrayRef = Arc::new(StringArray::from(vec![Some("foo"), Some("bar")]));
        assert_eq!(ranges_of(&[numbers, strings]), vec![0..1, 1..2]);
    }

    #[test]
    fn test_partition_unique_column_2() {
        let numbers: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1)]));
        let strings: ArrayRef = Arc::new(StringArray::from(vec![
            Some("foo"),
            Some("bar"),
            Some("apple"),
        ]));
        assert_eq!(ranges_of(&[numbers, strings]), vec![0..1, 1..2, 2..3]);
    }

    #[test]
    fn test_partition_non_unique_column_1() {
        let numbers: ArrayRef =
            Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1), Some(1)]));
        let strings: ArrayRef = Arc::new(StringArray::from(vec![
            Some("foo"),
            Some("bar"),
            Some("bar"),
            Some("bar"),
        ]));
        assert_eq!(ranges_of(&[numbers, strings]), vec![0..1, 1..3, 3..4]);
    }

    #[test]
    fn test_partition_masked_nulls() {
        // Columns built with explicit validity masks: a change in nullability
        // must start a new partition just as a change in value does
        let constant: ArrayRef = Arc::new(Int64Array::new(vec![1; 9].into(), None));
        let first_masked: ArrayRef = Arc::new(Int64Array::new(
            vec![1, 1, 2, 2, 2, 3, 3, 3, 3].into(),
            Some(vec![false, true, true, true, true, false, false, true, false].into()),
        ));
        let second_masked: ArrayRef = Arc::new(Int64Array::new(
            vec![1, 1, 2, 2, 2, 2, 2, 3, 7].into(),
            Some(vec![true, true, true, true, false, true, true, true, false].into()),
        ));

        assert_eq!(
            ranges_of(&[constant, first_masked, second_masked]),
            vec![0..1, 1..2, 2..4, 4..5, 5..7, 7..8, 8..9],
        );
    }
}