gloo_worker/actor/
scope.rs1use std::cell::RefCell;
2use std::fmt;
3#[cfg(feature = "futures")]
4use std::future::Future;
5use std::rc::Rc;
6
7use serde::de::Deserialize;
8use serde::ser::Serialize;
9use wasm_bindgen_futures::spawn_local;
10
11use super::handler_id::HandlerId;
12use super::lifecycle::{WorkerLifecycleEvent, WorkerRunnable, WorkerState};
13use super::messages::FromWorker;
14use super::native_worker::{DedicatedWorker, NativeWorkerExt, WorkerSelf};
15use super::traits::Worker;
16use super::Shared;
17use crate::codec::Codec;
18
19pub struct WorkerDestroyHandle<W>
21where
22 W: Worker + 'static,
23{
24 scope: WorkerScope<W>,
25}
26
27impl<W: Worker> fmt::Debug for WorkerDestroyHandle<W> {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 f.write_str("WorkerDestroyHandle<_>")
30 }
31}
32
33impl<W> WorkerDestroyHandle<W>
34where
35 W: Worker,
36{
37 pub(crate) fn new(scope: WorkerScope<W>) -> Self {
38 Self { scope }
39 }
40}
41
42impl<W> Drop for WorkerDestroyHandle<W>
43where
44 W: Worker,
45{
46 fn drop(&mut self) {
47 self.scope.send(WorkerLifecycleEvent::Destroy);
48 }
49}
50
51pub struct WorkerScope<W: Worker> {
53 state: Shared<WorkerState<W>>,
54 post_msg: Rc<dyn Fn(FromWorker<W>)>,
55}
56
57impl<W: Worker> fmt::Debug for WorkerScope<W> {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 f.write_str("WorkerScope<_>")
60 }
61}
62
63impl<W: Worker> Clone for WorkerScope<W> {
64 fn clone(&self) -> Self {
65 WorkerScope {
66 state: self.state.clone(),
67 post_msg: self.post_msg.clone(),
68 }
69 }
70}
71
72impl<W> WorkerScope<W>
73where
74 W: Worker + 'static,
75{
76 pub(crate) fn new<CODEC>() -> Self
78 where
79 CODEC: Codec,
80 W::Output: Serialize + for<'de> Deserialize<'de>,
81 {
82 let post_msg = move |msg: FromWorker<W>| {
83 DedicatedWorker::worker_self().post_packed_message::<_, CODEC>(msg)
84 };
85
86 let state = Rc::new(RefCell::new(WorkerState::new()));
87 WorkerScope {
88 post_msg: Rc::new(post_msg),
89 state,
90 }
91 }
92
93 pub(crate) fn send(&self, event: WorkerLifecycleEvent<W>) {
95 let state = self.state.clone();
96
97 spawn_local(async move {
100 WorkerRunnable { state, event }.run();
101 });
102 }
103
104 pub fn respond(&self, id: HandlerId, output: W::Output) {
106 let msg = FromWorker::<W>::ProcessOutput(id, output);
107 (self.post_msg)(msg);
108 }
109
110 pub fn send_message<T>(&self, msg: T)
112 where
113 T: Into<W::Message>,
114 {
115 self.send(WorkerLifecycleEvent::Message(msg.into()));
116 }
117
118 pub fn callback<F, IN, M>(&self, function: F) -> Rc<dyn Fn(IN)>
120 where
121 M: Into<W::Message>,
122 F: Fn(IN) -> M + 'static,
123 {
124 let scope = self.clone();
125 let closure = move |input| {
126 let output = function(input).into();
127 scope.send(WorkerLifecycleEvent::Message(output));
128 };
129 Rc::new(closure)
130 }
131
132 #[cfg(feature = "futures")]
139 #[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
140 pub fn callback_future<FN, FU, IN, M>(&self, function: FN) -> Rc<dyn Fn(IN)>
141 where
142 M: Into<W::Message>,
143 FU: Future<Output = M> + 'static,
144 FN: Fn(IN) -> FU + 'static,
145 {
146 let scope = self.clone();
147
148 let closure = move |input: IN| {
149 let future: FU = function(input);
150 scope.send_future(future);
151 };
152
153 Rc::new(closure)
154 }
155
156 #[cfg(feature = "futures")]
161 #[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
162 pub fn send_future<F, M>(&self, future: F)
163 where
164 M: Into<W::Message>,
165 F: Future<Output = M> + 'static,
166 {
167 let scope = self.clone();
168 let js_future = async move {
169 let message: W::Message = future.await.into();
170 let cb = scope.callback(|m: W::Message| m);
171 (*cb)(message);
172 };
173 wasm_bindgen_futures::spawn_local(js_future);
174 }
175}