1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
use std::{future::Future, marker::PhantomData, sync::RwLock};

use once_cell::sync::Lazy;
use tokio::runtime::Runtime;

use crate::{sys, JsDeferred, JsUnknown, NapiValue, Result};

fn create_runtime() -> Option<Runtime> {
  #[cfg(not(target_family = "wasm"))]
  {
    let runtime = tokio::runtime::Runtime::new().expect("Create tokio runtime failed");
    Some(runtime)
  }

  #[cfg(target_family = "wasm")]
  {
    tokio::runtime::Builder::new_current_thread()
      .enable_all()
      .build()
      .ok()
  }
}

/// Shared slot for the Tokio runtime used by the helpers in this module.
///
/// The slot is filled lazily, on first access, with whatever `create_runtime` returns.
/// It holds `None` when no runtime is currently stored.
pub(crate) static RT: Lazy<RwLock<Option<Runtime>>> = Lazy::new(|| {
  let runtime = create_runtime();
  RwLock::new(runtime)
});

/// Number of `ensure_runtime` calls that have not yet been balanced by a `drop_runtime` call.
///
/// `ensure_runtime` increments it; `drop_runtime` decrements it and removes the runtime
/// from `RT` when it releases the last reference.
#[cfg(not(any(target_os = "macos", target_family = "wasm")))]
static RT_REFERENCE_COUNT: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);

/// Ensure that the Tokio runtime is initialized.
/// In windows the Tokio runtime will be dropped when Node env exits.
/// But in Electron renderer process, the Node env will exits and recreate when the window reloads.
/// So we need to ensure that the Tokio runtime is initialized when the Node env is created.
#[cfg(not(any(target_os = "macos", target_family = "wasm")))]
pub(crate) fn ensure_runtime() {
  use std::sync::atomic::Ordering;

  let mut rt = RT.write().unwrap();
  if rt.is_none() {
    *rt = create_runtime();
  }

  RT_REFERENCE_COUNT.fetch_add(1, Ordering::Relaxed);
}

/// Releases one reference to the shared Tokio runtime and shuts the runtime down once
/// the last reference is released. Counterpart of `ensure_runtime`.
///
/// `_arg` is ignored.
///
/// # Safety
///
/// The body performs no unsafe operation and never reads `_arg`. Callers are expected
/// to balance every call with an earlier `ensure_runtime`; an unbalanced call makes the
/// reference count wrap around.
///
/// # Panics
///
/// Panics if the `RT` lock is poisoned.
#[cfg(not(any(target_os = "macos", target_family = "wasm")))]
pub(crate) unsafe extern "C" fn drop_runtime(_arg: *mut std::ffi::c_void) {
  use std::sync::atomic::Ordering;

  let runtime = {
    // Acquire the write lock *before* touching the counter. `ensure_runtime` increments
    // the counter while holding this same lock, so decrement + take is atomic with
    // respect to it. Decrementing first would let a concurrent `ensure_runtime` observe
    // the still-present runtime, bump the count back to 1, and then have that runtime
    // removed from under it.
    let mut slot = RT.write().unwrap();
    if RT_REFERENCE_COUNT.fetch_sub(1, Ordering::AcqRel) == 1 {
      slot.take()
    } else {
      None
    }
  };

  // Dropping a runtime blocks until its spawned work has stopped, so do it only after
  // the lock has been released; otherwise every other user of `RT` would be stalled
  // for the whole shutdown.
  drop(runtime);
}

/// Schedules `fut` on the shared Tokio runtime and returns its join handle.
///
/// Depending on where you use it, you should await or abort the future in your drop
/// function, to avoid undefined behavior and memory corruption.
///
/// # Panics
///
/// Panics if the `RT` lock is poisoned or if `RT` currently holds no runtime.
pub fn spawn<F>(fut: F) -> tokio::task::JoinHandle<F::Output>
where
  F: 'static + Send + Future<Output = ()>,
{
  let guard = RT.read().unwrap();
  let runtime = guard.as_ref().unwrap();
  runtime.spawn(fut)
}

/// Drives `fut` to completion on the shared Tokio runtime and returns its output.
///
/// This call blocks the current thread until the future is complete. Only use it when it
/// is absolutely necessary; prefer async functions everywhere else.
///
/// The read lock on `RT` is held for the whole duration of the call.
///
/// # Panics
///
/// Panics if the `RT` lock is poisoned or if `RT` currently holds no runtime.
pub fn block_on<F: Future>(fut: F) -> F::Output {
  let guard = RT.read().unwrap();
  let runtime = guard.as_ref().unwrap();
  runtime.block_on(fut)
}

/// Runs the blocking closure `func` via `spawn_blocking` on the shared Tokio runtime
/// and returns its join handle.
///
/// # Panics
///
/// Panics if the `RT` lock is poisoned or if `RT` currently holds no runtime.
pub fn spawn_blocking<F, R>(func: F) -> tokio::task::JoinHandle<R>
where
  F: FnOnce() -> R + Send + 'static,
  R: Send + 'static,
{
  let guard = RT.read().unwrap();
  let runtime = guard.as_ref().unwrap();
  runtime.spawn_blocking(func)
}

// This function's signature must be kept in sync with the one in lib.rs, otherwise napi
// will fail to compile with the `tokio_rt` feature.

/// If the feature `tokio_rt` has been enabled this will enter the runtime context and
/// then call the provided closure. Otherwise it will just call the provided closure.
///
/// Returns whatever `f` returns.
///
/// # Panics
///
/// Panics if the `RT` lock is poisoned or if `RT` currently holds no runtime.
#[inline]
pub fn within_runtime_if_available<F: FnOnce() -> T, T>(f: F) -> T {
  // The read guard is a temporary of this statement, so the lock is released as soon as
  // the runtime context has been entered. Only the enter guard stays alive while `f` runs.
  let _rt_guard = RT.read().unwrap().as_ref().unwrap().enter();
  f()
}

/// Wrapper around a resolver closure `R` that turns a `Data` value into a raw N-API value.
///
/// `_data` only ties the `Data` type parameter to the struct; no `Data` value is stored.
struct SendableResolver<Data, R>
where
  Data: 'static + Send,
  R: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
{
  inner: R,
  _data: PhantomData<Data>,
}

// SAFETY: a `SendableResolver` is only ever called inside the
// `threadsafe_function_call_js` callback, which always runs on the Node.js JavaScript
// thread. The wrapped function therefore does not have to be `Send` itself; the `Send`
// impl exists only because `execute_tokio_future` requires that bound.
unsafe impl<Data, R> Send for SendableResolver<Data, R>
where
  Data: 'static + Send,
  R: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
{
}

impl<Data, R> SendableResolver<Data, R>
where
  Data: 'static + Send,
  R: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
{
  /// Wraps the resolver closure `inner`.
  fn new(inner: R) -> Self {
    let _data = PhantomData;
    Self { inner, _data }
  }

  /// Consumes the wrapper and calls the wrapped closure with `env` and `data`,
  /// returning its result unchanged.
  fn resolve(self, env: sys::napi_env, data: Data) -> Result<sys::napi_value> {
    let Self { inner, .. } = self;
    inner(env, data)
  }
}

/// Runs `fut` on the Tokio runtime and settles a JavaScript promise with its outcome.
///
/// A deferred/promise pair is created for `env` first; an error from that step is
/// returned to the caller. The future is then handed to the runtime:
///
/// * `Ok(data)` resolves the deferred; `resolver` is called with the raw env and `data`
///   to build the raw value the promise resolves with.
/// * `Err(e)` rejects the deferred with `e`.
///
/// On non-wasm targets the work is spawned onto the runtime. On wasm targets a new
/// thread is started that blocks on it.
///
/// Returns the raw value of the promise.
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub fn execute_tokio_future<Data, Fut, Resolver>(
  env: sys::napi_env,
  fut: Fut,
  resolver: Resolver,
) -> Result<sys::napi_value>
where
  Data: 'static + Send,
  Fut: 'static + Send + Future<Output = Result<Data>>,
  Resolver: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
{
  let (deferred, promise) = JsDeferred::new(env)?;
  let sendable_resolver = SendableResolver::new(resolver);

  let task = async move {
    let outcome = fut.await;
    match outcome {
      Err(err) => deferred.reject(err),
      Ok(data) => deferred.resolve(move |js_env| {
        let raw_value = sendable_resolver.resolve(js_env.raw(), data);
        raw_value.map(|value| unsafe { JsUnknown::from_raw_unchecked(js_env.raw(), value) })
      }),
    }
  };

  #[cfg(not(target_family = "wasm"))]
  spawn(task);

  #[cfg(target_family = "wasm")]
  {
    std::thread::spawn(move || {
      block_on(task);
    });
  }

  Ok(promise.0.value)
}