1use std::{future::Future, marker::PhantomData, sync::RwLock};
2
3use once_cell::sync::Lazy;
4use tokio::runtime::Runtime;
5
6use crate::{sys, JsDeferred, JsUnknown, NapiValue, Result};
7
8fn create_runtime() -> Option<Runtime> {
9 #[cfg(not(target_family = "wasm"))]
10 {
11 let runtime = tokio::runtime::Runtime::new().expect("Create tokio runtime failed");
12 Some(runtime)
13 }
14
15 #[cfg(target_family = "wasm")]
16 {
17 tokio::runtime::Builder::new_current_thread()
18 .enable_all()
19 .build()
20 .ok()
21 }
22}
23
24pub(crate) static RT: Lazy<RwLock<Option<Runtime>>> = Lazy::new(|| RwLock::new(create_runtime()));
25
26#[cfg(not(any(target_os = "macos", target_family = "wasm")))]
27static RT_REFERENCE_COUNT: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
28
29#[cfg(not(any(target_os = "macos", target_family = "wasm")))]
34pub(crate) fn ensure_runtime() {
35 use std::sync::atomic::Ordering;
36
37 let mut rt = RT.write().unwrap();
38 if rt.is_none() {
39 *rt = create_runtime();
40 }
41
42 RT_REFERENCE_COUNT.fetch_add(1, Ordering::Relaxed);
43}
44
45#[cfg(not(any(target_os = "macos", target_family = "wasm")))]
46pub(crate) unsafe extern "C" fn drop_runtime(_arg: *mut std::ffi::c_void) {
47 use std::sync::atomic::Ordering;
48
49 if RT_REFERENCE_COUNT.fetch_sub(1, Ordering::AcqRel) == 1 {
50 RT.write().unwrap().take();
51 }
52}
53
54pub fn spawn<F>(fut: F) -> tokio::task::JoinHandle<F::Output>
59where
60 F: 'static + Send + Future<Output = ()>,
61{
62 RT.read().unwrap().as_ref().unwrap().spawn(fut)
63}
64
65pub fn block_on<F: Future>(fut: F) -> F::Output {
69 RT.read().unwrap().as_ref().unwrap().block_on(fut)
70}
71
72pub fn spawn_blocking<F, R>(func: F) -> tokio::task::JoinHandle<R>
74where
75 F: FnOnce() -> R + Send + 'static,
76 R: Send + 'static,
77{
78 RT.read().unwrap().as_ref().unwrap().spawn_blocking(func)
79}
80
81#[inline]
87pub fn within_runtime_if_available<F: FnOnce() -> T, T>(f: F) -> T {
88 let _rt_guard = RT.read().unwrap().as_ref().unwrap().enter();
89 f()
90}
91
92struct SendableResolver<
93 Data: 'static + Send,
94 R: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
95> {
96 inner: R,
97 _data: PhantomData<Data>,
98}
99
100unsafe impl<Data: 'static + Send, R: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>>
105 Send for SendableResolver<Data, R>
106{
107}
108
109impl<Data: 'static + Send, R: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>>
110 SendableResolver<Data, R>
111{
112 fn new(inner: R) -> Self {
113 Self {
114 inner,
115 _data: PhantomData,
116 }
117 }
118
119 fn resolve(self, env: sys::napi_env, data: Data) -> Result<sys::napi_value> {
120 (self.inner)(env, data)
121 }
122}
123
124#[allow(clippy::not_unsafe_ptr_arg_deref)]
125pub fn execute_tokio_future<
126 Data: 'static + Send,
127 Fut: 'static + Send + Future<Output = Result<Data>>,
128 Resolver: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
129>(
130 env: sys::napi_env,
131 fut: Fut,
132 resolver: Resolver,
133) -> Result<sys::napi_value> {
134 let (deferred, promise) = JsDeferred::new(env)?;
135
136 let sendable_resolver = SendableResolver::new(resolver);
137
138 let inner = async {
139 match fut.await {
140 Ok(v) => deferred.resolve(move |env| {
141 sendable_resolver
142 .resolve(env.raw(), v)
143 .map(|v| unsafe { JsUnknown::from_raw_unchecked(env.raw(), v) })
144 }),
145 Err(e) => deferred.reject(e),
146 }
147 };
148
149 #[cfg(not(target_family = "wasm"))]
150 spawn(inner);
151
152 #[cfg(target_family = "wasm")]
153 {
154 std::thread::spawn(|| {
155 block_on(inner);
156 });
157 }
158
159 Ok(promise.0.value)
160}