fuel_core_metrics/futures/
future_tracker.rs1use crate::futures::FuturesMetrics;
2use core::{
3 future::Future,
4 pin::Pin,
5 task::{
6 Context,
7 Poll,
8 },
9 time::Duration,
10};
11use std::time::Instant;
12
13#[derive(Debug)]
15pub struct ExecutionTime<Output> {
16 busy: Duration,
18 idle: Duration,
20 output: Output,
22}
23
24impl<Output> ExecutionTime<Output> {
25 pub fn extract(self, metric: &FuturesMetrics) -> Output {
27 metric.busy.inc_by(
29 u64::try_from(self.busy.as_nanos())
30 .expect("The task doesn't live longer than `u64`"),
31 );
32 metric.idle.inc_by(
33 u64::try_from(self.idle.as_nanos())
34 .expect("The task doesn't live longer than `u64`"),
35 );
36 self.output
37 }
38}
39
40#[derive(Debug)]
47#[must_use = "once a span has been entered, it should be exited"]
48struct Entered<'a> {
49 span: &'a mut Span,
50 busy_instant: Instant,
51}
52
53impl<'a> Entered<'a> {
54 fn new(span: &'a mut Span) -> Self {
55 Self {
56 span,
57 busy_instant: Instant::now(),
58 }
59 }
60}
61
62impl<'a> Drop for Entered<'a> {
63 #[inline(always)]
64 fn drop(&mut self) {
65 self.span.busy = self.span.busy.saturating_add(self.busy_instant.elapsed());
66 self.span.do_exit()
67 }
68}
69
70#[derive(Default, Debug, Clone)]
73struct Span {
74 busy: Duration,
76 idle: Duration,
78 idle_instant: Option<Instant>,
82}
83
84impl Span {
85 #[inline(always)]
87 pub fn enter(&mut self) -> Entered<'_> {
88 self.do_enter();
89 Entered::new(self)
90 }
91
92 #[inline(always)]
93 fn do_enter(&mut self) {
94 let idle_instant = core::mem::take(&mut self.idle_instant);
95
96 if let Some(idle_instant) = idle_instant {
97 self.idle = self.idle.saturating_add(idle_instant.elapsed());
98 }
99 }
100
101 #[inline(always)]
102 fn do_exit(&mut self) {
103 self.idle_instant = Some(Instant::now());
104 }
105}
106
107pin_project_lite::pin_project! {
108 #[derive(Debug, Clone)]
111 #[must_use = "futures do nothing unless you `.await` or poll them"]
112 pub struct FutureTracker<T> {
113 #[pin]
114 inner: T,
115 span: Span,
116 }
117}
118
119impl<T> FutureTracker<T> {
120 pub fn new(inner: T) -> Self {
122 Self {
123 inner,
124 span: Default::default(),
125 }
126 }
127}
128
129impl<T: Future> Future for FutureTracker<T> {
130 type Output = ExecutionTime<T::Output>;
131
132 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
133 let this = self.project();
134 let enter = this.span.enter();
135 let output = this.inner.poll(cx);
136
137 match output {
138 Poll::Ready(output) => {
139 drop(enter);
140 Poll::Ready(ExecutionTime {
141 busy: this.span.busy,
142 idle: this.span.idle,
143 output,
144 })
145 }
146 Poll::Pending => Poll::Pending,
147 }
148 }
149}
150
151#[cfg(test)]
152mod tests {
153 use std::time::Duration;
154
155 use crate::futures::future_tracker::FutureTracker;
156
157 #[tokio::test]
158 async fn empty_future() {
159 let future = async { tokio::task::yield_now().await };
160 let wrapper_future = FutureTracker::new(future);
161 let result = wrapper_future.await;
162 assert_eq!(result.idle.as_secs(), 0)
163 }
164
165 #[tokio::test]
166 async fn idle_time_correct() {
167 let future = async { tokio::time::sleep(Duration::from_secs(1)).await };
168 let wrapper_future = FutureTracker::new(future);
169 let result = wrapper_future.await;
170 assert_eq!(result.idle.as_secs(), 1)
171 }
172
173 #[tokio::test]
174 async fn busy_time_correct() {
175 let future = async { std::thread::sleep(Duration::from_secs(1)) };
176 let wrapper_future = FutureTracker::new(future);
177 let result = wrapper_future.await;
178 assert_eq!(result.idle.as_secs(), 0);
179 assert_eq!(result.busy.as_secs(), 1);
180 }
181
182 #[tokio::test]
183 async fn hybrid_time_correct() {
184 let future = async {
185 tokio::time::sleep(Duration::from_secs(2)).await;
186 std::thread::sleep(Duration::from_secs(1));
187 };
188 let wrapper_future = FutureTracker::new(future);
189 let result = wrapper_future.await;
190 assert_eq!(result.idle.as_secs(), 2);
191 assert_eq!(result.busy.as_secs(), 1);
192 }
193
194 #[tokio::test]
195 async fn hybrid_time_correct_complex_case() {
196 let future = async {
197 tokio::time::sleep(Duration::from_secs(1)).await;
198 std::thread::sleep(Duration::from_secs(1));
199 tokio::time::sleep(Duration::from_secs(2)).await;
200 std::thread::sleep(Duration::from_secs(2));
201 tokio::time::sleep(Duration::from_secs(3)).await;
202 };
203 let wrapper_future = FutureTracker::new(future);
204 let result = wrapper_future.await;
205 assert_eq!(result.idle.as_secs(), 6);
206 assert_eq!(result.busy.as_secs(), 3);
207 }
208}