yew_stdweb/agent/worker/
public.rs1use super::*;
2use crate::callback::Callback;
3use crate::scheduler::Shared;
4use anymap::{self, AnyMap};
5use cfg_if::cfg_if;
6use cfg_match::cfg_match;
7use queue::Queue;
8use slab::Slab;
9use std::any::TypeId;
10use std::cell::RefCell;
11use std::fmt;
12use std::marker::PhantomData;
13use std::rc::Rc;
14cfg_if! {
15 if #[cfg(feature = "std_web")] {
16 use stdweb::Value;
17 #[allow(unused_imports)]
18 use stdweb::{_js_impl, js};
19 } else if #[cfg(feature = "web_sys")] {
20 use super::WorkerExt;
21 use web_sys::{Worker};
22 }
23}
24
thread_local! {
    // Per-thread registry of launched workers. `AnyMap` stores at most one value per
    // type, and this file keys it by the concrete `RemoteAgent<AGN>` type.
    static REMOTE_AGENTS_POOL: RefCell<AnyMap> = RefCell::new(AnyMap::new());
    // Per-thread bookkeeping keyed by an agent's `TypeId`. This file uses it to record
    // which workers have reported being loaded and to hold packed messages until then.
    static QUEUE: Queue<TypeId> = Queue::new();
}
29
/// Marker type selecting the worker-backed ("public") reach for an agent.
///
/// It stores nothing except a `PhantomData` tying it to the agent type `AGN`; the
/// behaviour lives in its trait implementations.
#[allow(missing_debug_implementations)]
pub struct Public<AGN> {
    // Zero-sized marker: no agent value is ever stored in this struct.
    _agent: PhantomData<AGN>,
}
35
36impl<AGN> Discoverer for Public<AGN>
37where
38 AGN: Agent,
39 <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
40 <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
41{
42 type Agent = AGN;
43
44 fn spawn_or_join(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
45 let bridge = REMOTE_AGENTS_POOL.with(|pool| {
46 let mut pool = pool.borrow_mut();
47 match pool.entry::<RemoteAgent<AGN>>() {
48 anymap::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback),
49 anymap::Entry::Vacant(entry) => {
50 let slab: Shared<Slab<Option<Callback<AGN::Output>>>> =
51 Rc::new(RefCell::new(Slab::new()));
52 let handler = {
53 let slab = slab.clone();
54 move |data: Vec<u8>,
55 #[cfg(feature = "std_web")] worker: Value,
56 #[cfg(feature = "web_sys")] worker: &Worker| {
57 let msg = FromWorker::<AGN::Output>::unpack(&data);
58 match msg {
59 FromWorker::WorkerLoaded => {
60 QUEUE.with(|queue| {
61 queue.insert_loaded_agent(TypeId::of::<AGN>());
62
63 if let Some(msgs) = queue.remove_msg_queue(&TypeId::of::<AGN>()) {
64 for msg in msgs {
65 cfg_match! {
66 feature = "std_web" => ({
67 let worker = &worker;
68 js! {@{worker}.postMessage(@{msg});};
69 }),
70 feature = "web_sys" => worker.post_message_vec(msg),
71 }
72 }
73 }
74 });
75 }
76 FromWorker::ProcessOutput(id, output) => {
77 locate_callback_and_respond::<AGN>(&slab, id, output);
78 }
79 }
80 }
81 };
82 let name_of_resource = AGN::name_of_resource();
83 let worker = cfg_match! {
84 feature = "std_web" => js! {
85 var worker = new Worker(@{name_of_resource});
86 var handler = @{handler};
87 worker.onmessage = function(event) {
88 handler(event.data, worker);
89 };
90 return worker;
91 },
92 feature = "web_sys" => ({
93 let worker = worker_new(name_of_resource, AGN::is_module());
94 let worker_clone = worker.clone();
95 worker.set_onmessage_closure(move |data: Vec<u8>| {
96 handler(data, &worker_clone);
97 });
98 worker
99 }),
100 };
101 let launched = RemoteAgent::new(worker, slab);
102 entry.insert(launched).create_bridge(callback)
103 }
104 }
105 });
106 Box::new(bridge)
107 }
108}
109
// Marks `Public<AGN>` as `Dispatchable`. The impl body is intentionally empty: nothing
// is overridden here, so any behaviour comes from the trait's own definition (declared
// outside this file section).
impl<AGN> Dispatchable for Public<AGN>
where
    AGN: Agent,
    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
}
117
/// Bridge used to exchange messages with the worker that hosts agent `AGN`.
///
/// Instances are built by `RemoteAgent::create_bridge`; dropping one tells the worker
/// that this handler has disconnected.
pub struct PublicBridge<AGN>
where
    AGN: Agent,
    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
    /// Worker handle when built with the `std_web` feature.
    #[cfg(feature = "std_web")]
    worker: Value,
    /// Worker handle when built with the `web_sys` feature.
    #[cfg(feature = "web_sys")]
    worker: Worker,
    /// Identifier of this bridge, derived from the slab key of its callback slot.
    id: HandlerId,
    /// Ties the bridge to its agent type without storing an agent value.
    _agent: PhantomData<AGN>,
}
132
133impl<AGN> fmt::Debug for PublicBridge<AGN>
134where
135 AGN: Agent,
136 <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
137 <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
138{
139 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
140 f.write_str("PublicBridge<_>")
141 }
142}
143
144impl<AGN> PublicBridge<AGN>
145where
146 AGN: Agent,
147 <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
148 <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
149{
150 fn send_message(&self, msg: ToWorker<AGN::Input>) {
152 QUEUE.with(|queue| {
153 if queue.is_worker_loaded(&TypeId::of::<AGN>()) {
154 send_to_remote::<AGN>(&self.worker, msg);
155 } else {
156 queue.add_msg_to_queue(msg.pack(), TypeId::of::<AGN>());
157 }
158 });
159 }
160}
161
162impl<AGN> Bridge<AGN> for PublicBridge<AGN>
163where
164 AGN: Agent,
165 <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
166 <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
167{
168 fn send(&mut self, msg: AGN::Input) {
169 let msg = ToWorker::ProcessInput(self.id, msg);
170 self.send_message(msg);
171 }
172}
173
174impl<AGN> Drop for PublicBridge<AGN>
175where
176 AGN: Agent,
177 <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
178 <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
179{
180 fn drop(&mut self) {
181 let terminate_worker = REMOTE_AGENTS_POOL.with(|pool| {
182 let mut pool = pool.borrow_mut();
183 let terminate_worker = {
184 if let Some(launched) = pool.get_mut::<RemoteAgent<AGN>>() {
185 launched.remove_bridge(self)
186 } else {
187 false
188 }
189 };
190
191 if terminate_worker {
192 pool.remove::<RemoteAgent<AGN>>();
193 }
194
195 terminate_worker
196 });
197
198 let disconnected = ToWorker::Disconnected(self.id);
199 self.send_message(disconnected);
200
201 if terminate_worker {
202 let destroy = ToWorker::Destroy;
203 self.send_message(destroy);
204
205 QUEUE.with(|queue| {
206 queue.remove_agent(&TypeId::of::<AGN>());
207 });
208 }
209 }
210}
211
/// Field-less responder type; an instance is created in `register` and handed to
/// `AgentLink::connect`.
struct WorkerResponder {}
213
214impl<AGN> Responder<AGN> for WorkerResponder
215where
216 AGN: Agent,
217 <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
218 <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
219{
220 fn respond(&self, id: HandlerId, output: AGN::Output) {
221 let msg = FromWorker::ProcessOutput(id, output);
222 let data = msg.pack();
223 cfg_match! {
224 feature = "std_web" => js! {
225 var data = @{data};
226 self.postMessage(data);
227 },
228 feature = "web_sys" => worker_self().post_message_vec(data),
229 };
230 }
231}
232
233impl<AGN> Threaded for AGN
234where
235 AGN: Agent<Reach = Public<AGN>>,
236 <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
237 <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
238{
239 fn register() {
240 let scope = AgentScope::<AGN>::new();
241 let responder = WorkerResponder {};
242 let link = AgentLink::connect(&scope, responder);
243 let upd = AgentLifecycleEvent::Create(link);
244 scope.send(upd);
245 let handler = move |data: Vec<u8>| {
246 let msg = ToWorker::<AGN::Input>::unpack(&data);
247 match msg {
248 ToWorker::Connected(id) => {
249 let upd = AgentLifecycleEvent::Connected(id);
250 scope.send(upd);
251 }
252 ToWorker::ProcessInput(id, value) => {
253 let upd = AgentLifecycleEvent::Input(value, id);
254 scope.send(upd);
255 }
256 ToWorker::Disconnected(id) => {
257 let upd = AgentLifecycleEvent::Disconnected(id);
258 scope.send(upd);
259 }
260 ToWorker::Destroy => {
261 let upd = AgentLifecycleEvent::Destroy;
262 scope.send(upd);
263 cfg_match! {
265 feature = "std_web" => js! { self.close(); },
266 feature = "web_sys" => worker_self().close(),
267 };
268 }
269 }
270 };
271 let loaded: FromWorker<AGN::Output> = FromWorker::WorkerLoaded;
272 let loaded = loaded.pack();
273 cfg_match! {
274 feature = "std_web" => js! {
275 var handler = @{handler};
276 self.onmessage = function(event) {
277 handler(event.data);
278 };
279 self.postMessage(@{loaded});
280 },
281 feature = "web_sys" => ({
282 let worker = worker_self();
283 worker.set_onmessage_closure(handler);
284 worker.post_message_vec(loaded);
285 }),
286 };
287 }
288}
289
/// Pool entry for a launched worker hosting agent `AGN`: the worker handle plus the
/// shared slab holding one callback slot per bridge.
struct RemoteAgent<AGN>
where
    AGN: Agent,
    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
{
    // Worker handle when built with the `std_web` feature.
    #[cfg(feature = "std_web")]
    worker: Value,
    // Worker handle when built with the `web_sys` feature.
    #[cfg(feature = "web_sys")]
    worker: Worker,
    // Callback slots shared with the worker's message handler; a slot's key becomes the
    // raw id of the bridge that owns it.
    slab: SharedOutputSlab<AGN>,
}
302
303impl<AGN> RemoteAgent<AGN>
304where
305 AGN: Agent,
306 <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
307 <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
308{
309 pub fn new(
310 #[cfg(feature = "std_web")] worker: Value,
311 #[cfg(feature = "web_sys")] worker: Worker,
312 slab: SharedOutputSlab<AGN>,
313 ) -> Self {
314 RemoteAgent { worker, slab }
315 }
316
317 fn create_bridge(&mut self, callback: Option<Callback<AGN::Output>>) -> PublicBridge<AGN> {
318 let respondable = callback.is_some();
319 let mut slab = self.slab.borrow_mut();
320 let id: usize = slab.insert(callback);
321 let id = HandlerId::new(id, respondable);
322 let bridge = PublicBridge {
323 worker: self.worker.clone(),
324 id,
325 _agent: PhantomData,
326 };
327 bridge.send_message(ToWorker::Connected(bridge.id));
328
329 bridge
330 }
331
332 fn remove_bridge(&mut self, bridge: &PublicBridge<AGN>) -> Last {
333 let mut slab = self.slab.borrow_mut();
334 let _ = slab.remove(bridge.id.raw_id());
335 slab.is_empty()
336 }
337}