fuel_core_metrics/futures/
future_tracker.rs

1use crate::futures::FuturesMetrics;
2use core::{
3    future::Future,
4    pin::Pin,
5    task::{
6        Context,
7        Poll,
8    },
9    time::Duration,
10};
11use std::time::Instant;
12
13/// The execution report of the future, generated by [`FutureTracker`].
14#[derive(Debug)]
15pub struct ExecutionTime<Output> {
16    /// The time spent for real action of the future.
17    busy: Duration,
18    /// The idle time of the future.
19    idle: Duration,
20    /// The output of the future.
21    output: Output,
22}
23
24impl<Output> ExecutionTime<Output> {
25    /// Extracts the future output and records the execution report into the metrics.
26    pub fn extract(self, metric: &FuturesMetrics) -> Output {
27        // TODO: Use `u128` when `AtomicU128` is stable.
28        metric.busy.inc_by(
29            u64::try_from(self.busy.as_nanos())
30                .expect("The task doesn't live longer than `u64`"),
31        );
32        metric.idle.inc_by(
33            u64::try_from(self.idle.as_nanos())
34                .expect("The task doesn't live longer than `u64`"),
35        );
36        self.output
37    }
38}
39
40/// A guard representing a span which has been entered and is currently
41/// executing.
42///
43/// When the guard is dropped, the span will be exited.
44///
45/// This is returned by the [`Span::enter`] function.
46#[derive(Debug)]
47#[must_use = "once a span has been entered, it should be exited"]
48struct Entered<'a> {
49    span: &'a mut Span,
50    busy_instant: Instant,
51}
52
53impl<'a> Entered<'a> {
54    fn new(span: &'a mut Span) -> Self {
55        Self {
56            span,
57            busy_instant: Instant::now(),
58        }
59    }
60}
61
62impl<'a> Drop for Entered<'a> {
63    #[inline(always)]
64    fn drop(&mut self) {
65        self.span.busy = self.span.busy.saturating_add(self.busy_instant.elapsed());
66        self.span.do_exit()
67    }
68}
69
70/// A handle representing a span, with the capability to enter the span if it
71/// exists.
72#[derive(Default, Debug, Clone)]
73struct Span {
74    /// The cumulative busy(active) time of the future across all `poll` calls.
75    busy: Duration,
76    /// The cumulative idle time of the future across all `poll` calls.
77    idle: Duration,
78    /// An [`Instant`] to track the idle time.
79    ///
80    /// If this is `None`, then the span has either closed or was never enabled.
81    idle_instant: Option<Instant>,
82}
83
84impl Span {
85    /// Enters this span, returning a guard that will exit the span when dropped.
86    #[inline(always)]
87    pub fn enter(&mut self) -> Entered<'_> {
88        self.do_enter();
89        Entered::new(self)
90    }
91
92    #[inline(always)]
93    fn do_enter(&mut self) {
94        let idle_instant = core::mem::take(&mut self.idle_instant);
95
96        if let Some(idle_instant) = idle_instant {
97            self.idle = self.idle.saturating_add(idle_instant.elapsed());
98        }
99    }
100
101    #[inline(always)]
102    fn do_exit(&mut self) {
103        self.idle_instant = Some(Instant::now());
104    }
105}
106
107pin_project_lite::pin_project! {
108    /// A [`Future`] that has been tracked with a [`Span`].
109    /// It tracks the execution time of the future, it's active and idle phases.
110    #[derive(Debug, Clone)]
111    #[must_use = "futures do nothing unless you `.await` or poll them"]
112    pub struct FutureTracker<T> {
113        #[pin]
114        inner: T,
115        span: Span,
116    }
117}
118
119impl<T> FutureTracker<T> {
120    /// Creates a [`FutureTracker`] wrapper around the `inner` future.
121    pub fn new(inner: T) -> Self {
122        Self {
123            inner,
124            span: Default::default(),
125        }
126    }
127}
128
129impl<T: Future> Future for FutureTracker<T> {
130    type Output = ExecutionTime<T::Output>;
131
132    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
133        let this = self.project();
134        let enter = this.span.enter();
135        let output = this.inner.poll(cx);
136
137        match output {
138            Poll::Ready(output) => {
139                drop(enter);
140                Poll::Ready(ExecutionTime {
141                    busy: this.span.busy,
142                    idle: this.span.idle,
143                    output,
144                })
145            }
146            Poll::Pending => Poll::Pending,
147        }
148    }
149}
150
151#[cfg(test)]
152mod tests {
153    use std::time::Duration;
154
155    use crate::futures::future_tracker::FutureTracker;
156
157    #[tokio::test]
158    async fn empty_future() {
159        let future = async { tokio::task::yield_now().await };
160        let wrapper_future = FutureTracker::new(future);
161        let result = wrapper_future.await;
162        assert_eq!(result.idle.as_secs(), 0)
163    }
164
165    #[tokio::test]
166    async fn idle_time_correct() {
167        let future = async { tokio::time::sleep(Duration::from_secs(1)).await };
168        let wrapper_future = FutureTracker::new(future);
169        let result = wrapper_future.await;
170        assert_eq!(result.idle.as_secs(), 1)
171    }
172
173    #[tokio::test]
174    async fn busy_time_correct() {
175        let future = async { std::thread::sleep(Duration::from_secs(1)) };
176        let wrapper_future = FutureTracker::new(future);
177        let result = wrapper_future.await;
178        assert_eq!(result.idle.as_secs(), 0);
179        assert_eq!(result.busy.as_secs(), 1);
180    }
181
182    #[tokio::test]
183    async fn hybrid_time_correct() {
184        let future = async {
185            tokio::time::sleep(Duration::from_secs(2)).await;
186            std::thread::sleep(Duration::from_secs(1));
187        };
188        let wrapper_future = FutureTracker::new(future);
189        let result = wrapper_future.await;
190        assert_eq!(result.idle.as_secs(), 2);
191        assert_eq!(result.busy.as_secs(), 1);
192    }
193
194    #[tokio::test]
195    async fn hybrid_time_correct_complex_case() {
196        let future = async {
197            tokio::time::sleep(Duration::from_secs(1)).await;
198            std::thread::sleep(Duration::from_secs(1));
199            tokio::time::sleep(Duration::from_secs(2)).await;
200            std::thread::sleep(Duration::from_secs(2));
201            tokio::time::sleep(Duration::from_secs(3)).await;
202        };
203        let wrapper_future = FutureTracker::new(future);
204        let result = wrapper_future.await;
205        assert_eq!(result.idle.as_secs(), 6);
206        assert_eq!(result.busy.as_secs(), 3);
207    }
208}