datafusion_physical_plan/aggregates/order/full.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion_expr::EmitTo;
19use std::mem::size_of;
20
21/// Tracks grouping state when the data is ordered entirely by its
22/// group keys
23///
24/// When the group values are sorted, as soon as we see group `n+1` we
25/// know we will never see any rows for group `n` again and thus they
26/// can be emitted.
27///
28/// For example, given `SUM(amt) GROUP BY id` if the input is sorted
29/// by `id` as soon as a new `id` value is seen all previous values
30/// can be emitted.
31///
32/// The state is tracked like this:
33///
34/// ```text
35/// ┌─────┐ ┌──────────────────┐
36/// │┌───┐│ │ ┌──────────────┐ │ ┏━━━━━━━━━━━━━━┓
37/// ││ 0 ││ │ │ 123 │ │ ┌─────┃ 13 ┃
38/// │└───┘│ │ └──────────────┘ │ │ ┗━━━━━━━━━━━━━━┛
39/// │ ... │ │ ... │ │
40/// │┌───┐│ │ ┌──────────────┐ │ │ current
41/// ││12 ││ │ │ 234 │ │ │
42/// │├───┤│ │ ├──────────────┤ │ │
43/// ││12 ││ │ │ 234 │ │ │
44/// │├───┤│ │ ├──────────────┤ │ │
45/// ││13 ││ │ │ 456 │◀┼───┘
46/// │└───┘│ │ └──────────────┘ │
47/// └─────┘ └──────────────────┘
48///
49/// group indices group_values current tracks the most
50/// (in group value recent group index
51/// order)
52/// ```
53///
54/// In this diagram, the current group is `13`, and thus groups
55/// `0..12` can be emitted. Note that `13` can not yet be emitted as
56/// there may be more values in the next batch with the same group_id.
57#[derive(Debug)]
58pub struct GroupOrderingFull {
59 state: State,
60}
61
/// Progress of the input as observed by the ordering tracker.
#[derive(Debug)]
enum State {
    /// No input has been observed so far.
    Start,

    /// Input is being consumed. `current` is the index of the group
    /// that is still receiving values; every group with a smaller
    /// index (up to and including `current - 1`) can be emitted.
    InProgress { current: usize },

    /// The end of the input was reached: all groups can be emitted.
    Complete,
}
74
75impl GroupOrderingFull {
76 pub fn new() -> Self {
77 Self {
78 state: State::Start,
79 }
80 }
81
82 // How many groups be emitted, or None if no data can be emitted
83 pub fn emit_to(&self) -> Option<EmitTo> {
84 match &self.state {
85 State::Start => None,
86 State::InProgress { current, .. } => {
87 if *current == 0 {
88 // Can not emit if still on the first row
89 None
90 } else {
91 // otherwise emit all rows prior to the current group
92 Some(EmitTo::First(*current))
93 }
94 }
95 State::Complete { .. } => Some(EmitTo::All),
96 }
97 }
98
99 /// remove the first n groups from the internal state, shifting
100 /// all existing indexes down by `n`
101 pub fn remove_groups(&mut self, n: usize) {
102 match &mut self.state {
103 State::Start => panic!("invalid state: start"),
104 State::InProgress { current } => {
105 // shift down by n
106 assert!(*current >= n);
107 *current -= n;
108 }
109 State::Complete { .. } => panic!("invalid state: complete"),
110 }
111 }
112
113 /// Note that the input is complete so any outstanding groups are done as well
114 pub fn input_done(&mut self) {
115 self.state = State::Complete;
116 }
117
118 /// Called when new groups are added in a batch. See documentation
119 /// on [`super::GroupOrdering::new_groups`]
120 pub fn new_groups(&mut self, total_num_groups: usize) {
121 assert_ne!(total_num_groups, 0);
122
123 // Update state
124 let max_group_index = total_num_groups - 1;
125 self.state = match self.state {
126 State::Start => State::InProgress {
127 current: max_group_index,
128 },
129 State::InProgress { current } => {
130 // expect to see new group indexes when called again
131 assert!(current <= max_group_index, "{current} <= {max_group_index}");
132 State::InProgress {
133 current: max_group_index,
134 }
135 }
136 State::Complete { .. } => {
137 panic!("Saw new group after input was complete");
138 }
139 };
140 }
141
142 pub(crate) fn size(&self) -> usize {
143 size_of::<Self>()
144 }
145}
146
147impl Default for GroupOrderingFull {
148 fn default() -> Self {
149 Self::new()
150 }
151}