await_tree/context.rs
1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Write};
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use indextree::{Arena, NodeId};
19use itertools::Itertools;
20use parking_lot::{Mutex, MutexGuard};
21
22use crate::root::current_context;
23use crate::Span;
24
25/// Node in the span tree.
26#[derive(Debug, Clone)]
27struct SpanNode {
28 /// The span value.
29 span: Span,
30
31 /// The time when this span was started, or the future was first polled.
32 start_time: coarsetime::Instant,
33}
34
35impl SpanNode {
36 /// Create a new node with the given value.
37 fn new(span: Span) -> Self {
38 Self {
39 span,
40 start_time: coarsetime::Instant::now(),
41 }
42 }
43}
44
/// Identifier of an await-tree context.
///
/// An instrumented future records the id of the context it belongs to; that id is compared with
/// the current task-local context before the tree is updated.
///
/// The id also serves as the registry key for anonymous trees. It is deliberately kept private
/// so that users cannot reuse an existing id when registering a new tree.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct ContextId(u64);
54
55/// An await-tree for a task.
56#[derive(Debug, Clone)]
57pub struct Tree {
58 /// The arena for allocating span nodes in this context.
59 arena: Arena<SpanNode>,
60
61 /// The root span node.
62 root: NodeId,
63
64 /// The current span node. This is the node that is currently being polled.
65 current: NodeId,
66}
67
68impl std::fmt::Display for Tree {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 fn fmt_node(
71 f: &mut std::fmt::Formatter<'_>,
72 arena: &Arena<SpanNode>,
73 node: NodeId,
74 depth: usize,
75 current: NodeId,
76 ) -> std::fmt::Result {
77 f.write_str(&" ".repeat(depth * 2))?;
78
79 let inner = arena[node].get();
80 f.write_str(inner.span.as_str())?;
81
82 let elapsed: std::time::Duration = inner.start_time.elapsed().into();
83 write!(
84 f,
85 " [{}{:.3?}]",
86 if depth > 0 && elapsed.as_secs() >= 10 {
87 "!!! "
88 } else {
89 ""
90 },
91 elapsed
92 )?;
93
94 if depth > 0 && node == current {
95 f.write_str(" <== current")?;
96 }
97
98 f.write_char('\n')?;
99 for child in node
100 .children(arena)
101 .sorted_by_key(|&id| arena[id].get().start_time)
102 {
103 fmt_node(f, arena, child, depth + 1, current)?;
104 }
105
106 Ok(())
107 }
108
109 fmt_node(f, &self.arena, self.root, 0, self.current)?;
110
111 // Format all detached spans.
112 for node in self.arena.iter().filter(|n| !n.is_removed()) {
113 let id = self.arena.get_node_id(node).unwrap();
114 if id == self.root {
115 continue;
116 }
117 if node.parent().is_none() {
118 writeln!(f, "[Detached {id}]")?;
119 fmt_node(f, &self.arena, id, 1, self.current)?;
120 }
121 }
122
123 Ok(())
124 }
125}
126
127impl Tree {
128 /// Get the count of active span nodes in this context.
129 #[cfg(test)]
130 pub(crate) fn active_node_count(&self) -> usize {
131 self.arena.iter().filter(|n| !n.is_removed()).count()
132 }
133
134 /// Get the count of active detached span nodes in this context.
135 #[cfg(test)]
136 pub(crate) fn detached_node_count(&self) -> usize {
137 self.arena
138 .iter()
139 .filter(|n| {
140 !n.is_removed()
141 && n.parent().is_none()
142 && self.arena.get_node_id(n).unwrap() != self.root
143 })
144 .count()
145 }
146
147 /// Push a new span as a child of current span, used for future firstly polled.
148 ///
149 /// Returns the new current span.
150 pub(crate) fn push(&mut self, span: Span) -> NodeId {
151 let child = self.arena.new_node(SpanNode::new(span));
152 self.current.prepend(child, &mut self.arena);
153 self.current = child;
154 child
155 }
156
157 /// Step in the current span to the given child, used for future polled again.
158 ///
159 /// If the child is not actually a child of the current span, it means we are using a new future
160 /// to poll it, so we need to detach it from the previous parent, and attach it to the current
161 /// span.
162 pub(crate) fn step_in(&mut self, child: NodeId) {
163 if !self.current.children(&self.arena).contains(&child) {
164 // Actually we can always call this even if `child` is already a child of `current`. But
165 // checking first performs better.
166 self.current.prepend(child, &mut self.arena);
167 }
168 self.current = child;
169 }
170
171 /// Pop the current span to the parent, used for future ready.
172 ///
173 /// Note that there might still be some children of this node, like `select_stream.next()`.
174 /// The children might be polled again later, and will be attached as the children of a new
175 /// span.
176 pub(crate) fn pop(&mut self) {
177 let parent = self.arena[self.current]
178 .parent()
179 .expect("the root node should not be popped");
180 self.remove_and_detach(self.current);
181 self.current = parent;
182 }
183
184 /// Step out the current span to the parent, used for future pending.
185 pub(crate) fn step_out(&mut self) {
186 let parent = self.arena[self.current]
187 .parent()
188 .expect("the root node should not be stepped out");
189 self.current = parent;
190 }
191
192 /// Remove the current span and detach the children, used for future aborting.
193 ///
194 /// The children might be polled again later, and will be attached as the children of a new
195 /// span.
196 pub(crate) fn remove_and_detach(&mut self, node: NodeId) {
197 node.detach(&mut self.arena);
198 // Removing detached `node` makes children detached.
199 node.remove(&mut self.arena);
200 }
201
202 /// Get the current span node id.
203 pub(crate) fn current(&self) -> NodeId {
204 self.current
205 }
206}
207
208/// The task-local await-tree context.
209#[derive(Debug)]
210pub(crate) struct TreeContext {
211 /// The id of the context.
212 id: ContextId,
213
214 /// Whether to include the "verbose" span in the tree.
215 verbose: bool,
216
217 /// The await-tree.
218 tree: Mutex<Tree>,
219}
220
221impl TreeContext {
222 /// Create a new context.
223 pub(crate) fn new(root_span: Span, verbose: bool) -> Self {
224 static ID: AtomicU64 = AtomicU64::new(0);
225 let id = ID.fetch_add(1, Ordering::Relaxed);
226
227 let mut arena = Arena::new();
228 let root = arena.new_node(SpanNode::new(root_span));
229
230 Self {
231 id: ContextId(id),
232 verbose,
233 tree: Tree {
234 arena,
235 root,
236 current: root,
237 }
238 .into(),
239 }
240 }
241
242 /// Get the context id.
243 pub(crate) fn id(&self) -> ContextId {
244 self.id
245 }
246
247 /// Returns the locked guard of the tree.
248 pub(crate) fn tree(&self) -> MutexGuard<'_, Tree> {
249 self.tree.lock()
250 }
251
252 /// Whether the verbose span should be included.
253 pub(crate) fn verbose(&self) -> bool {
254 self.verbose
255 }
256}
257
258/// Get the await-tree of current task. Returns `None` if we're not instrumented.
259///
260/// This is useful if you want to check which component or runtime task is calling this function.
261pub fn current_tree() -> Option<Tree> {
262 current_context().map(|c| c.tree().clone())
263}