tokio_rayon/
global.rs

1use crate::AsyncRayonHandle;
2use std::panic::{catch_unwind, AssertUnwindSafe};
3use tokio::sync::oneshot;
4
5/// Asynchronous wrapper around Rayon's [`spawn`](rayon::spawn).
6///
7/// Runs a function on the global Rayon thread pool with LIFO priority,
8/// produciing a future that resolves with the function's return value.
9///
10/// # Panics
11/// If the task function panics, the panic will be propagated through the
12/// returned future. This will NOT trigger the Rayon thread pool's panic
13/// handler.
14///
15/// If the returned handle is dropped, and the return value of `func` panics
16/// when dropped, that panic WILL trigger the thread pool's panic
17/// handler.
18pub fn spawn<F, R>(func: F) -> AsyncRayonHandle<R>
19where
20    F: FnOnce() -> R + Send + 'static,
21    R: Send + 'static,
22{
23    let (tx, rx) = oneshot::channel();
24
25    rayon::spawn(move || {
26        let _result = tx.send(catch_unwind(AssertUnwindSafe(func)));
27    });
28
29    AsyncRayonHandle { rx }
30}
31
32/// Asynchronous wrapper around Rayon's [`spawn_fifo`](rayon::spawn_fifo).
33///
34/// Runs a function on the global Rayon thread pool with FIFO priority,
35/// produciing a future that resolves with the function's return value.
36///
37/// # Panics
38/// If the task function panics, the panic will be propagated through the
39/// returned future. This will NOT trigger the Rayon thread pool's panic
40/// handler.
41///
42/// If the returned handle is dropped, and the return value of `func` panics
43/// when dropped, then that panic WILL trigger the thread pool's panic
44/// handler.
45pub fn spawn_fifo<F, R>(func: F) -> AsyncRayonHandle<R>
46where
47    F: FnOnce() -> R + Send + 'static,
48    R: Send + 'static,
49{
50    let (tx, rx) = oneshot::channel();
51
52    rayon::spawn_fifo(move || {
53        let _result = tx.send(catch_unwind(AssertUnwindSafe(func)));
54    });
55
56    AsyncRayonHandle { rx }
57}
58
59#[cfg(test)]
60mod tests {
61    use super::*;
62    use crate::test::init;
63
64    #[tokio::test]
65    async fn test_spawn_async_works() {
66        init();
67        let result = spawn(|| {
68            let thread_index = rayon::current_thread_index();
69            assert_eq!(thread_index, Some(0));
70            1337_usize
71        })
72        .await;
73        assert_eq!(result, 1337);
74        let thread_index = rayon::current_thread_index();
75        assert_eq!(thread_index, None);
76    }
77
78    #[tokio::test]
79    async fn test_spawn_fifo_async_works() {
80        init();
81        let result = spawn_fifo(|| {
82            let thread_index = rayon::current_thread_index();
83            assert_eq!(thread_index, Some(0));
84            1337_usize
85        })
86        .await;
87        assert_eq!(result, 1337);
88        let thread_index = rayon::current_thread_index();
89        assert_eq!(thread_index, None);
90    }
91
92    #[tokio::test]
93    #[should_panic(expected = "Task failed successfully")]
94    async fn test_spawn_propagates_panic() {
95        init();
96        let handle = spawn(|| {
97            panic!("Task failed successfully");
98        });
99
100        handle.await;
101    }
102
103    #[tokio::test]
104    #[should_panic(expected = "Task failed successfully")]
105    async fn test_spawn_fifo_propagates_panic() {
106        init();
107        let handle = spawn_fifo(|| {
108            panic!("Task failed successfully");
109        });
110
111        handle.await;
112    }
113}