gloo_worker/actor/
scope.rs

1use std::cell::RefCell;
2use std::fmt;
3#[cfg(feature = "futures")]
4use std::future::Future;
5use std::rc::Rc;
6
7use serde::de::Deserialize;
8use serde::ser::Serialize;
9use wasm_bindgen_futures::spawn_local;
10
11use super::handler_id::HandlerId;
12use super::lifecycle::{WorkerLifecycleEvent, WorkerRunnable, WorkerState};
13use super::messages::FromWorker;
14use super::native_worker::{DedicatedWorker, NativeWorkerExt, WorkerSelf};
15use super::traits::Worker;
16use super::Shared;
17use crate::codec::Codec;
18
19/// A handle that closes the worker when it is dropped.
20pub struct WorkerDestroyHandle<W>
21where
22    W: Worker + 'static,
23{
24    scope: WorkerScope<W>,
25}
26
27impl<W: Worker> fmt::Debug for WorkerDestroyHandle<W> {
28    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29        f.write_str("WorkerDestroyHandle<_>")
30    }
31}
32
33impl<W> WorkerDestroyHandle<W>
34where
35    W: Worker,
36{
37    pub(crate) fn new(scope: WorkerScope<W>) -> Self {
38        Self { scope }
39    }
40}
41
42impl<W> Drop for WorkerDestroyHandle<W>
43where
44    W: Worker,
45{
46    fn drop(&mut self) {
47        self.scope.send(WorkerLifecycleEvent::Destroy);
48    }
49}
50
51/// This struct holds a reference to a component and to a global scheduler.
52pub struct WorkerScope<W: Worker> {
53    state: Shared<WorkerState<W>>,
54    post_msg: Rc<dyn Fn(FromWorker<W>)>,
55}
56
57impl<W: Worker> fmt::Debug for WorkerScope<W> {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        f.write_str("WorkerScope<_>")
60    }
61}
62
63impl<W: Worker> Clone for WorkerScope<W> {
64    fn clone(&self) -> Self {
65        WorkerScope {
66            state: self.state.clone(),
67            post_msg: self.post_msg.clone(),
68        }
69    }
70}
71
72impl<W> WorkerScope<W>
73where
74    W: Worker + 'static,
75{
76    /// Create worker scope
77    pub(crate) fn new<CODEC>() -> Self
78    where
79        CODEC: Codec,
80        W::Output: Serialize + for<'de> Deserialize<'de>,
81    {
82        let post_msg = move |msg: FromWorker<W>| {
83            DedicatedWorker::worker_self().post_packed_message::<_, CODEC>(msg)
84        };
85
86        let state = Rc::new(RefCell::new(WorkerState::new()));
87        WorkerScope {
88            post_msg: Rc::new(post_msg),
89            state,
90        }
91    }
92
93    /// Schedule message for sending to worker
94    pub(crate) fn send(&self, event: WorkerLifecycleEvent<W>) {
95        let state = self.state.clone();
96
97        // We can implement a custom scheduler,
98        // but it's easier to borrow the one from wasm-bindgen-futures.
99        spawn_local(async move {
100            WorkerRunnable { state, event }.run();
101        });
102    }
103
104    /// Send response to a worker bridge.
105    pub fn respond(&self, id: HandlerId, output: W::Output) {
106        let msg = FromWorker::<W>::ProcessOutput(id, output);
107        (self.post_msg)(msg);
108    }
109
110    /// Send a message to the worker
111    pub fn send_message<T>(&self, msg: T)
112    where
113        T: Into<W::Message>,
114    {
115        self.send(WorkerLifecycleEvent::Message(msg.into()));
116    }
117
118    /// Create a callback which will send a message to the worker when invoked.
119    pub fn callback<F, IN, M>(&self, function: F) -> Rc<dyn Fn(IN)>
120    where
121        M: Into<W::Message>,
122        F: Fn(IN) -> M + 'static,
123    {
124        let scope = self.clone();
125        let closure = move |input| {
126            let output = function(input).into();
127            scope.send(WorkerLifecycleEvent::Message(output));
128        };
129        Rc::new(closure)
130    }
131
132    /// This method creates a callback which returns a Future which
133    /// returns a message to be sent back to the worker
134    ///
135    /// # Panics
136    /// If the future panics, then the promise will not resolve, and
137    /// will leak.
138    #[cfg(feature = "futures")]
139    #[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
140    pub fn callback_future<FN, FU, IN, M>(&self, function: FN) -> Rc<dyn Fn(IN)>
141    where
142        M: Into<W::Message>,
143        FU: Future<Output = M> + 'static,
144        FN: Fn(IN) -> FU + 'static,
145    {
146        let scope = self.clone();
147
148        let closure = move |input: IN| {
149            let future: FU = function(input);
150            scope.send_future(future);
151        };
152
153        Rc::new(closure)
154    }
155
156    /// This method processes a Future that returns a message and sends it back to the worker.
157    ///
158    /// # Panics
159    /// If the future panics, then the promise will not resolve, and will leak.
160    #[cfg(feature = "futures")]
161    #[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
162    pub fn send_future<F, M>(&self, future: F)
163    where
164        M: Into<W::Message>,
165        F: Future<Output = M> + 'static,
166    {
167        let scope = self.clone();
168        let js_future = async move {
169            let message: W::Message = future.await.into();
170            let cb = scope.callback(|m: W::Message| m);
171            (*cb)(message);
172        };
173        wasm_bindgen_futures::spawn_local(js_future);
174    }
175}