napi_h/
threadsafe_function.rs

1#![allow(clippy::single_component_path_imports)]
2
3use std::convert::Into;
4use std::ffi::CString;
5use std::marker::PhantomData;
6use std::os::raw::c_void;
7use std::ptr::{self, null_mut};
8use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
9use std::sync::{Arc, RwLock, RwLockWriteGuard, Weak};
10
11use crate::bindgen_runtime::{
12  FromNapiValue, JsValuesTupleIntoVec, ToNapiValue, TypeName, ValidateNapiValue,
13};
14use crate::{check_status, sys, Env, JsError, JsUnknown, Result, Status};
15
16/// ThreadSafeFunction Context object
17/// the `value` is the value passed to `call` method
18pub struct ThreadSafeCallContext<T: 'static> {
19  pub env: Env,
20  pub value: T,
21}
22
/// Queueing behaviour requested when invoking a [`ThreadsafeFunction`].
///
/// Converted into `sys::napi_threadsafe_function_call_mode` before it is
/// passed to `napi_call_threadsafe_function`.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThreadsafeFunctionCallMode {
  NonBlocking,
  Blocking,
}
29
30impl From<ThreadsafeFunctionCallMode> for sys::napi_threadsafe_function_call_mode {
31  fn from(value: ThreadsafeFunctionCallMode) -> Self {
32    match value {
33      ThreadsafeFunctionCallMode::Blocking => sys::ThreadsafeFunctionCallMode::blocking,
34      ThreadsafeFunctionCallMode::NonBlocking => sys::ThreadsafeFunctionCallMode::nonblocking,
35    }
36  }
37}
38
39type_level_enum! {
40  /// Type-level `enum` to express how to feed [`ThreadsafeFunction`] errors to
41  /// the inner [`JsFunction`].
42  ///
43  /// ### Context
44  ///
45  /// For callbacks that expect a `Result`-like kind of input, the convention is
46  /// to have the callback take an `error` parameter as its first parameter.
47  ///
48  /// This way receiving a `Result<Args…>` can be modelled as follows:
49  ///
50  ///   - In case of `Err(error)`, feed that `error` entity as the first parameter
51  ///     of the callback;
52  ///
53  ///   - Otherwise (in case of `Ok(_)`), feed `null` instead.
54  ///
55  /// In pseudo-code:
56  ///
57  /// ```rust,ignore
58  /// match result_args {
59  ///     Ok(args) => {
60  ///         let js_null = /* … */;
61  ///         callback.call(
62  ///             // this
63  ///             None,
64  ///             // args…
65  ///             &iter::once(js_null).chain(args).collect::<Vec<_>>(),
66  ///         )
67  ///     },
68  ///     Err(err) => callback.call(None, &[JsError::from(err)]),
69  /// }
70  /// ```
71  ///
72  /// **Note that the `Err` case can stem from a failed conversion from native
73  /// values to js values when calling the callback!**
74  ///
75  /// That's why:
76  ///
77  /// > **[This][`ErrorStrategy::CalleeHandled`] is the default error strategy**.
78  ///
79  /// In order to opt-out of it, [`ThreadsafeFunction`] has an optional second
80  /// generic parameter (of "kind" [`ErrorStrategy::T`]) that defines whether
81  /// this behavior ([`ErrorStrategy::CalleeHandled`]) or a non-`Result` one
82  /// ([`ErrorStrategy::Fatal`]) is desired.
83  pub enum ErrorStrategy {
84    /// Input errors (including conversion errors) are left for the callee to
85    /// handle:
86    ///
87    /// The callee receives an extra `error` parameter (the first one), which is
88    /// `null` if no error occurred, and the error payload otherwise.
89    CalleeHandled,
90
91    /// Input errors (including conversion errors) are deemed fatal:
92    ///
93    /// they can thus cause a `panic!` or abort the process.
94    ///
95    /// The callee thus is not expected to have to deal with [that extra `error`
96    /// parameter][CalleeHandled], which is thus not added.
97    Fatal,
98  }
99}
100
101struct ThreadsafeFunctionHandle {
102  raw: AtomicPtr<sys::napi_threadsafe_function__>,
103  aborted: RwLock<bool>,
104  referred: AtomicBool,
105}
106
107impl ThreadsafeFunctionHandle {
108  /// create a Arc to hold the `ThreadsafeFunctionHandle`
109  fn new(raw: sys::napi_threadsafe_function) -> Arc<Self> {
110    Arc::new(Self {
111      raw: AtomicPtr::new(raw),
112      aborted: RwLock::new(false),
113      referred: AtomicBool::new(true),
114    })
115  }
116
117  /// Lock `aborted` with read access, call `f` with the value of `aborted`, then unlock it
118  fn with_read_aborted<RT, F>(&self, f: F) -> RT
119  where
120    F: FnOnce(bool) -> RT,
121  {
122    let aborted_guard = self
123      .aborted
124      .read()
125      .expect("Threadsafe Function aborted lock failed");
126    f(*aborted_guard)
127  }
128
129  /// Lock `aborted` with write access, call `f` with the `RwLockWriteGuard`, then unlock it
130  fn with_write_aborted<RT, F>(&self, f: F) -> RT
131  where
132    F: FnOnce(RwLockWriteGuard<bool>) -> RT,
133  {
134    let aborted_guard = self
135      .aborted
136      .write()
137      .expect("Threadsafe Function aborted lock failed");
138    f(aborted_guard)
139  }
140
141  #[allow(clippy::arc_with_non_send_sync)]
142  fn null() -> Arc<Self> {
143    Self::new(null_mut())
144  }
145
146  fn get_raw(&self) -> sys::napi_threadsafe_function {
147    self.raw.load(Ordering::SeqCst)
148  }
149
150  fn set_raw(&self, raw: sys::napi_threadsafe_function) {
151    self.raw.store(raw, Ordering::SeqCst)
152  }
153}
154
155impl Drop for ThreadsafeFunctionHandle {
156  fn drop(&mut self) {
157    self.with_read_aborted(|aborted| {
158      if !aborted {
159        let release_status = unsafe {
160          sys::napi_release_threadsafe_function(
161            self.get_raw(),
162            sys::ThreadsafeFunctionReleaseMode::release,
163          )
164        };
165        assert!(
166          release_status == sys::Status::napi_ok,
167          "Threadsafe Function release failed {}",
168          Status::from(release_status)
169        );
170      }
171    })
172  }
173}
174
/// Tells `call_js_cb` whether the queued call carries a return-value
/// callback (`WithCallback`) or not (`Direct`).
#[repr(u8)]
enum ThreadsafeFunctionCallVariant {
  Direct,
  WithCallback,
}
180
181struct ThreadsafeFunctionCallJsBackData<T> {
182  data: T,
183  call_variant: ThreadsafeFunctionCallVariant,
184  callback: Box<dyn FnOnce(Result<JsUnknown>) -> Result<()>>,
185}
186
187/// Communicate with the addon's main thread by invoking a JavaScript function from other threads.
188///
189/// ## Example
190/// An example of using `ThreadsafeFunction`:
191///
192/// ```rust
193/// #[macro_use]
194/// extern crate napi_derive;
195///
196/// use std::thread;
197///
198/// use napi::{
199///     threadsafe_function::{
200///         ThreadSafeCallContext, ThreadsafeFunctionCallMode, ThreadsafeFunctionReleaseMode,
201///     },
202///     CallContext, Error, JsFunction, JsNumber, JsUndefined, Result, Status,
203/// };
204///
205/// #[js_function(1)]
206/// pub fn test_threadsafe_function(ctx: CallContext) -> Result<JsUndefined> {
207///   let func = ctx.get::<JsFunction>(0)?;
208///
209///   let tsfn =
210///       ctx
211///           .env
212///           .create_threadsafe_function(&func, 0, |ctx: ThreadSafeCallContext<Vec<u32>>| {
213///             ctx.value
214///                 .iter()
215///                 .map(|v| ctx.env.create_uint32(*v))
216///                 .collect::<Result<Vec<JsNumber>>>()
217///           })?;
218///
219///   let tsfn_cloned = tsfn.clone();
220///
221///   thread::spawn(move || {
222///       let output: Vec<u32> = vec![0, 1, 2, 3];
223///       // It's okay to call a threadsafe function multiple times.
224///       tsfn.call(Ok(output.clone()), ThreadsafeFunctionCallMode::Blocking);
225///   });
226///
227///   thread::spawn(move || {
228///       let output: Vec<u32> = vec![3, 2, 1, 0];
229///       // It's okay to call a threadsafe function multiple times.
230///       tsfn_cloned.call(Ok(output.clone()), ThreadsafeFunctionCallMode::NonBlocking);
231///   });
232///
233///   ctx.env.get_undefined()
234/// }
235/// ```
236pub struct ThreadsafeFunction<T: 'static, ES: ErrorStrategy::T = ErrorStrategy::CalleeHandled> {
237  handle: Arc<ThreadsafeFunctionHandle>,
238  _phantom: PhantomData<(T, ES)>,
239}
240
241unsafe impl<T: 'static, ES: ErrorStrategy::T> Send for ThreadsafeFunction<T, ES> {}
242unsafe impl<T: 'static, ES: ErrorStrategy::T> Sync for ThreadsafeFunction<T, ES> {}
243
244impl<T: 'static, ES: ErrorStrategy::T> Clone for ThreadsafeFunction<T, ES> {
245  fn clone(&self) -> Self {
246    self.handle.with_read_aborted(|aborted| {
247      if aborted {
248        panic!("ThreadsafeFunction was aborted, can not clone it");
249      };
250
251      Self {
252        handle: self.handle.clone(),
253        _phantom: PhantomData,
254      }
255    })
256  }
257}
258
259impl<T: ToNapiValue> JsValuesTupleIntoVec for T {
260  #[allow(clippy::not_unsafe_ptr_arg_deref)]
261  fn into_vec(self, env: sys::napi_env) -> Result<Vec<sys::napi_value>> {
262    Ok(vec![unsafe {
263      <T as ToNapiValue>::to_napi_value(env, self)?
264    }])
265  }
266}
267
268macro_rules! impl_js_value_tuple_to_vec {
269  ($($ident:ident),*) => {
270    impl<$($ident: ToNapiValue),*> JsValuesTupleIntoVec for ($($ident,)*) {
271      #[allow(clippy::not_unsafe_ptr_arg_deref)]
272      fn into_vec(self, env: sys::napi_env) -> Result<Vec<sys::napi_value>> {
273        #[allow(non_snake_case)]
274        let ($($ident,)*) = self;
275        Ok(vec![$(unsafe { <$ident as ToNapiValue>::to_napi_value(env, $ident)? }),*])
276      }
277    }
278  };
279}
280
281impl_js_value_tuple_to_vec!(A);
282impl_js_value_tuple_to_vec!(A, B);
283impl_js_value_tuple_to_vec!(A, B, C);
284impl_js_value_tuple_to_vec!(A, B, C, D);
285impl_js_value_tuple_to_vec!(A, B, C, D, E);
286impl_js_value_tuple_to_vec!(A, B, C, D, E, F);
287impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G);
288impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H);
289impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I);
290impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J);
291impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K);
292impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L);
293impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M);
294impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N);
295impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O);
296impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P);
297impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q);
298impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R);
299impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S);
300impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T);
301impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U);
302impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V);
303impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W);
304impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X);
305impl_js_value_tuple_to_vec!(
306  A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y
307);
308impl_js_value_tuple_to_vec!(
309  A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y, Z
310);
311
312impl<T: JsValuesTupleIntoVec + 'static, ES: ErrorStrategy::T> FromNapiValue
313  for ThreadsafeFunction<T, ES>
314{
315  unsafe fn from_napi_value(env: sys::napi_env, napi_val: sys::napi_value) -> Result<Self> {
316    Self::create(env, napi_val, 0, |ctx| ctx.value.into_vec(ctx.env.0))
317  }
318}
319
320impl<T: 'static, ES: ErrorStrategy::T> ThreadsafeFunction<T, ES> {
321  /// See [napi_create_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_create_threadsafe_function)
322  /// for more information.
323  pub(crate) fn create<
324    V: ToNapiValue,
325    R: 'static + Send + FnMut(ThreadSafeCallContext<T>) -> Result<Vec<V>>,
326  >(
327    env: sys::napi_env,
328    func: sys::napi_value,
329    max_queue_size: usize,
330    callback: R,
331  ) -> Result<Self> {
332    let mut async_resource_name = ptr::null_mut();
333    let s = "napi_rs_threadsafe_function";
334    let len = s.len();
335    let s = CString::new(s)?;
336    check_status!(unsafe {
337      sys::napi_create_string_utf8(env, s.as_ptr(), len, &mut async_resource_name)
338    })?;
339
340    let mut raw_tsfn = ptr::null_mut();
341    let callback_ptr = Box::into_raw(Box::new(callback));
342    let handle = ThreadsafeFunctionHandle::null();
343    check_status!(unsafe {
344      sys::napi_create_threadsafe_function(
345        env,
346        func,
347        ptr::null_mut(),
348        async_resource_name,
349        max_queue_size,
350        1,
351        Arc::downgrade(&handle).into_raw() as *mut c_void, // pass handler to thread_finalize_cb
352        Some(thread_finalize_cb::<T, V, R>),
353        callback_ptr.cast(),
354        Some(call_js_cb::<T, V, R, ES>),
355        &mut raw_tsfn,
356      )
357    })?;
358    handle.set_raw(raw_tsfn);
359
360    Ok(ThreadsafeFunction {
361      handle,
362      _phantom: PhantomData,
363    })
364  }
365
366  /// See [napi_ref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_ref_threadsafe_function)
367  /// for more information.
368  ///
369  /// "ref" is a keyword so that we use "refer" here.
370  pub fn refer(&mut self, env: &Env) -> Result<()> {
371    self.handle.with_read_aborted(|aborted| {
372      if !aborted && !self.handle.referred.load(Ordering::Relaxed) {
373        check_status!(unsafe { sys::napi_ref_threadsafe_function(env.0, self.handle.get_raw()) })?;
374        self.handle.referred.store(true, Ordering::Relaxed);
375      }
376      Ok(())
377    })
378  }
379
380  /// See [napi_unref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_unref_threadsafe_function)
381  /// for more information.
382  pub fn unref(&mut self, env: &Env) -> Result<()> {
383    self.handle.with_read_aborted(|aborted| {
384      if !aborted && self.handle.referred.load(Ordering::Relaxed) {
385        check_status!(unsafe {
386          sys::napi_unref_threadsafe_function(env.0, self.handle.get_raw())
387        })?;
388        self.handle.referred.store(false, Ordering::Relaxed);
389      }
390      Ok(())
391    })
392  }
393
394  pub fn aborted(&self) -> bool {
395    self.handle.with_read_aborted(|aborted| aborted)
396  }
397
398  pub fn abort(self) -> Result<()> {
399    self.handle.with_write_aborted(|mut aborted_guard| {
400      if !*aborted_guard {
401        check_status!(unsafe {
402          sys::napi_release_threadsafe_function(
403            self.handle.get_raw(),
404            sys::ThreadsafeFunctionReleaseMode::abort,
405          )
406        })?;
407        *aborted_guard = true;
408      }
409      Ok(())
410    })
411  }
412
413  /// Get the raw `ThreadSafeFunction` pointer
414  pub fn raw(&self) -> sys::napi_threadsafe_function {
415    self.handle.get_raw()
416  }
417}
418
419impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::CalleeHandled> {
420  /// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function)
421  /// for more information.
422  pub fn call(&self, value: Result<T>, mode: ThreadsafeFunctionCallMode) -> Status {
423    self.handle.with_read_aborted(|aborted| {
424      if aborted {
425        return Status::Closing;
426      }
427
428      unsafe {
429        sys::napi_call_threadsafe_function(
430          self.handle.get_raw(),
431          Box::into_raw(Box::new(value.map(|data| {
432            ThreadsafeFunctionCallJsBackData {
433              data,
434              call_variant: ThreadsafeFunctionCallVariant::Direct,
435              callback: Box::new(|_d: Result<JsUnknown>| Ok(())),
436            }
437          })))
438          .cast(),
439          mode.into(),
440        )
441      }
442      .into()
443    })
444  }
445
446  pub fn call_with_return_value<D: FromNapiValue, F: 'static + FnOnce(D) -> Result<()>>(
447    &self,
448    value: Result<T>,
449    mode: ThreadsafeFunctionCallMode,
450    cb: F,
451  ) -> Status {
452    self.handle.with_read_aborted(|aborted| {
453      if aborted {
454        return Status::Closing;
455      }
456
457      unsafe {
458        sys::napi_call_threadsafe_function(
459          self.handle.get_raw(),
460          Box::into_raw(Box::new(value.map(|data| {
461            ThreadsafeFunctionCallJsBackData {
462              data,
463              call_variant: ThreadsafeFunctionCallVariant::WithCallback,
464              callback: Box::new(move |d: Result<JsUnknown>| {
465                d.and_then(|d| D::from_napi_value(d.0.env, d.0.value).and_then(cb))
466              }),
467            }
468          })))
469          .cast(),
470          mode.into(),
471        )
472      }
473      .into()
474    })
475  }
476
477  pub fn call_with_return_value_raw<
478    D: FromNapiValue,
479    F: 'static + FnOnce(Result<D>) -> Result<()>,
480  >(
481    &self,
482    value: Result<T>,
483    mode: ThreadsafeFunctionCallMode,
484    cb: F,
485  ) -> Status {
486    self.handle.with_read_aborted(|aborted| {
487      if aborted {
488        return Status::Closing;
489      }
490
491      unsafe {
492        sys::napi_call_threadsafe_function(
493          self.handle.get_raw(),
494          Box::into_raw(Box::new(value.map(|data| {
495            ThreadsafeFunctionCallJsBackData {
496              data,
497              call_variant: ThreadsafeFunctionCallVariant::WithCallback,
498              callback: Box::new(move |d: Result<JsUnknown>| {
499                cb(d.and_then(|d| D::from_napi_value(d.0.env, d.0.value)))
500              }),
501            }
502          })))
503          .cast(),
504          mode.into(),
505        )
506      }
507      .into()
508    })
509  }
510
511  #[cfg(feature = "tokio_rt")]
512  pub async fn call_async<D: 'static + FromNapiValue>(&self, value: Result<T>) -> Result<D> {
513    let (sender, receiver) = tokio::sync::oneshot::channel::<Result<D>>();
514
515    self.handle.with_read_aborted(|aborted| {
516      if aborted {
517        return Err(crate::Error::from_status(Status::Closing));
518      }
519
520      check_status!(
521        unsafe {
522          sys::napi_call_threadsafe_function(
523            self.handle.get_raw(),
524            Box::into_raw(Box::new(value.map(|data| {
525              ThreadsafeFunctionCallJsBackData {
526                data,
527                call_variant: ThreadsafeFunctionCallVariant::WithCallback,
528                callback: Box::new(move |d: Result<JsUnknown>| {
529                  sender
530                    .send(d.and_then(|d| D::from_napi_value(d.0.env, d.0.value)))
531                    // The only reason for send to return Err is if the receiver isn't listening
532                    // Not hiding the error would result in a napi_fatal_error call, it's safe to ignore it instead.
533                    .or(Ok(()))
534                }),
535              }
536            })))
537            .cast(),
538            ThreadsafeFunctionCallMode::NonBlocking.into(),
539          )
540        },
541        "Threadsafe function call_async failed"
542      )
543    })?;
544    receiver
545      .await
546      .map_err(|_| {
547        crate::Error::new(
548          Status::GenericFailure,
549          "Receive value from threadsafe function sender failed",
550        )
551      })
552      .and_then(|ret| ret)
553  }
554}
555
556impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::Fatal> {
557  /// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function)
558  /// for more information.
559  pub fn call(&self, value: T, mode: ThreadsafeFunctionCallMode) -> Status {
560    self.handle.with_read_aborted(|aborted| {
561      if aborted {
562        return Status::Closing;
563      }
564
565      unsafe {
566        sys::napi_call_threadsafe_function(
567          self.handle.get_raw(),
568          Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
569            data: value,
570            call_variant: ThreadsafeFunctionCallVariant::Direct,
571            callback: Box::new(|_d: Result<JsUnknown>| Ok(())),
572          }))
573          .cast(),
574          mode.into(),
575        )
576      }
577      .into()
578    })
579  }
580
581  pub fn call_with_return_value<D: FromNapiValue, F: 'static + FnOnce(D) -> Result<()>>(
582    &self,
583    value: T,
584    mode: ThreadsafeFunctionCallMode,
585    cb: F,
586  ) -> Status {
587    self.handle.with_read_aborted(|aborted| {
588      if aborted {
589        return Status::Closing;
590      }
591
592      unsafe {
593        sys::napi_call_threadsafe_function(
594          self.handle.get_raw(),
595          Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
596            data: value,
597            call_variant: ThreadsafeFunctionCallVariant::WithCallback,
598            callback: Box::new(move |d: Result<JsUnknown>| {
599              d.and_then(|d| D::from_napi_value(d.0.env, d.0.value).and_then(cb))
600            }),
601          }))
602          .cast(),
603          mode.into(),
604        )
605      }
606      .into()
607    })
608  }
609
610  pub fn call_with_return_value_raw<
611    D: FromNapiValue,
612    F: 'static + FnOnce(Result<D>) -> Result<()>,
613  >(
614    &self,
615    value: T,
616    mode: ThreadsafeFunctionCallMode,
617    cb: F,
618  ) -> Status {
619    self.handle.with_read_aborted(|aborted| {
620      if aborted {
621        return Status::Closing;
622      }
623
624      unsafe {
625        sys::napi_call_threadsafe_function(
626          self.handle.get_raw(),
627          Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
628            data: value,
629            call_variant: ThreadsafeFunctionCallVariant::WithCallback,
630            callback: Box::new(move |d: Result<JsUnknown>| {
631              cb(d.and_then(|d| D::from_napi_value(d.0.env, d.0.value)))
632            }),
633          }))
634          .cast(),
635          mode.into(),
636        )
637      }
638      .into()
639    })
640  }
641
642  #[cfg(feature = "tokio_rt")]
643  pub async fn call_async<D: 'static + FromNapiValue>(&self, value: T) -> Result<D> {
644    let (sender, receiver) = tokio::sync::oneshot::channel::<D>();
645
646    self.handle.with_read_aborted(|aborted| {
647      if aborted {
648        return Err(crate::Error::from_status(Status::Closing));
649      }
650
651      check_status!(unsafe {
652        sys::napi_call_threadsafe_function(
653          self.handle.get_raw(),
654          Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
655            data: value,
656            call_variant: ThreadsafeFunctionCallVariant::WithCallback,
657            callback: Box::new(move |d: Result<JsUnknown>| {
658              d.and_then(|d| {
659                D::from_napi_value(d.0.env, d.0.value).and_then(move |d| {
660                  sender
661                    .send(d)
662                    // The only reason for send to return Err is if the receiver isn't listening
663                    // Not hiding the error would result in a napi_fatal_error call, it's safe to ignore it instead.
664                    .or(Ok(()))
665                })
666              })
667            }),
668          }))
669          .cast(),
670          ThreadsafeFunctionCallMode::NonBlocking.into(),
671        )
672      })
673    })?;
674
675    receiver
676      .await
677      .map_err(|err| crate::Error::new(Status::GenericFailure, format!("{}", err)))
678  }
679}
680
681#[allow(unused_variables)]
682unsafe extern "C" fn thread_finalize_cb<T: 'static, V: ToNapiValue, R>(
683  env: sys::napi_env,
684  finalize_data: *mut c_void,
685  finalize_hint: *mut c_void,
686) where
687  R: 'static + Send + FnMut(ThreadSafeCallContext<T>) -> Result<Vec<V>>,
688{
689  let handle_option =
690    unsafe { Weak::from_raw(finalize_data.cast::<ThreadsafeFunctionHandle>()).upgrade() };
691
692  if let Some(handle) = handle_option {
693    handle.with_write_aborted(|mut aborted_guard| {
694      if !*aborted_guard {
695        *aborted_guard = true;
696      }
697    });
698  }
699
700  // cleanup
701  drop(unsafe { Box::<R>::from_raw(finalize_hint.cast()) });
702}
703
704unsafe extern "C" fn call_js_cb<T: 'static, V: ToNapiValue, R, ES>(
705  raw_env: sys::napi_env,
706  js_callback: sys::napi_value,
707  context: *mut c_void,
708  data: *mut c_void,
709) where
710  R: 'static + Send + FnMut(ThreadSafeCallContext<T>) -> Result<Vec<V>>,
711  ES: ErrorStrategy::T,
712{
713  // env and/or callback can be null when shutting down
714  if raw_env.is_null() || js_callback.is_null() {
715    return;
716  }
717
718  let ctx: &mut R = unsafe { Box::leak(Box::from_raw(context.cast())) };
719  let val = unsafe {
720    match ES::VALUE {
721      ErrorStrategy::CalleeHandled::VALUE => {
722        *Box::<Result<ThreadsafeFunctionCallJsBackData<T>>>::from_raw(data.cast())
723      }
724      ErrorStrategy::Fatal::VALUE => Ok(*Box::<ThreadsafeFunctionCallJsBackData<T>>::from_raw(
725        data.cast(),
726      )),
727    }
728  };
729
730  let mut recv = ptr::null_mut();
731  unsafe { sys::napi_get_undefined(raw_env, &mut recv) };
732
733  let ret = val.and_then(|v| {
734    (ctx)(ThreadSafeCallContext {
735      env: unsafe { Env::from_raw(raw_env) },
736      value: v.data,
737    })
738    .map(|ret| (ret, v.call_variant, v.callback))
739  });
740
741  // Follow async callback conventions: https://nodejs.org/en/knowledge/errors/what-are-the-error-conventions/
742  // Check if the Result is okay, if so, pass a null as the first (error) argument automatically.
743  // If the Result is an error, pass that as the first argument.
744  let status = match ret {
745    Ok((values, call_variant, callback)) => {
746      let values = values
747        .into_iter()
748        .map(|v| unsafe { ToNapiValue::to_napi_value(raw_env, v) });
749      let args: Result<Vec<sys::napi_value>> = if ES::VALUE == ErrorStrategy::CalleeHandled::VALUE {
750        let mut js_null = ptr::null_mut();
751        unsafe { sys::napi_get_null(raw_env, &mut js_null) };
752        ::core::iter::once(Ok(js_null)).chain(values).collect()
753      } else {
754        values.collect()
755      };
756      let mut return_value = ptr::null_mut();
757      let mut status = match args {
758        Ok(args) => unsafe {
759          sys::napi_call_function(
760            raw_env,
761            recv,
762            js_callback,
763            args.len(),
764            args.as_ptr(),
765            &mut return_value,
766          )
767        },
768        Err(e) => match ES::VALUE {
769          ErrorStrategy::Fatal::VALUE => unsafe {
770            sys::napi_fatal_exception(raw_env, JsError::from(e).into_value(raw_env))
771          },
772          ErrorStrategy::CalleeHandled::VALUE => unsafe {
773            sys::napi_call_function(
774              raw_env,
775              recv,
776              js_callback,
777              1,
778              [JsError::from(e).into_value(raw_env)].as_mut_ptr(),
779              &mut return_value,
780            )
781          },
782        },
783      };
784      if let ThreadsafeFunctionCallVariant::WithCallback = call_variant {
785        // throw Error in JavaScript callback
786        let callback_arg = if status == sys::Status::napi_pending_exception {
787          let mut exception = ptr::null_mut();
788          status = unsafe { sys::napi_get_and_clear_last_exception(raw_env, &mut exception) };
789          Err(
790            JsUnknown(crate::Value {
791              env: raw_env,
792              value: exception,
793              value_type: crate::ValueType::Unknown,
794            })
795            .into(),
796          )
797        } else {
798          Ok(JsUnknown(crate::Value {
799            env: raw_env,
800            value: return_value,
801            value_type: crate::ValueType::Unknown,
802          }))
803        };
804        if let Err(err) = callback(callback_arg) {
805          let message = format!(
806            "Failed to convert return value in ThreadsafeFunction callback into Rust value: {}",
807            err
808          );
809          let message_length = message.len();
810          let c_message = CString::new(message).unwrap();
811          unsafe {
812            sys::napi_fatal_error(
813              "threadsafe_function.rs:749\0".as_ptr().cast(),
814              26,
815              c_message.as_ptr(),
816              message_length,
817            )
818          };
819        }
820      }
821      status
822    }
823    Err(e) if ES::VALUE == ErrorStrategy::Fatal::VALUE => unsafe {
824      sys::napi_fatal_exception(raw_env, JsError::from(e).into_value(raw_env))
825    },
826    Err(e) => unsafe {
827      sys::napi_call_function(
828        raw_env,
829        recv,
830        js_callback,
831        1,
832        [JsError::from(e).into_value(raw_env)].as_mut_ptr(),
833        ptr::null_mut(),
834      )
835    },
836  };
837  if status == sys::Status::napi_ok {
838    return;
839  }
840  if status == sys::Status::napi_pending_exception {
841    let mut error_result = ptr::null_mut();
842    assert_eq!(
843      unsafe { sys::napi_get_and_clear_last_exception(raw_env, &mut error_result) },
844      sys::Status::napi_ok
845    );
846
847    // When shutting down, napi_fatal_exception sometimes returns another exception
848    let stat = unsafe { sys::napi_fatal_exception(raw_env, error_result) };
849    assert!(stat == sys::Status::napi_ok || stat == sys::Status::napi_pending_exception);
850  } else {
851    let error_code: Status = status.into();
852    let error_code_string = format!("{:?}", error_code);
853    let mut error_code_value = ptr::null_mut();
854    assert_eq!(
855      unsafe {
856        sys::napi_create_string_utf8(
857          raw_env,
858          error_code_string.as_ptr() as *const _,
859          error_code_string.len(),
860          &mut error_code_value,
861        )
862      },
863      sys::Status::napi_ok,
864    );
865    let error_msg = "Call JavaScript callback failed in threadsafe function";
866    let mut error_msg_value = ptr::null_mut();
867    assert_eq!(
868      unsafe {
869        sys::napi_create_string_utf8(
870          raw_env,
871          error_msg.as_ptr() as *const _,
872          error_msg.len(),
873          &mut error_msg_value,
874        )
875      },
876      sys::Status::napi_ok,
877    );
878    let mut error_value = ptr::null_mut();
879    assert_eq!(
880      unsafe {
881        sys::napi_create_error(raw_env, error_code_value, error_msg_value, &mut error_value)
882      },
883      sys::Status::napi_ok,
884    );
885    assert_eq!(
886      unsafe { sys::napi_fatal_exception(raw_env, error_value) },
887      sys::Status::napi_ok
888    );
889  }
890}
891
/// Helper that expands an `enum`-like declaration into a type-level `enum`:
/// a module named after the enum containing one uninhabited type per variant,
/// a sealed trait (re-exported as `T`) and a value-level mirror `enum` that
/// is exposed through the `VALUE` associated constants.
macro_rules! type_level_enum {(
  $( #[doc = $enum_doc:tt] )*
  $vis:vis
  enum $Enum:ident {
    $(
      $( #[doc = $var_doc:tt] )*
      $Var:ident
    ),* $(,)?
  }
) => (type_level_enum! { // Recursive call: the macro has to be in scope at the call site.
  with_docs! {
    $( #[doc = $enum_doc] )*
    ///
    /// ### Type-level `enum`
    ///
    /// Until `const_generics` can handle custom `enum`s, this pattern must be
    /// implemented at the type level.
    ///
    /// We thus end up with:
    ///
    /// ```rust,ignore
    /// #[type_level_enum]
    #[doc = ::core::concat!(
      " enum ", ::core::stringify!($Enum), " {",
    )]
    $(
      #[doc = ::core::concat!(
        "     ", ::core::stringify!($Var), ",",
      )]
    )*
    #[doc = " }"]
    /// ```
    ///
    #[doc = ::core::concat!(
      "With [`", ::core::stringify!($Enum), "::T`](#reexports) \
      being the type-level \"enum type\":",
    )]
    ///
    /// ```rust,ignore
    #[doc = ::core::concat!(
      "<Param: ", ::core::stringify!($Enum), "::T>"
    )]
    /// ```
  }
  #[allow(warnings)]
  $vis mod $Enum {
    #[doc(no_inline)]
    pub use $Enum as T;

    super::type_level_enum! {
      with_docs! {
        #[doc = ::core::concat!(
          "See [`", ::core::stringify!($Enum), "`]\
          [super::", ::core::stringify!($Enum), "]"
        )]
      }
      pub trait $Enum : __sealed::$Enum + ::core::marker::Sized + 'static {
        const VALUE: __value::$Enum;
      }
    }

    mod __sealed { pub trait $Enum {} }

    mod __value {
      #[derive(Debug, PartialEq, Eq)]
      pub enum $Enum { $( $Var ),* }
    }

    $(
      $( #[doc = $var_doc] )*
      pub enum $Var {}
      impl __sealed::$Enum for $Var {}
      impl $Enum for $Var {
        const VALUE: __value::$Enum = __value::$Enum::$Var;
      }
      impl $Var {
        pub const VALUE: __value::$Enum = __value::$Enum::$Var;
      }
    )*
  }
});(
  // Internal arm: attaches the collected doc attributes to a single item.
  with_docs! {
    $( #[doc = $doc_expr:expr] )*
  }
  $documented:item
) => (
  $( #[doc = $doc_expr] )*
  $documented
)}
982
983use type_level_enum;
984
/// Placeholder return type: its `FromNapiValue` implementation ignores the
/// JavaScript value and always succeeds.
pub struct UnknownReturnValue;
986
987impl TypeName for UnknownReturnValue {
988  fn type_name() -> &'static str {
989    "UnknownReturnValue"
990  }
991
992  fn value_type() -> crate::ValueType {
993    crate::ValueType::Unknown
994  }
995}
996
997impl ValidateNapiValue for UnknownReturnValue {}
998
999impl FromNapiValue for UnknownReturnValue {
1000  unsafe fn from_napi_value(_env: sys::napi_env, _napi_val: sys::napi_value) -> Result<Self> {
1001    Ok(UnknownReturnValue)
1002  }
1003}