datafusion_physical_plan/aggregates/group_values/mod.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! [`GroupValues`] trait for storing and interning group keys
use arrow::record_batch::RecordBatch;
use arrow_array::{downcast_primitive, ArrayRef};
use arrow_schema::{DataType, SchemaRef};
use bytes_view::GroupValuesBytesView;
use datafusion_common::Result;
pub(crate) mod primitive;
use datafusion_expr::EmitTo;
use primitive::GroupValuesPrimitive;
mod column;
mod row;
use column::GroupValuesColumn;
use row::GroupValuesRows;
mod bytes;
mod bytes_view;
use bytes::GroupValuesByes;
use datafusion_physical_expr::binary_map::OutputType;
mod group_column;
mod null_builder;
/// Stores the group values during hash aggregation.
///
/// # Background
///
/// In a query such as `SELECT a, b, count(*) FROM t GROUP BY a, b`, the group values
/// identify each group, and correspond to all the distinct values of `(a,b)`.
///
/// ```sql
/// -- Input has 4 rows with 3 distinct combinations of (a,b) ("groups")
/// create table t(a int, b varchar)
/// as values (1, 'a'), (2, 'b'), (1, 'a'), (3, 'c');
///
/// select a, b, count(*) from t group by a, b;
/// ----
/// 1 a 2
/// 2 b 1
/// 3 c 1
/// ```
///
/// # Design
///
/// Managing group values is a performance critical operation in hash
/// aggregation. The major operations are:
///
/// 1. Intern: Quickly finding existing and adding new group values
/// 2. Emit: Returning the group values as an array
///
/// There are multiple specialized implementations of this trait, each tuned
/// for these operations on particular data types and numbers of columns.
/// See [`new_group_values`] for how an implementation is selected.
///
/// # Group Ids
///
/// Each distinct group in a hash aggregation is identified by a unique group id
/// (usize) which is assigned by instances of this trait. Group ids are
/// continuous without gaps, starting from 0.
///
/// # Thread safety
///
/// The `Send` supertrait bound means every implementation can be moved to
/// another thread.
pub trait GroupValues: Send {
/// Calculates the group id for each input row of `cols`, assigning new
/// group ids as necessary.
///
/// `cols` holds the arrays of group key columns for the input rows
/// (presumably one array per group-by column, all of equal length —
/// confirm against the implementations).
///
/// When the function returns, `groups` must contain the group id for each
/// row in `cols`.
///
/// If a row has the same value as a previous row, the same group id is
/// assigned. If a row has a new value, the next available group id is
/// assigned.
///
/// # Errors
///
/// Returns an error if the implementation fails to intern the rows; the
/// exact failure conditions are implementation specific.
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;
/// Returns the number of bytes of memory used by this [`GroupValues`]
fn size(&self) -> usize;
/// Returns true if this [`GroupValues`] is empty
///
/// NOTE(review): presumably equivalent to `self.len() == 0` — confirm
/// against the implementations.
fn is_empty(&self) -> bool;
/// The number of values (distinct group values) stored in this [`GroupValues`]
fn len(&self) -> usize;
/// Emits the group values as arrays.
///
/// `emit_to` selects what is emitted; see [`EmitTo`] for the available
/// options. Takes `&mut self`, so implementations may modify their stored
/// state while emitting.
///
/// # Errors
///
/// Returns an error if the implementation fails to build the output
/// arrays; the exact failure conditions are implementation specific.
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
/// Clear the contents and shrink the capacity to the size of the batch (free up memory usage)
///
/// `batch` is the record batch whose size the capacity is shrunk to.
fn clear_shrink(&mut self, batch: &RecordBatch);
}
/// Builds the [`GroupValues`] implementation best suited to `schema`.
///
/// Selection proceeds in this order:
///
/// 1. A schema with exactly one field whose type is an Arrow primitive type
///    gets a [`GroupValuesPrimitive`] parameterized by that primitive type.
/// 2. A schema with exactly one `Utf8`, `LargeUtf8`, `Binary` or
///    `LargeBinary` field gets a [`GroupValuesByes`]; a single `Utf8View` or
///    `BinaryView` field gets a [`GroupValuesBytesView`].
/// 3. Any other schema gets a [`GroupValuesColumn`] when
///    `GroupValuesColumn::supported_schema` accepts it, and a
///    [`GroupValuesRows`] otherwise.
///
/// # Errors
///
/// Propagates any error returned by `GroupValuesColumn::try_new` or
/// `GroupValuesRows::try_new`.
pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
    // Single-column group keys are the only ones with dedicated fast paths.
    if schema.fields.len() == 1 {
        let data_type = schema.fields[0].data_type();

        // Invoked by `downcast_primitive!` with the matching Arrow primitive
        // type; returns from `new_group_values` directly.
        macro_rules! primitive_group_values {
            ($native:ty, $dt:ident) => {
                return Ok(Box::new(GroupValuesPrimitive::<$native>::new($dt.clone())))
            };
        }

        downcast_primitive! {
            data_type => (primitive_group_values, data_type),
            _ => {}
        }

        // Not a primitive type: check for the byte / string specializations.
        let specialized: Option<Box<dyn GroupValues>> = match data_type {
            DataType::Utf8 => {
                Some(Box::new(GroupValuesByes::<i32>::new(OutputType::Utf8)))
            }
            DataType::LargeUtf8 => {
                Some(Box::new(GroupValuesByes::<i64>::new(OutputType::Utf8)))
            }
            DataType::Utf8View => {
                Some(Box::new(GroupValuesBytesView::new(OutputType::Utf8View)))
            }
            DataType::Binary => {
                Some(Box::new(GroupValuesByes::<i32>::new(OutputType::Binary)))
            }
            DataType::LargeBinary => {
                Some(Box::new(GroupValuesByes::<i64>::new(OutputType::Binary)))
            }
            DataType::BinaryView => {
                Some(Box::new(GroupValuesBytesView::new(OutputType::BinaryView)))
            }
            _ => None,
        };

        if let Some(group_values) = specialized {
            return Ok(group_values);
        }
    }

    // General case: prefer the column-wise implementation when the schema is
    // supported, falling back to the row-based one otherwise.
    let group_values: Box<dyn GroupValues> =
        if GroupValuesColumn::supported_schema(schema.as_ref()) {
            Box::new(GroupValuesColumn::try_new(schema)?)
        } else {
            Box::new(GroupValuesRows::try_new(schema)?)
        };

    Ok(group_values)
}