fuel_core_services/
async_processor.rs

1use fuel_core_metrics::futures::{
2    metered_future::MeteredFuture,
3    FuturesMetrics,
4};
5use std::{
6    future::Future,
7    sync::Arc,
8};
9use tokio::{
10    runtime,
11    sync::{
12        OwnedSemaphorePermit,
13        Semaphore,
14    },
15    task::JoinHandle,
16};
17
18/// A processor that can execute async tasks with a limit on the number of tasks that can be
19/// executed concurrently.
20pub struct AsyncProcessor {
21    metric: FuturesMetrics,
22    semaphore: Arc<Semaphore>,
23    thread_pool: Option<runtime::Runtime>,
24}
25
26impl Drop for AsyncProcessor {
27    fn drop(&mut self) {
28        if let Some(runtime) = self.thread_pool.take() {
29            runtime.shutdown_background();
30        }
31    }
32}
33
34/// A reservation for a task to be executed by the `AsyncProcessor`.
35pub struct AsyncReservation(OwnedSemaphorePermit);
36
37/// Out of capacity error.
38#[derive(Debug, PartialEq, Eq)]
39pub struct OutOfCapacity;
40
41impl AsyncProcessor {
42    /// Create a new `AsyncProcessor` with the given number of threads and the number of pending
43    /// tasks.
44    pub fn new(
45        metric_name: &str,
46        number_of_threads: usize,
47        number_of_pending_tasks: usize,
48    ) -> anyhow::Result<Self> {
49        let thread_pool = if number_of_threads != 0 {
50            let runtime = runtime::Builder::new_multi_thread()
51                .worker_threads(number_of_threads)
52                .enable_all()
53                .build()
54                .map_err(|e| anyhow::anyhow!("Failed to create a tokio pool: {}", e))?;
55
56            Some(runtime)
57        } else {
58            None
59        };
60        let semaphore = Arc::new(Semaphore::new(number_of_pending_tasks));
61        let metric = FuturesMetrics::obtain_futures_metrics(metric_name);
62        Ok(Self {
63            metric,
64            thread_pool,
65            semaphore,
66        })
67    }
68
69    /// Reserve a slot for a task to be executed.
70    pub fn reserve(&self) -> Result<AsyncReservation, OutOfCapacity> {
71        let permit = self.semaphore.clone().try_acquire_owned();
72        if let Ok(permit) = permit {
73            Ok(AsyncReservation(permit))
74        } else {
75            Err(OutOfCapacity)
76        }
77    }
78
79    /// Spawn a task with a reservation.
80    pub fn spawn_reserved<F>(
81        &self,
82        reservation: AsyncReservation,
83        future: F,
84    ) -> JoinHandle<F::Output>
85    where
86        F: Future + Send + 'static,
87        F::Output: Send,
88    {
89        let permit = reservation.0;
90        let future = async move {
91            let permit = permit;
92            let result = future.await;
93            drop(permit);
94            result
95        };
96        let metered_future = MeteredFuture::new(future, self.metric.clone());
97        if let Some(runtime) = &self.thread_pool {
98            runtime.spawn(metered_future)
99        } else {
100            tokio::spawn(metered_future)
101        }
102    }
103
104    /// Tries to spawn a task. If the task cannot be spawned, returns an error.
105    pub fn try_spawn<F>(&self, future: F) -> Result<JoinHandle<F::Output>, OutOfCapacity>
106    where
107        F: Future + Send + 'static,
108        F::Output: Send,
109    {
110        let reservation = self.reserve()?;
111        Ok(self.spawn_reserved(reservation, future))
112    }
113}
114
115#[cfg(test)]
116#[allow(clippy::bool_assert_comparison)]
117#[allow(non_snake_case)]
118mod tests {
119    use super::*;
120    use futures::future::join_all;
121    use std::{
122        collections::HashSet,
123        iter,
124        thread::sleep,
125        time::Duration,
126    };
127    use tokio::time::Instant;
128
129    #[tokio::test]
130    async fn one_spawn_single_tasks_works() {
131        // Given
132        const NUMBER_OF_PENDING_TASKS: usize = 1;
133        let heavy_task_processor =
134            AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();
135
136        // When
137        let (sender, receiver) = tokio::sync::oneshot::channel();
138        let result = heavy_task_processor.try_spawn(async move {
139            sender.send(()).unwrap();
140        });
141
142        // Then
143        result.expect("Expected Ok result");
144        tokio::time::timeout(Duration::from_secs(5), receiver)
145            .await
146            .unwrap()
147            .unwrap();
148    }
149
150    #[tokio::test]
151    async fn one_spawn_single_tasks_works__thread_id_is_different_than_main() {
152        // Given
153        const MAX_NUMBER_OF_THREADS: usize = 10;
154        const NUMBER_OF_PENDING_TASKS: usize = 10000;
155        let heavy_task_processor =
156            AsyncProcessor::new("Test", MAX_NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
157                .unwrap();
158        let main_handler = tokio::spawn(async move { std::thread::current().id() });
159        let main_id = main_handler.await.unwrap();
160
161        // When
162        let futures = iter::repeat_with(|| {
163            heavy_task_processor
164                .try_spawn(async move { std::thread::current().id() })
165                .unwrap()
166        })
167        .take(NUMBER_OF_PENDING_TASKS)
168        .collect::<Vec<_>>();
169
170        // Then
171        let thread_ids = join_all(futures).await;
172        let unique_thread_ids = thread_ids
173            .into_iter()
174            .map(|r| r.unwrap())
175            .collect::<HashSet<_>>();
176
177        // Main thread was not used.
178        assert!(!unique_thread_ids.contains(&main_id));
179        // There's been at least one worker thread used.
180        assert!(!unique_thread_ids.is_empty());
181        // There were no more worker threads above the threshold.
182        assert!(unique_thread_ids.len() <= MAX_NUMBER_OF_THREADS);
183    }
184
185    #[test]
186    fn second_spawn_fails_when_limit_is_one_and_first_in_progress() {
187        // Given
188        const NUMBER_OF_PENDING_TASKS: usize = 1;
189        let heavy_task_processor =
190            AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();
191        let first_spawn_result = heavy_task_processor.try_spawn(async move {
192            sleep(Duration::from_secs(1));
193        });
194        first_spawn_result.expect("Expected Ok result");
195
196        // When
197        let second_spawn_result = heavy_task_processor.try_spawn(async move {
198            sleep(Duration::from_secs(1));
199        });
200
201        // Then
202        let err = second_spawn_result.expect_err("Should error");
203        assert_eq!(err, OutOfCapacity);
204    }
205
206    #[test]
207    fn second_spawn_works_when_first_is_finished() {
208        const NUMBER_OF_PENDING_TASKS: usize = 1;
209        let heavy_task_processor =
210            AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();
211
212        // Given
213        let (sender, receiver) = tokio::sync::oneshot::channel();
214        let first_spawn = heavy_task_processor.try_spawn(async move {
215            sleep(Duration::from_secs(1));
216            sender.send(()).unwrap();
217        });
218        first_spawn.expect("Expected Ok result");
219        futures::executor::block_on(async move {
220            receiver.await.unwrap();
221        });
222
223        // When
224        let second_spawn = heavy_task_processor.try_spawn(async move {
225            sleep(Duration::from_secs(1));
226        });
227
228        // Then
229        second_spawn.expect("Expected Ok result");
230    }
231
232    #[test]
233    fn can_spawn_10_tasks_when_limit_is_10() {
234        // Given
235        const NUMBER_OF_PENDING_TASKS: usize = 10;
236        let heavy_task_processor =
237            AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();
238
239        for _ in 0..NUMBER_OF_PENDING_TASKS {
240            // When
241            let result = heavy_task_processor.try_spawn(async move {
242                tokio::time::sleep(Duration::from_secs(1)).await;
243            });
244
245            // Then
246            result.expect("Expected Ok result");
247        }
248    }
249
250    #[tokio::test]
251    async fn executes_5_tasks_for_5_seconds_with_one_thread() {
252        // Given
253        const NUMBER_OF_PENDING_TASKS: usize = 5;
254        const NUMBER_OF_THREADS: usize = 1;
255        let heavy_task_processor =
256            AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
257                .unwrap();
258
259        // When
260        let (broadcast_sender, mut broadcast_receiver) =
261            tokio::sync::broadcast::channel(1024);
262        let instant = Instant::now();
263        for _ in 0..NUMBER_OF_PENDING_TASKS {
264            let broadcast_sender = broadcast_sender.clone();
265            let result = heavy_task_processor.try_spawn(async move {
266                sleep(Duration::from_secs(1));
267                broadcast_sender.send(()).unwrap();
268            });
269            result.expect("Expected Ok result");
270        }
271        drop(broadcast_sender);
272
273        // Then
274        while broadcast_receiver.recv().await.is_ok() {}
275        // 5 tasks running on 1 thread, each task taking 1 second,
276        // should complete in approximately 5 seconds overall.
277        // Allowing some LEEWAY to account for runtime overhead.
278        const LEEWAY: Duration = Duration::from_millis(300);
279        assert!(instant.elapsed() < Duration::from_secs(5) + LEEWAY);
280        // Make sure that the tasks were not executed in parallel.
281        assert!(instant.elapsed() >= Duration::from_secs(5));
282        // Wait for the metrics to be updated.
283        tokio::time::sleep(Duration::from_secs(1)).await;
284        let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
285        assert_eq!(duration.as_secs(), 5);
286        let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get());
287        assert_eq!(duration.as_secs(), 0);
288    }
289
290    #[tokio::test]
291    async fn executes_10_blocking_tasks_for_1_second_with_10_threads__records_busy_time()
292    {
293        // Given
294        const NUMBER_OF_PENDING_TASKS: usize = 10;
295        const NUMBER_OF_THREADS: usize = 10;
296        let heavy_task_processor =
297            AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
298                .unwrap();
299
300        // When
301        let (broadcast_sender, mut broadcast_receiver) =
302            tokio::sync::broadcast::channel(1024);
303        let instant = Instant::now();
304        for _ in 0..NUMBER_OF_PENDING_TASKS {
305            let broadcast_sender = broadcast_sender.clone();
306            let result = heavy_task_processor.try_spawn(async move {
307                sleep(Duration::from_secs(1));
308                broadcast_sender.send(()).unwrap();
309            });
310            result.expect("Expected Ok result");
311        }
312        drop(broadcast_sender);
313
314        // Then
315        while broadcast_receiver.recv().await.is_ok() {}
316        // 10 blocking tasks running on 10 threads, each task taking 1 second,
317        // should complete in approximately 1 second overall.
318        // Allowing some LEEWAY to account for runtime overhead.
319        const LEEWAY: Duration = Duration::from_millis(300);
320        assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY);
321        // Wait for the metrics to be updated.
322        tokio::time::sleep(Duration::from_secs(1)).await;
323        let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
324        assert_eq!(duration.as_secs(), 10);
325        let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get());
326        assert_eq!(duration.as_secs(), 0);
327    }
328
329    #[tokio::test]
330    async fn executes_10_non_blocking_tasks_for_1_second_with_10_threads__records_idle_time(
331    ) {
332        // Given
333        const NUMBER_OF_PENDING_TASKS: usize = 10;
334        const NUMBER_OF_THREADS: usize = 10;
335        let heavy_task_processor =
336            AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
337                .unwrap();
338
339        // When
340        let (broadcast_sender, mut broadcast_receiver) =
341            tokio::sync::broadcast::channel(1024);
342        let instant = Instant::now();
343        for _ in 0..NUMBER_OF_PENDING_TASKS {
344            let broadcast_sender = broadcast_sender.clone();
345            let result = heavy_task_processor.try_spawn(async move {
346                tokio::time::sleep(Duration::from_secs(1)).await;
347                broadcast_sender.send(()).unwrap();
348            });
349            result.expect("Expected Ok result");
350        }
351        drop(broadcast_sender);
352
353        // Then
354        while broadcast_receiver.recv().await.is_ok() {}
355        // 10 non-blocking tasks running on 10 threads, each task taking 1 second,
356        // should complete in approximately 1 second overall.
357        // Allowing some LEEWAY to account for runtime overhead.
358        const LEEWAY: Duration = Duration::from_millis(300);
359        assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY);
360        // Wait for the metrics to be updated.
361        tokio::time::sleep(Duration::from_secs(1)).await;
362        let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
363        assert_eq!(duration.as_secs(), 0);
364        let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get());
365        assert_eq!(duration.as_secs(), 10);
366    }
367}