datafusion_physical_plan/aggregates/order/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::ArrayRef;
19use arrow::datatypes::Schema;
20use datafusion_common::Result;
21use datafusion_expr::EmitTo;
22use datafusion_physical_expr_common::sort_expr::LexOrdering;
23use std::mem::size_of;
24
25mod full;
26mod partial;
27
28use crate::InputOrderMode;
29pub use full::GroupOrderingFull;
30pub use partial::GroupOrderingPartial;
31
32/// Ordering information for each group in the hash table
33#[derive(Debug)]
34pub enum GroupOrdering {
35    /// Groups are not ordered
36    None,
37    /// Groups are ordered by some pre-set of the group keys
38    Partial(GroupOrderingPartial),
39    /// Groups are entirely contiguous,
40    Full(GroupOrderingFull),
41}
42
43impl GroupOrdering {
44    /// Create a `GroupOrdering` for the specified ordering
45    pub fn try_new(
46        input_schema: &Schema,
47        mode: &InputOrderMode,
48        ordering: &LexOrdering,
49    ) -> Result<Self> {
50        match mode {
51            InputOrderMode::Linear => Ok(GroupOrdering::None),
52            InputOrderMode::PartiallySorted(order_indices) => {
53                GroupOrderingPartial::try_new(input_schema, order_indices, ordering)
54                    .map(GroupOrdering::Partial)
55            }
56            InputOrderMode::Sorted => Ok(GroupOrdering::Full(GroupOrderingFull::new())),
57        }
58    }
59
60    // How many groups be emitted, or None if no data can be emitted
61    pub fn emit_to(&self) -> Option<EmitTo> {
62        match self {
63            GroupOrdering::None => None,
64            GroupOrdering::Partial(partial) => partial.emit_to(),
65            GroupOrdering::Full(full) => full.emit_to(),
66        }
67    }
68
69    /// Updates the state the input is done
70    pub fn input_done(&mut self) {
71        match self {
72            GroupOrdering::None => {}
73            GroupOrdering::Partial(partial) => partial.input_done(),
74            GroupOrdering::Full(full) => full.input_done(),
75        }
76    }
77
78    /// remove the first n groups from the internal state, shifting
79    /// all existing indexes down by `n`
80    pub fn remove_groups(&mut self, n: usize) {
81        match self {
82            GroupOrdering::None => {}
83            GroupOrdering::Partial(partial) => partial.remove_groups(n),
84            GroupOrdering::Full(full) => full.remove_groups(n),
85        }
86    }
87
88    /// Called when new groups are added in a batch
89    ///
90    /// * `total_num_groups`: total number of groups (so max
91    ///   group_index is total_num_groups - 1).
92    ///
93    /// * `group_values`: group key values for *each row* in the batch
94    ///
95    /// * `group_indices`: indices for each row in the batch
96    ///
97    /// * `hashes`: hash values for each row in the batch
98    pub fn new_groups(
99        &mut self,
100        batch_group_values: &[ArrayRef],
101        group_indices: &[usize],
102        total_num_groups: usize,
103    ) -> Result<()> {
104        match self {
105            GroupOrdering::None => {}
106            GroupOrdering::Partial(partial) => {
107                partial.new_groups(
108                    batch_group_values,
109                    group_indices,
110                    total_num_groups,
111                )?;
112            }
113            GroupOrdering::Full(full) => {
114                full.new_groups(total_num_groups);
115            }
116        };
117        Ok(())
118    }
119
120    /// Return the size of memory used by the ordering state, in bytes
121    pub fn size(&self) -> usize {
122        size_of::<Self>()
123            + match self {
124                GroupOrdering::None => 0,
125                GroupOrdering::Partial(partial) => partial.size(),
126                GroupOrdering::Full(full) => full.size(),
127            }
128    }
129}