await_tree/
future.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::pin::Pin;
17use std::task::Poll;
18
19use indextree::NodeId;
20use pin_project::{pin_project, pinned_drop};
21
22use crate::context::ContextId;
23use crate::root::current_context;
24use crate::Span;
25
26enum State {
27    Initial(Span),
28    Polled {
29        this_node: NodeId,
30        this_context_id: ContextId,
31    },
32    Ready,
33    /// This span is disabled due to `verbose` configuration.
34    Disabled,
35}
36
37/// The future for [`InstrumentAwait`][ia].
38///
39/// [ia]: crate::InstrumentAwait
40#[pin_project(PinnedDrop)]
41pub struct Instrumented<F: Future, const VERBOSE: bool> {
42    #[pin]
43    inner: F,
44    state: State,
45}
46
47impl<F: Future, const VERBOSE: bool> Instrumented<F, VERBOSE> {
48    pub(crate) fn new(inner: F, span: Span) -> Self {
49        Self {
50            inner,
51            state: State::Initial(span),
52        }
53    }
54}
55
56impl<F: Future, const VERBOSE: bool> Future for Instrumented<F, VERBOSE> {
57    type Output = F::Output;
58
59    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
60        let this = self.project();
61        let context = current_context();
62
63        let (context, this_node) = match this.state {
64            State::Initial(span) => {
65                match context {
66                    Some(c) => {
67                        if !c.verbose() && VERBOSE {
68                            // The tracing for this span is disabled according to the verbose
69                            // configuration.
70                            *this.state = State::Disabled;
71                            return this.inner.poll(cx);
72                        }
73                        // First polled, push a new span to the context.
74                        let node = c.tree().push(std::mem::take(span));
75                        *this.state = State::Polled {
76                            this_node: node,
77                            this_context_id: c.id(),
78                        };
79                        (c, node)
80                    }
81                    // Not in a context
82                    None => return this.inner.poll(cx),
83                }
84            }
85            State::Polled {
86                this_node,
87                this_context_id: this_context,
88            } => {
89                match context {
90                    // Context correct
91                    Some(c) if c.id() == *this_context => {
92                        // Polled before, just step in.
93                        c.tree().step_in(*this_node);
94                        (c, *this_node)
95                    }
96                    // Context changed
97                    Some(_) => {
98                        tracing::warn!(
99                            "future polled in a different context as it was first polled"
100                        );
101                        return this.inner.poll(cx);
102                    }
103                    // Out of context
104                    None => {
105                        tracing::warn!(
106                            "future polled not in a context, while it was when first polled"
107                        );
108                        return this.inner.poll(cx);
109                    }
110                }
111            }
112            State::Ready => unreachable!("the instrumented future should always be fused"),
113            State::Disabled => return this.inner.poll(cx),
114        };
115
116        // The current node must be the this_node.
117        debug_assert_eq!(this_node, context.tree().current());
118
119        match this.inner.poll(cx) {
120            // The future is ready, clean-up this span by popping from the context.
121            Poll::Ready(output) => {
122                context.tree().pop();
123                *this.state = State::Ready;
124                Poll::Ready(output)
125            }
126            // Still pending, just step out.
127            Poll::Pending => {
128                context.tree().step_out();
129                Poll::Pending
130            }
131        }
132    }
133}
134
135#[pinned_drop]
136impl<F: Future, const VERBOSE: bool> PinnedDrop for Instrumented<F, VERBOSE> {
137    fn drop(self: Pin<&mut Self>) {
138        let this = self.project();
139
140        match this.state {
141            State::Polled {
142                this_node,
143                this_context_id,
144            } => match current_context() {
145                // Context correct
146                Some(c) if c.id() == *this_context_id => {
147                    c.tree().remove_and_detach(*this_node);
148                }
149                // Context changed
150                Some(_) => {
151                    tracing::warn!("future is dropped in a different context as it was first polled, cannot clean up!");
152                }
153                // Out of context
154                None => {
155                    tracing::warn!("future is not in a context, while it was when first polled, cannot clean up!");
156                }
157            },
158            State::Initial(_) | State::Ready | State::Disabled => {}
159        }
160    }
161}