// datafusion_physical_plan/aggregates/order/full.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_expr::EmitTo;
use std::mem::size_of;

21/// Tracks grouping state when the data is ordered entirely by its
22/// group keys
23///
24/// When the group values are sorted, as soon as we see group `n+1` we
25/// know we will never see any rows for group `n` again and thus they
26/// can be emitted.
27///
28/// For example, given `SUM(amt) GROUP BY id` if the input is sorted
29/// by `id` as soon as a new `id` value is seen all previous values
30/// can be emitted.
31///
32/// The state is tracked like this:
33///
34/// ```text
35///      ┌─────┐   ┌──────────────────┐
36///      │┌───┐│   │ ┌──────────────┐ │         ┏━━━━━━━━━━━━━━┓
37///      ││ 0 ││   │ │     123      │ │   ┌─────┃      13      ┃
38///      │└───┘│   │ └──────────────┘ │   │     ┗━━━━━━━━━━━━━━┛
39///      │ ... │   │    ...           │   │
40///      │┌───┐│   │ ┌──────────────┐ │   │         current
41///      ││12 ││   │ │     234      │ │   │
42///      │├───┤│   │ ├──────────────┤ │   │
43///      ││12 ││   │ │     234      │ │   │
44///      │├───┤│   │ ├──────────────┤ │   │
45///      ││13 ││   │ │     456      │◀┼───┘
46///      │└───┘│   │ └──────────────┘ │
47///      └─────┘   └──────────────────┘
48///
49///  group indices    group_values        current tracks the most
50/// (in group value                          recent group index
51///      order)
52/// ```
53///
54/// In this diagram, the current group is `13`, and thus groups
55/// `0..12` can be emitted. Note that `13` can not yet be emitted as
56/// there may be more values in the next batch with the same group_id.
57#[derive(Debug)]
58pub struct GroupOrderingFull {
59    state: State,
60}
61
/// Lifecycle of a [`GroupOrderingFull`].
///
/// The state only ever advances: from `Start`, through `InProgress`,
/// to `Complete`. It never moves backwards.
#[derive(Debug)]
enum State {
    /// No input has been seen yet, so nothing can be emitted.
    Start,

    /// Data is in progress. `current` is the index of the most recently
    /// seen group, which may still receive more values. Every group
    /// before it (indices `0..current`) is finished and can be emitted.
    InProgress { current: usize },

    /// End of input has been seen: all groups can be emitted.
    Complete,
}

75impl GroupOrderingFull {
76    pub fn new() -> Self {
77        Self {
78            state: State::Start,
79        }
80    }
81
82    // How many groups be emitted, or None if no data can be emitted
83    pub fn emit_to(&self) -> Option<EmitTo> {
84        match &self.state {
85            State::Start => None,
86            State::InProgress { current, .. } => {
87                if *current == 0 {
88                    // Can not emit if still on the first row
89                    None
90                } else {
91                    // otherwise emit all rows prior to the current group
92                    Some(EmitTo::First(*current))
93                }
94            }
95            State::Complete { .. } => Some(EmitTo::All),
96        }
97    }
98
99    /// remove the first n groups from the internal state, shifting
100    /// all existing indexes down by `n`
101    pub fn remove_groups(&mut self, n: usize) {
102        match &mut self.state {
103            State::Start => panic!("invalid state: start"),
104            State::InProgress { current } => {
105                // shift down by n
106                assert!(*current >= n);
107                *current -= n;
108            }
109            State::Complete { .. } => panic!("invalid state: complete"),
110        }
111    }
112
113    /// Note that the input is complete so any outstanding groups are done as well
114    pub fn input_done(&mut self) {
115        self.state = State::Complete;
116    }
117
118    /// Called when new groups are added in a batch. See documentation
119    /// on [`super::GroupOrdering::new_groups`]
120    pub fn new_groups(&mut self, total_num_groups: usize) {
121        assert_ne!(total_num_groups, 0);
122
123        // Update state
124        let max_group_index = total_num_groups - 1;
125        self.state = match self.state {
126            State::Start => State::InProgress {
127                current: max_group_index,
128            },
129            State::InProgress { current } => {
130                // expect to see new group indexes when called again
131                assert!(current <= max_group_index, "{current} <= {max_group_index}");
132                State::InProgress {
133                    current: max_group_index,
134                }
135            }
136            State::Complete { .. } => {
137                panic!("Saw new group after input was complete");
138            }
139        };
140    }
141
142    pub(crate) fn size(&self) -> usize {
143        size_of::<Self>()
144    }
145}
146
147impl Default for GroupOrderingFull {
148    fn default() -> Self {
149        Self::new()
150    }
151}