await_tree/
context.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Write};
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use indextree::{Arena, NodeId};
19use itertools::Itertools;
20use parking_lot::{Mutex, MutexGuard};
21
22use crate::root::current_context;
23use crate::Span;
24
25/// Node in the span tree.
26#[derive(Debug, Clone)]
27struct SpanNode {
28    /// The span value.
29    span: Span,
30
31    /// The time when this span was started, or the future was first polled.
32    start_time: coarsetime::Instant,
33}
34
35impl SpanNode {
36    /// Create a new node with the given value.
37    fn new(span: Span) -> Self {
38        Self {
39            span,
40            start_time: coarsetime::Instant::now(),
41        }
42    }
43}
44
45/// The id of an await-tree context.
46///
47/// We will check the id recorded in the instrumented future against the current task-local context
48/// before trying to update the tree.
49///
50/// Also used as the key for anonymous trees in the registry. Intentionally made private to prevent
51/// users from reusing the same id when registering a new tree.
52#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
53pub(crate) struct ContextId(u64);
54
55/// An await-tree for a task.
56#[derive(Debug, Clone)]
57pub struct Tree {
58    /// The arena for allocating span nodes in this context.
59    arena: Arena<SpanNode>,
60
61    /// The root span node.
62    root: NodeId,
63
64    /// The current span node. This is the node that is currently being polled.
65    current: NodeId,
66}
67
68impl std::fmt::Display for Tree {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        fn fmt_node(
71            f: &mut std::fmt::Formatter<'_>,
72            arena: &Arena<SpanNode>,
73            node: NodeId,
74            depth: usize,
75            current: NodeId,
76        ) -> std::fmt::Result {
77            f.write_str(&" ".repeat(depth * 2))?;
78
79            let inner = arena[node].get();
80            f.write_str(inner.span.as_str())?;
81
82            let elapsed: std::time::Duration = inner.start_time.elapsed().into();
83            write!(
84                f,
85                " [{}{:.3?}]",
86                if depth > 0 && elapsed.as_secs() >= 10 {
87                    "!!! "
88                } else {
89                    ""
90                },
91                elapsed
92            )?;
93
94            if depth > 0 && node == current {
95                f.write_str("  <== current")?;
96            }
97
98            f.write_char('\n')?;
99            for child in node
100                .children(arena)
101                .sorted_by_key(|&id| arena[id].get().start_time)
102            {
103                fmt_node(f, arena, child, depth + 1, current)?;
104            }
105
106            Ok(())
107        }
108
109        fmt_node(f, &self.arena, self.root, 0, self.current)?;
110
111        // Format all detached spans.
112        for node in self.arena.iter().filter(|n| !n.is_removed()) {
113            let id = self.arena.get_node_id(node).unwrap();
114            if id == self.root {
115                continue;
116            }
117            if node.parent().is_none() {
118                writeln!(f, "[Detached {id}]")?;
119                fmt_node(f, &self.arena, id, 1, self.current)?;
120            }
121        }
122
123        Ok(())
124    }
125}
126
127impl Tree {
128    /// Get the count of active span nodes in this context.
129    #[cfg(test)]
130    pub(crate) fn active_node_count(&self) -> usize {
131        self.arena.iter().filter(|n| !n.is_removed()).count()
132    }
133
134    /// Get the count of active detached span nodes in this context.
135    #[cfg(test)]
136    pub(crate) fn detached_node_count(&self) -> usize {
137        self.arena
138            .iter()
139            .filter(|n| {
140                !n.is_removed()
141                    && n.parent().is_none()
142                    && self.arena.get_node_id(n).unwrap() != self.root
143            })
144            .count()
145    }
146
147    /// Push a new span as a child of current span, used for future firstly polled.
148    ///
149    /// Returns the new current span.
150    pub(crate) fn push(&mut self, span: Span) -> NodeId {
151        let child = self.arena.new_node(SpanNode::new(span));
152        self.current.prepend(child, &mut self.arena);
153        self.current = child;
154        child
155    }
156
157    /// Step in the current span to the given child, used for future polled again.
158    ///
159    /// If the child is not actually a child of the current span, it means we are using a new future
160    /// to poll it, so we need to detach it from the previous parent, and attach it to the current
161    /// span.
162    pub(crate) fn step_in(&mut self, child: NodeId) {
163        if !self.current.children(&self.arena).contains(&child) {
164            // Actually we can always call this even if `child` is already a child of `current`. But
165            // checking first performs better.
166            self.current.prepend(child, &mut self.arena);
167        }
168        self.current = child;
169    }
170
171    /// Pop the current span to the parent, used for future ready.
172    ///
173    /// Note that there might still be some children of this node, like `select_stream.next()`.
174    /// The children might be polled again later, and will be attached as the children of a new
175    /// span.
176    pub(crate) fn pop(&mut self) {
177        let parent = self.arena[self.current]
178            .parent()
179            .expect("the root node should not be popped");
180        self.remove_and_detach(self.current);
181        self.current = parent;
182    }
183
184    /// Step out the current span to the parent, used for future pending.
185    pub(crate) fn step_out(&mut self) {
186        let parent = self.arena[self.current]
187            .parent()
188            .expect("the root node should not be stepped out");
189        self.current = parent;
190    }
191
192    /// Remove the current span and detach the children, used for future aborting.
193    ///
194    /// The children might be polled again later, and will be attached as the children of a new
195    /// span.
196    pub(crate) fn remove_and_detach(&mut self, node: NodeId) {
197        node.detach(&mut self.arena);
198        // Removing detached `node` makes children detached.
199        node.remove(&mut self.arena);
200    }
201
202    /// Get the current span node id.
203    pub(crate) fn current(&self) -> NodeId {
204        self.current
205    }
206}
207
208/// The task-local await-tree context.
209#[derive(Debug)]
210pub(crate) struct TreeContext {
211    /// The id of the context.
212    id: ContextId,
213
214    /// Whether to include the "verbose" span in the tree.
215    verbose: bool,
216
217    /// The await-tree.
218    tree: Mutex<Tree>,
219}
220
221impl TreeContext {
222    /// Create a new context.
223    pub(crate) fn new(root_span: Span, verbose: bool) -> Self {
224        static ID: AtomicU64 = AtomicU64::new(0);
225        let id = ID.fetch_add(1, Ordering::Relaxed);
226
227        let mut arena = Arena::new();
228        let root = arena.new_node(SpanNode::new(root_span));
229
230        Self {
231            id: ContextId(id),
232            verbose,
233            tree: Tree {
234                arena,
235                root,
236                current: root,
237            }
238            .into(),
239        }
240    }
241
242    /// Get the context id.
243    pub(crate) fn id(&self) -> ContextId {
244        self.id
245    }
246
247    /// Returns the locked guard of the tree.
248    pub(crate) fn tree(&self) -> MutexGuard<'_, Tree> {
249        self.tree.lock()
250    }
251
252    /// Whether the verbose span should be included.
253    pub(crate) fn verbose(&self) -> bool {
254        self.verbose
255    }
256}
257
258/// Get the await-tree of current task. Returns `None` if we're not instrumented.
259///
260/// This is useful if you want to check which component or runtime task is calling this function.
261pub fn current_tree() -> Option<Tree> {
262    current_context().map(|c| c.tree().clone())
263}