// fuel_core_metrics/futures/future_tracker.rs

use crate::futures::FuturesMetrics;
use core::{
    future::Future,
    pin::Pin,
    task::{
        Context,
        Poll,
    },
    time::Duration,
};
use std::time::Instant;

/// The execution report of the future, generated by [`FutureTracker`].
///
/// A [`FutureTracker`] resolves to this type once the wrapped future is ready.
/// It bundles the wrapped future's own output with the busy and idle
/// durations accumulated while the future was being driven.
#[derive(Debug)]
pub struct ExecutionTime<Output> {
    /// The time spent for real action of the future, i.e. the cumulative
    /// time spent inside `poll` calls.
    busy: Duration,
    /// The idle time of the future, i.e. the cumulative time between the end
    /// of one `poll` call and the start of the next one.
    idle: Duration,
    /// The output of the future.
    output: Output,
}

impl<Output> ExecutionTime<Output> {
    /// Extracts the future output and records the execution report into the metrics.
    pub fn extract(self, metric: &FuturesMetrics) -> Output {
        // TODO: Use `u128` when `AtomicU128` is stable.
        metric.busy.inc_by(
            u64::try_from(self.busy.as_nanos())
                .expect("The task doesn't live longer than `u64`"),
        );
        metric.idle.inc_by(
            u64::try_from(self.idle.as_nanos())
                .expect("The task doesn't live longer than `u64`"),
        );
        self.output
    }
}

/// A guard representing a span which has been entered and is currently
/// executing.
///
/// When the guard is dropped, the span will be exited: the time elapsed since
/// the guard was created is added to the span's busy time, and the span
/// starts measuring idle time from that moment.
///
/// This is returned by the [`Span::enter`] function.
#[derive(Debug)]
#[must_use = "once a span has been entered, it should be exited"]
struct Entered<'a> {
    /// The span whose counters are updated when this guard is dropped.
    span: &'a mut Span,
    /// The moment the guard was created; busy time is measured from here.
    busy_instant: Instant,
}

impl<'a> Entered<'a> {
    /// Builds a guard for `span`, remembering the current instant as the
    /// start of the active (busy) phase.
    fn new(span: &'a mut Span) -> Self {
        let busy_instant = Instant::now();
        Self { span, busy_instant }
    }
}

impl<'a> Drop for Entered<'a> {
    /// Ends the active phase: the time elapsed since the guard was created is
    /// added (saturating) to the span's busy time, then the span is exited so
    /// that idle time starts being measured.
    #[inline(always)]
    fn drop(&mut self) {
        let active_for = self.busy_instant.elapsed();
        let span = &mut *self.span;
        span.busy = span.busy.saturating_add(active_for);
        span.do_exit();
    }
}

/// The timing state of a tracked future.
///
/// Entering the span (see `enter`) starts an active phase; dropping the
/// returned guard ends it. Time spent between an exit and the next enter is
/// accumulated as idle time.
#[derive(Default, Debug, Clone)]
struct Span {
    /// The cumulative busy(active) time of the future across all `poll` calls.
    busy: Duration,
    /// The cumulative idle time of the future across all `poll` calls.
    idle: Duration,
    /// An [`Instant`] to track the idle time: the moment of the last exit.
    ///
    /// This is `None` until the span has been exited for the first time, and
    /// also while the span is entered (`do_enter` takes the value out).
    idle_instant: Option<Instant>,
}

impl Span {
    /// Starts an active phase of this span and hands back a guard; dropping
    /// the guard ends the phase and exits the span.
    #[inline(always)]
    pub fn enter(&mut self) -> Entered<'_> {
        self.do_enter();
        Entered::new(self)
    }

    /// Closes the current idle phase, if any: the time since the last exit is
    /// added (saturating) to `idle` and the idle marker is cleared.
    #[inline(always)]
    fn do_enter(&mut self) {
        if let Some(idle_since) = self.idle_instant.take() {
            let waited = idle_since.elapsed();
            self.idle = self.idle.saturating_add(waited);
        }
    }

    /// Opens an idle phase by remembering the current instant.
    #[inline(always)]
    fn do_exit(&mut self) {
        let now = Instant::now();
        self.idle_instant = Some(now);
    }
}

pin_project_lite::pin_project! {
    /// A [`Future`] that has been tracked with a [`Span`].
    /// It tracks the execution time of the future, its active and idle phases.
    ///
    /// The tracker resolves to an [`ExecutionTime`] that carries the output of
    /// the wrapped future together with the measured durations.
    #[derive(Debug, Clone)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct FutureTracker<T> {
        // The wrapped future; marked `#[pin]` so it can be polled through the
        // pin projection.
        #[pin]
        inner: T,
        // The accumulated busy/idle timing of `inner`.
        span: Span,
    }
}

impl<T> FutureTracker<T> {
    /// Creates a [`FutureTracker`] wrapper around the `inner` future.
    pub fn new(inner: T) -> Self {
        Self {
            inner,
            span: Default::default(),
        }
    }
}

impl<T: Future> Future for FutureTracker<T> {
    type Output = ExecutionTime<T::Output>;

    /// Polls the wrapped future while the span is entered, so the time spent
    /// inside this call is counted as busy and the time between calls as idle.
    ///
    /// Once the wrapped future is ready, its output is returned together with
    /// the accumulated busy and idle durations.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let projected = self.project();
        let guard = projected.span.enter();

        // On `Pending` this returns early; the guard is dropped on the way
        // out, which records the busy time and starts the idle phase.
        let output = core::task::ready!(projected.inner.poll(cx));

        // Exit the span first so the final busy interval is included in the
        // report built below.
        drop(guard);
        let (busy, idle) = (projected.span.busy, projected.span.idle);

        Poll::Ready(ExecutionTime { busy, idle, output })
    }
}

// These tests measure real wall-clock time: they sleep for whole seconds and
// compare the measured durations truncated to seconds (`as_secs`).
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::futures::future_tracker::FutureTracker;

    /// A future that only yields to the scheduler once should accumulate less
    /// than a second of idle time.
    #[tokio::test]
    async fn empty_future() {
        let future = async { tokio::task::yield_now().await };
        let wrapper_future = FutureTracker::new(future);
        let result = wrapper_future.await;
        assert_eq!(result.idle.as_secs(), 0)
    }

    /// Time spent waiting on an async sleep is reported as idle time.
    #[tokio::test]
    async fn idle_time_correct() {
        let future = async { tokio::time::sleep(Duration::from_secs(1)).await };
        let wrapper_future = FutureTracker::new(future);
        let result = wrapper_future.await;
        assert_eq!(result.idle.as_secs(), 1)
    }

    /// A blocking sleep runs inside `poll`, so it is reported as busy time
    /// and not as idle time.
    #[tokio::test]
    async fn busy_time_correct() {
        let future = async { std::thread::sleep(Duration::from_secs(1)) };
        let wrapper_future = FutureTracker::new(future);
        let result = wrapper_future.await;
        assert_eq!(result.idle.as_secs(), 0);
        assert_eq!(result.busy.as_secs(), 1);
    }

    /// An async sleep followed by a blocking sleep is split into idle and
    /// busy time respectively.
    #[tokio::test]
    async fn hybrid_time_correct() {
        let future = async {
            tokio::time::sleep(Duration::from_secs(2)).await;
            std::thread::sleep(Duration::from_secs(1));
        };
        let wrapper_future = FutureTracker::new(future);
        let result = wrapper_future.await;
        assert_eq!(result.idle.as_secs(), 2);
        assert_eq!(result.busy.as_secs(), 1);
    }

    /// Interleaved async (1s + 2s + 3s) and blocking (1s + 2s) sleeps are
    /// accumulated across polls: 6 seconds idle and 3 seconds busy.
    #[tokio::test]
    async fn hybrid_time_correct_complex_case() {
        let future = async {
            tokio::time::sleep(Duration::from_secs(1)).await;
            std::thread::sleep(Duration::from_secs(1));
            tokio::time::sleep(Duration::from_secs(2)).await;
            std::thread::sleep(Duration::from_secs(2));
            tokio::time::sleep(Duration::from_secs(3)).await;
        };
        let wrapper_future = FutureTracker::new(future);
        let result = wrapper_future.await;
        assert_eq!(result.idle.as_secs(), 6);
        assert_eq!(result.busy.as_secs(), 3);
    }
}