fuel_core_services/async_processor.rs

use fuel_core_metrics::futures::{
    metered_future::MeteredFuture,
    FuturesMetrics,
};
use std::{
    future::Future,
    sync::Arc,
};
use tokio::{
    runtime,
    sync::{
        OwnedSemaphorePermit,
        Semaphore,
    },
    task::JoinHandle,
};

/// A processor that runs async tasks on a dedicated thread pool while limiting how
/// many tasks may be in flight at the same time.
pub struct AsyncProcessor {
    metric: FuturesMetrics,
    semaphore: Arc<Semaphore>,
    thread_pool: Option<runtime::Runtime>,
}

impl Drop for AsyncProcessor {
    fn drop(&mut self) {
        if let Some(runtime) = self.thread_pool.take() {
            runtime.shutdown_background();
        }
    }
}

/// A reservation of one slot in the [`AsyncProcessor`].
pub struct AsyncReservation(OwnedSemaphorePermit);

/// The error returned when the processor has no free capacity for another task.
#[derive(Debug, PartialEq, Eq)]
pub struct OutOfCapacity;

impl AsyncProcessor {
    /// Creates a new `AsyncProcessor` with the given number of worker threads and the
    /// maximum number of pending tasks. If `number_of_threads` is zero, tasks are
    /// spawned on the ambient Tokio runtime instead of a dedicated thread pool.
    pub fn new(
        metric_name: &str,
        number_of_threads: usize,
        number_of_pending_tasks: usize,
    ) -> anyhow::Result<Self> {
        let thread_pool = if number_of_threads != 0 {
            let runtime = runtime::Builder::new_multi_thread()
                .worker_threads(number_of_threads)
                .enable_all()
                .build()
                .map_err(|e| anyhow::anyhow!("Failed to create a tokio pool: {}", e))?;

            Some(runtime)
        } else {
            None
        };
        let semaphore = Arc::new(Semaphore::new(number_of_pending_tasks));
        let metric = FuturesMetrics::obtain_futures_metrics(metric_name);
        Ok(Self {
            metric,
            thread_pool,
            semaphore,
        })
    }

    /// Reserves a slot for a future task, failing if the processor is at capacity.
    pub fn reserve(&self) -> Result<AsyncReservation, OutOfCapacity> {
        let permit = self.semaphore.clone().try_acquire_owned();
        if let Ok(permit) = permit {
            Ok(AsyncReservation(permit))
        } else {
            Err(OutOfCapacity)
        }
    }

    /// Spawns the future using a previously acquired reservation. The reservation is
    /// released once the future completes.
    pub fn spawn_reserved<F>(
        &self,
        reservation: AsyncReservation,
        future: F,
    ) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send,
    {
        let permit = reservation.0;
        let future = async move {
            // Hold the permit for the whole lifetime of the future, then release it.
            let permit = permit;
            let result = future.await;
            drop(permit);
            result
        };
        let metered_future = MeteredFuture::new(future, self.metric.clone());
        if let Some(runtime) = &self.thread_pool {
            runtime.spawn(metered_future)
        } else {
            tokio::spawn(metered_future)
        }
    }

    /// Tries to reserve a slot and spawn the future, returning `OutOfCapacity` if no
    /// slot is available.
    pub fn try_spawn<F>(&self, future: F) -> Result<JoinHandle<F::Output>, OutOfCapacity>
    where
        F: Future + Send + 'static,
        F::Output: Send,
    {
        let reservation = self.reserve()?;
        Ok(self.spawn_reserved(reservation, future))
    }
}

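// Illustrative usage sketch, assuming the caller already runs inside a Tokio runtime.
// The processor name "example" and the capacities below are arbitrary values chosen
// for this sketch, not values used elsewhere in the crate:
//
//     let processor = AsyncProcessor::new("example", 2, 8)?;
//     // Spawns on the dedicated pool; fails with `OutOfCapacity` once 8 tasks are pending.
//     let handle = processor.try_spawn(async { 1 + 1 }).expect("free slot");
//     assert_eq!(handle.await?, 2);
//
//     // Alternatively, reserve a slot first and spawn later with `spawn_reserved`.
//     let reservation = processor.reserve().expect("free slot");
//     let handle = processor.spawn_reserved(reservation, async { "done" });
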
#[cfg(test)]
#[allow(clippy::bool_assert_comparison)]
#[allow(non_snake_case)]
mod tests {
    use super::*;
    use futures::future::join_all;
    use std::{
        collections::HashSet,
        iter,
        thread::sleep,
        time::Duration,
    };
    use tokio::time::Instant;

    #[tokio::test]
    async fn one_spawn_single_tasks_works() {
        const NUMBER_OF_PENDING_TASKS: usize = 1;
        let heavy_task_processor =
            AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

        let (sender, receiver) = tokio::sync::oneshot::channel();
        let result = heavy_task_processor.try_spawn(async move {
            sender.send(()).unwrap();
        });

        result.expect("Expected Ok result");
        tokio::time::timeout(Duration::from_secs(5), receiver)
            .await
            .unwrap()
            .unwrap();
    }

    #[tokio::test]
    async fn one_spawn_single_tasks_works__thread_id_is_different_than_main() {
        const MAX_NUMBER_OF_THREADS: usize = 10;
        const NUMBER_OF_PENDING_TASKS: usize = 10000;
        let heavy_task_processor =
            AsyncProcessor::new("Test", MAX_NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
                .unwrap();
        let main_handler = tokio::spawn(async move { std::thread::current().id() });
        let main_id = main_handler.await.unwrap();

        let futures = iter::repeat_with(|| {
            heavy_task_processor
                .try_spawn(async move { std::thread::current().id() })
                .unwrap()
        })
        .take(NUMBER_OF_PENDING_TASKS)
        .collect::<Vec<_>>();

        let thread_ids = join_all(futures).await;
        let unique_thread_ids = thread_ids
            .into_iter()
            .map(|r| r.unwrap())
            .collect::<HashSet<_>>();

        // The dedicated pool never runs tasks on the main runtime thread, and it uses
        // at most the configured number of worker threads.
        assert!(!unique_thread_ids.contains(&main_id));
        assert!(!unique_thread_ids.is_empty());
        assert!(unique_thread_ids.len() <= MAX_NUMBER_OF_THREADS);
    }

    #[test]
    fn second_spawn_fails_when_limit_is_one_and_first_in_progress() {
        const NUMBER_OF_PENDING_TASKS: usize = 1;
        let heavy_task_processor =
            AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();
        let first_spawn_result = heavy_task_processor.try_spawn(async move {
            sleep(Duration::from_secs(1));
        });
        first_spawn_result.expect("Expected Ok result");

        let second_spawn_result = heavy_task_processor.try_spawn(async move {
            sleep(Duration::from_secs(1));
        });

        let err = second_spawn_result.expect_err("Should error");
        assert_eq!(err, OutOfCapacity);
    }

    #[test]
    fn second_spawn_works_when_first_is_finished() {
        const NUMBER_OF_PENDING_TASKS: usize = 1;
        let heavy_task_processor =
            AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

        let (sender, receiver) = tokio::sync::oneshot::channel();
        let first_spawn = heavy_task_processor.try_spawn(async move {
            sleep(Duration::from_secs(1));
            sender.send(()).unwrap();
        });
        first_spawn.expect("Expected Ok result");
        futures::executor::block_on(async move {
            receiver.await.unwrap();
        });

        let second_spawn = heavy_task_processor.try_spawn(async move {
            sleep(Duration::from_secs(1));
        });

        second_spawn.expect("Expected Ok result");
    }

    #[test]
    fn can_spawn_10_tasks_when_limit_is_10() {
        const NUMBER_OF_PENDING_TASKS: usize = 10;
        let heavy_task_processor =
            AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

        for _ in 0..NUMBER_OF_PENDING_TASKS {
            let result = heavy_task_processor.try_spawn(async move {
                tokio::time::sleep(Duration::from_secs(1)).await;
            });

            result.expect("Expected Ok result");
        }
    }

    #[tokio::test]
    async fn executes_5_tasks_for_5_seconds_with_one_thread() {
        const NUMBER_OF_PENDING_TASKS: usize = 5;
        const NUMBER_OF_THREADS: usize = 1;
        let heavy_task_processor =
            AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
                .unwrap();

        let (broadcast_sender, mut broadcast_receiver) =
            tokio::sync::broadcast::channel(1024);
        let instant = Instant::now();
        for _ in 0..NUMBER_OF_PENDING_TASKS {
            let broadcast_sender = broadcast_sender.clone();
            let result = heavy_task_processor.try_spawn(async move {
                sleep(Duration::from_secs(1));
                broadcast_sender.send(()).unwrap();
            });
            result.expect("Expected Ok result");
        }
        drop(broadcast_sender);

        while broadcast_receiver.recv().await.is_ok() {}
        // With a single worker thread, the five blocking tasks run one after another,
        // so the total wall-clock time is roughly five seconds plus scheduling leeway.
        const LEEWAY: Duration = Duration::from_millis(300);
        assert!(instant.elapsed() < Duration::from_secs(5) + LEEWAY);
        assert!(instant.elapsed() >= Duration::from_secs(5));
        // Give the metered futures a moment to record their timings.
        tokio::time::sleep(Duration::from_secs(1)).await;
        let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
        assert_eq!(duration.as_secs(), 5);
        let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get());
        assert_eq!(duration.as_secs(), 0);
    }

    #[tokio::test]
    async fn executes_10_blocking_tasks_for_1_second_with_10_threads__records_busy_time()
    {
        const NUMBER_OF_PENDING_TASKS: usize = 10;
        const NUMBER_OF_THREADS: usize = 10;
        let heavy_task_processor =
            AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
                .unwrap();

        let (broadcast_sender, mut broadcast_receiver) =
            tokio::sync::broadcast::channel(1024);
        let instant = Instant::now();
        for _ in 0..NUMBER_OF_PENDING_TASKS {
            let broadcast_sender = broadcast_sender.clone();
            let result = heavy_task_processor.try_spawn(async move {
                sleep(Duration::from_secs(1));
                broadcast_sender.send(()).unwrap();
            });
            result.expect("Expected Ok result");
        }
        drop(broadcast_sender);

        while broadcast_receiver.recv().await.is_ok() {}
        // With ten worker threads, the ten blocking tasks run in parallel: wall-clock
        // time is about one second, while the accumulated busy time is ten seconds.
        const LEEWAY: Duration = Duration::from_millis(300);
        assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY);
        // Give the metered futures a moment to record their timings.
        tokio::time::sleep(Duration::from_secs(1)).await;
        let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
        assert_eq!(duration.as_secs(), 10);
        let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get());
        assert_eq!(duration.as_secs(), 0);
    }

    #[tokio::test]
    async fn executes_10_non_blocking_tasks_for_1_second_with_10_threads__records_idle_time(
    ) {
        const NUMBER_OF_PENDING_TASKS: usize = 10;
        const NUMBER_OF_THREADS: usize = 10;
        let heavy_task_processor =
            AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
                .unwrap();

        let (broadcast_sender, mut broadcast_receiver) =
            tokio::sync::broadcast::channel(1024);
        let instant = Instant::now();
        for _ in 0..NUMBER_OF_PENDING_TASKS {
            let broadcast_sender = broadcast_sender.clone();
            let result = heavy_task_processor.try_spawn(async move {
                tokio::time::sleep(Duration::from_secs(1)).await;
                broadcast_sender.send(()).unwrap();
            });
            result.expect("Expected Ok result");
        }
        drop(broadcast_sender);

        while broadcast_receiver.recv().await.is_ok() {}
        // The tasks await a timer instead of blocking, so the futures accumulate about
        // ten seconds of idle time while the recorded busy time stays near zero.
        const LEEWAY: Duration = Duration::from_millis(300);
        assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY);
        // Give the metered futures a moment to record their timings.
        tokio::time::sleep(Duration::from_secs(1)).await;
        let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
        assert_eq!(duration.as_secs(), 0);
        let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get());
        assert_eq!(duration.as_secs(), 10);
    }
}