datafusion_functions_aggregate_common/merge_arrays.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use arrow::compute::SortOptions;
use datafusion_common::utils::compare_rows;
use datafusion_common::{exec_err, ScalarValue};
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};
/// This is a wrapper struct to be able to correctly merge `ARRAY_AGG` data from
/// multiple partitions using `BinaryHeap`. When used inside `BinaryHeap`, this
/// struct returns smallest `CustomElement`, where smallest is determined by
/// `ordering` values (`Vec<ScalarValue>`) according to `sort_options`.
///
/// Both equality and ordering look only at `ordering` (compared with
/// `sort_options`); `branch_idx` and `value` are payload carried along with the
/// key and never take part in comparisons.
#[derive(Debug)]
struct CustomElement<'a> {
    /// Stores the partition this entry came from
    branch_idx: usize,
    /// Values to merge
    value: ScalarValue,
    /// Comparison "key"
    ordering: Vec<ScalarValue>,
    /// Options defining the ordering semantics
    sort_options: &'a [SortOptions],
}

impl<'a> CustomElement<'a> {
    /// Creates a heap entry holding `value` (taken from partition `branch_idx`)
    /// together with its comparison key `ordering`.
    fn new(
        branch_idx: usize,
        value: ScalarValue,
        ordering: Vec<ScalarValue>,
        sort_options: &'a [SortOptions],
    ) -> Self {
        Self {
            branch_idx,
            value,
            ordering,
            sort_options,
        }
    }

    /// Compares two ordering keys according to `self.sort_options` by
    /// delegating to `compare_rows`.
    ///
    /// Returns an error when `compare_rows` fails, e.g. when the keys hold
    /// `ScalarValue`s of different types.
    fn ordering(
        &self,
        current: &[ScalarValue],
        target: &[ScalarValue],
    ) -> datafusion_common::Result<Ordering> {
        compare_rows(current, target, self.sort_options)
    }
}

// Overwrite ordering implementation such that
// - `self.ordering` values are used for comparison,
// - When used inside `BinaryHeap` (a max-heap) it behaves as a min-heap.
impl Ord for CustomElement<'_> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.ordering(&self.ordering, &other.ordering)
            // Convert max heap to min heap
            .map(Ordering::reverse)
            // `compare_rows` only errors when `self.ordering` and `other.ordering`
            // have different types (such as `ScalarValue::Int64` vs
            // `ScalarValue::Float32`). Data from each partition has the same
            // type, so this cannot happen here.
            .unwrap()
    }
}

impl PartialOrd for CustomElement<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

// Equality must agree with `Ord`: `a == b` exactly when `a.cmp(b) == Equal`.
// A derived `PartialEq` would also compare `branch_idx` and `value`, which
// `cmp` deliberately ignores, violating the `Ord`/`Eq` consistency contract.
impl PartialEq for CustomElement<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for CustomElement<'_> {}
/// Merges the per-partition `values` into a single `Vec<ScalarValue>`, ordered
/// by the accompanying `ordering_values`.
///
/// Branch (partition) `i` supplies `values[i]` together with
/// `ordering_values[i]`: the `j`-th entry of `ordering_values[i]` is the
/// ordering key (for example a timestamp) of the `j`-th entry of `values[i]`.
/// Keys are compared according to `sort_options`, which should have the same
/// length as every key. Branches do not need to have the same length.
///
/// Only the current front entry of each branch is compared, so the result is
/// fully ordered only if every branch is already ordered by its keys.
///
/// For example, with
///
/// ```text
/// values = [
///     [1, 2, 3, 4, 5],
///     [1, 2, 3, 4],
///     [1, 2, 3, 4, 5, 6],
/// ]
/// ordering_values = [
///     [(1, a), (2, b), (3, b), (4, a), (5, b)],
///     [(1, a), (2, b), (3, b), (4, a)],
///     [(1, b), (2, c), (3, d), (4, e), (5, a), (6, b)],
/// ]
/// ```
///
/// `sort_options` has two entries (one per key column) and the merged result
/// contains 15 (= 5 + 4 + 6) entries.
///
/// # Returns
///
/// The merged values and, in the same order, their ordering keys. All input
/// deques are drained by this call.
///
/// # Errors
///
/// Returns an execution error if `values` and `ordering_values` have a
/// different number of branches, or if any branch has a different number of
/// values than ordering keys.
///
/// # Panics
///
/// Panics if two ordering keys cannot be compared (see the `Ord` impl of
/// `CustomElement`), e.g. because their `ScalarValue` types differ.
pub fn merge_ordered_arrays(
    // We will merge values into single `Vec<ScalarValue>`.
    values: &mut [VecDeque<ScalarValue>],
    // `values` will be merged according to `ordering_values`.
    // Inner `Vec<ScalarValue>` can be thought as ordering information for
    // each `ScalarValue` in `values`.
    ordering_values: &mut [VecDeque<Vec<ScalarValue>>],
    // Defines according to which ordering comparisons should be done.
    sort_options: &[SortOptions],
) -> datafusion_common::Result<(Vec<ScalarValue>, Vec<Vec<ScalarValue>>)> {
    if values.len() != ordering_values.len()
        || values
            .iter()
            .zip(ordering_values.iter())
            .any(|(vals, ordering_vals)| vals.len() != ordering_vals.len())
    {
        return exec_err!(
            "Expects values arguments and/or ordering_values arguments to have same size"
        );
    }
    let n_branch = values.len();
    // Every input entry ends up in the output exactly once.
    let total_len: usize = values.iter().map(VecDeque::len).sum();
    let mut merged_values = Vec::with_capacity(total_len);
    let mut merged_orderings = Vec::with_capacity(total_len);

    // Takes the next (value, ordering) pair of `branch_idx` as a heap entry,
    // or returns `None` once that branch is exhausted.
    let mut next_entry = |branch_idx: usize| {
        let ordering = ordering_values[branch_idx].pop_front()?;
        let value = values[branch_idx]
            .pop_front()
            .expect("values and ordering_values have equal lengths (checked above)");
        Some(CustomElement::new(branch_idx, value, ordering, sort_options))
    };

    // The heap holds at most one entry per branch: that branch's current front.
    // Entries are pushed one by one (not via `extend`) so the heap evolves in
    // the same deterministic push/pop sequence for every call.
    let mut heap = BinaryHeap::with_capacity(n_branch);
    for branch_idx in 0..n_branch {
        if let Some(entry) = next_entry(branch_idx) {
            heap.push(entry);
        }
    }

    // `CustomElement` reverses its ordering, so `pop` yields the smallest key.
    // After emitting it, refill the heap from the branch it came from; the
    // heap becomes empty exactly when every branch has been consumed.
    while let Some(CustomElement {
        branch_idx,
        value,
        ordering,
        ..
    }) = heap.pop()
    {
        merged_values.push(value);
        merged_orderings.push(ordering);
        if let Some(entry) = next_entry(branch_idx) {
            heap.push(entry);
        }
    }
    Ok((merged_values, merged_orderings))
}