await_tree/
context.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Write};
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use indextree::{Arena, NodeId};
19use itertools::Itertools;
20use parking_lot::{Mutex, MutexGuard};
21
22use crate::root::current_context;
23use crate::Span;
24
/// Node in the span tree.
///
/// Pairs a [`Span`] with the instant at which the node was created, so that the elapsed time of
/// the span can be reported when the tree is rendered or serialized.
#[derive(Debug, Clone)]
struct SpanNode {
    /// The span value.
    span: Span,

    /// The time when this span was started, or the future was first polled.
    ///
    /// Captured in `SpanNode::new`. Besides computing the elapsed time, it is also the key used
    /// to order sibling spans when the tree is displayed.
    start_time: coarsetime::Instant,
}
34
35impl SpanNode {
36    /// Create a new node with the given value.
37    fn new(span: Span) -> Self {
38        Self {
39            span,
40            start_time: coarsetime::Instant::now(),
41        }
42    }
43}
44
/// The id of an await-tree context.
///
/// We will check the id recorded in the instrumented future against the current task-local context
/// before trying to update the tree.
///
/// Also used as the key for anonymous trees in the registry. Intentionally made private to prevent
/// users from reusing the same id when registering a new tree.
///
/// Ids are handed out by `TreeContext::new` from a monotonically increasing static counter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct ContextId(pub(crate) u64);
54
/// An await-tree for a task.
///
/// All span nodes live in a single arena. Besides the main tree hanging off `root`, the arena may
/// also contain detached subtrees: nodes that have no parent but are not the root.
///
/// The tree is `Clone`, which is how [`current_tree`] hands out a snapshot to callers.
#[derive(Debug, Clone)]
pub struct Tree {
    /// The arena for allocating span nodes in this context.
    arena: Arena<SpanNode>,

    /// The root span node.
    root: NodeId,

    /// The current span node. This is the node that is currently being polled.
    ///
    /// Starts out equal to `root` and is moved by `push`, `step_in`, `step_out` and `pop`.
    current: NodeId,
}
67
#[cfg(feature = "serde")]
mod serde_impl {
    use serde::ser::SerializeStruct as _;
    use serde::Serialize;

    use super::*;

    /// Serializable view of a single span node together with its whole subtree.
    struct SpanNodeSer<'a> {
        /// The arena the node lives in.
        arena: &'a Arena<SpanNode>,
        /// The node to serialize.
        node: NodeId,
    }

    impl Serialize for SpanNodeSer<'_> {
        /// Emit a `Span` struct with the fields `id`, `span`, `elapsed_ns` and `children`.
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            let arena = self.arena;
            let span_node = arena[self.node].get();
            let mut state = serializer.serialize_struct("Span", 4)?;

            // Basic info.
            let node_index: usize = self.node.into();
            state.serialize_field("id", &node_index)?;
            state.serialize_field("span", &span_node.span)?;
            state.serialize_field("elapsed_ns", &span_node.start_time.elapsed().as_nanos())?;

            // Children, ordered by the time they were started.
            let children: Vec<SpanNodeSer<'_>> = self
                .node
                .children(arena)
                .sorted_by_key(|&child_id| arena[child_id].get().start_time)
                .map(|child_id| SpanNodeSer {
                    arena,
                    node: child_id,
                })
                .collect();
            state.serialize_field("children", &children)?;

            state.end()
        }
    }

    impl Serialize for Tree {
        /// Emit a `Tree` struct with the fields `current`, `tree` and `detached`.
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            // Wraps a node id into its serializable view over this tree's arena.
            let view_of = |node: NodeId| SpanNodeSer {
                arena: &self.arena,
                node,
            };

            let mut state = serializer.serialize_struct("Tree", 3)?;

            let current_index: usize = self.current.into();
            state.serialize_field("current", &current_index)?;

            // The main tree.
            state.serialize_field("tree", &view_of(self.root))?;

            // The detached subtrees.
            let detached: Vec<SpanNodeSer<'_>> = self.detached_roots().map(view_of).collect();
            state.serialize_field("detached", &detached)?;

            state.end()
        }
    }
}
144
145impl std::fmt::Display for Tree {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        fn fmt_node(
148            f: &mut std::fmt::Formatter<'_>,
149            arena: &Arena<SpanNode>,
150            node: NodeId,
151            depth: usize,
152            current: NodeId,
153        ) -> std::fmt::Result {
154            f.write_str(&" ".repeat(depth * 2))?;
155
156            let inner = arena[node].get();
157            f.write_str(&inner.span.name)?;
158
159            let elapsed: std::time::Duration = inner.start_time.elapsed().into();
160            write!(
161                f,
162                " [{}{:.3?}]",
163                if !inner.span.is_long_running && elapsed.as_secs() >= 10 {
164                    "!!! "
165                } else {
166                    ""
167                },
168                elapsed
169            )?;
170
171            if depth > 0 && node == current {
172                f.write_str("  <== current")?;
173            }
174
175            f.write_char('\n')?;
176            for child in node
177                .children(arena)
178                .sorted_by_key(|&id| arena[id].get().start_time)
179            {
180                fmt_node(f, arena, child, depth + 1, current)?;
181            }
182
183            Ok(())
184        }
185
186        fmt_node(f, &self.arena, self.root, 0, self.current)?;
187
188        // Format all detached spans.
189        for node in self.detached_roots() {
190            writeln!(f, "[Detached {node}]")?;
191            fmt_node(f, &self.arena, node, 1, self.current)?;
192        }
193
194        Ok(())
195    }
196}
197
198impl Tree {
199    /// Get the count of active span nodes in this context.
200    #[cfg(test)]
201    pub(crate) fn active_node_count(&self) -> usize {
202        self.arena.iter().filter(|n| !n.is_removed()).count()
203    }
204
205    /// Get the count of active detached span nodes in this context.
206    #[cfg(test)]
207    pub(crate) fn detached_node_count(&self) -> usize {
208        self.arena
209            .iter()
210            .filter(|n| {
211                !n.is_removed()
212                    && n.parent().is_none()
213                    && self.arena.get_node_id(n).unwrap() != self.root
214            })
215            .count()
216    }
217
218    /// Returns an iterator over the root nodes of detached subtrees.
219    fn detached_roots(&self) -> impl Iterator<Item = NodeId> + '_ {
220        self.arena
221            .iter()
222            .filter(|n| !n.is_removed()) // still valid
223            .map(|node| self.arena.get_node_id(node).unwrap()) // get id
224            .filter(|&id| id != self.root && self.arena[id].parent().is_none()) // no parent but non-root
225    }
226
227    /// Push a new span as a child of current span, used for future firstly polled.
228    ///
229    /// Returns the new current span.
230    pub(crate) fn push(&mut self, span: Span) -> NodeId {
231        let child = self.arena.new_node(SpanNode::new(span));
232        self.current.prepend(child, &mut self.arena);
233        self.current = child;
234        child
235    }
236
237    /// Step in the current span to the given child, used for future polled again.
238    ///
239    /// If the child is not actually a child of the current span, it means we are using a new future
240    /// to poll it, so we need to detach it from the previous parent, and attach it to the current
241    /// span.
242    pub(crate) fn step_in(&mut self, child: NodeId) {
243        if !self.current.children(&self.arena).contains(&child) {
244            // Actually we can always call this even if `child` is already a child of `current`. But
245            // checking first performs better.
246            self.current.prepend(child, &mut self.arena);
247        }
248        self.current = child;
249    }
250
251    /// Pop the current span to the parent, used for future ready.
252    ///
253    /// Note that there might still be some children of this node, like `select_stream.next()`.
254    /// The children might be polled again later, and will be attached as the children of a new
255    /// span.
256    pub(crate) fn pop(&mut self) {
257        let parent = self.arena[self.current]
258            .parent()
259            .expect("the root node should not be popped");
260        self.remove_and_detach(self.current);
261        self.current = parent;
262    }
263
264    /// Step out the current span to the parent, used for future pending.
265    pub(crate) fn step_out(&mut self) {
266        let parent = self.arena[self.current]
267            .parent()
268            .expect("the root node should not be stepped out");
269        self.current = parent;
270    }
271
272    /// Remove the current span and detach the children, used for future aborting.
273    ///
274    /// The children might be polled again later, and will be attached as the children of a new
275    /// span.
276    pub(crate) fn remove_and_detach(&mut self, node: NodeId) {
277        node.detach(&mut self.arena);
278        // Removing detached `node` makes children detached.
279        node.remove(&mut self.arena);
280    }
281
282    /// Get the current span node id.
283    pub(crate) fn current(&self) -> NodeId {
284        self.current
285    }
286}
287
/// The task-local await-tree context.
///
/// Bundles the tree with the identity and settings of the context. The tree sits behind a mutex
/// and is reached through `TreeContext::tree`, which returns the lock guard.
#[derive(Debug)]
pub(crate) struct TreeContext {
    /// The id of the context.
    id: ContextId,

    /// Whether to include the "verbose" span in the tree.
    verbose: bool,

    /// The await-tree.
    tree: Mutex<Tree>,
}
300
301impl TreeContext {
302    /// Create a new context.
303    pub(crate) fn new(root_span: Span, verbose: bool) -> Self {
304        static ID: AtomicU64 = AtomicU64::new(0);
305        let id = ID.fetch_add(1, Ordering::Relaxed);
306
307        // Always make the root span long-running.
308        let root_span = root_span.long_running();
309
310        let mut arena = Arena::new();
311        let root = arena.new_node(SpanNode::new(root_span));
312
313        Self {
314            id: ContextId(id),
315            verbose,
316            tree: Tree {
317                arena,
318                root,
319                current: root,
320            }
321            .into(),
322        }
323    }
324
325    /// Get the context id.
326    pub(crate) fn id(&self) -> ContextId {
327        self.id
328    }
329
330    /// Returns the locked guard of the tree.
331    pub(crate) fn tree(&self) -> MutexGuard<'_, Tree> {
332        self.tree.lock()
333    }
334
335    /// Whether the verbose span should be included.
336    pub(crate) fn verbose(&self) -> bool {
337        self.verbose
338    }
339}
340
341/// Get the await-tree of current task. Returns `None` if we're not instrumented.
342///
343/// This is useful if you want to check which component or runtime task is calling this function.
344pub fn current_tree() -> Option<Tree> {
345    current_context().map(|c| c.tree().clone())
346}