yew_stdweb/agent/worker/
public.rs

1use super::*;
2use crate::callback::Callback;
3use crate::scheduler::Shared;
4use anymap::{self, AnyMap};
5use cfg_if::cfg_if;
6use cfg_match::cfg_match;
7use queue::Queue;
8use slab::Slab;
9use std::any::TypeId;
10use std::cell::RefCell;
11use std::fmt;
12use std::marker::PhantomData;
13use std::rc::Rc;
14cfg_if! {
15    if #[cfg(feature = "std_web")] {
16        use stdweb::Value;
17        #[allow(unused_imports)]
18        use stdweb::{_js_impl, js};
19    } else if #[cfg(feature = "web_sys")] {
20        use super::WorkerExt;
21        use web_sys::{Worker};
22    }
23}
24
25thread_local! {
26    static REMOTE_AGENTS_POOL: RefCell<AnyMap> = RefCell::new(AnyMap::new());
27    static QUEUE: Queue<TypeId> = Queue::new();
28}
29
30/// Create a single instance in a tab.
31#[allow(missing_debug_implementations)]
32pub struct Public<AGN> {
33    _agent: PhantomData<AGN>,
34}
35
36impl<AGN> Discoverer for Public<AGN>
37where
38    AGN: Agent,
39    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
40    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
41{
42    type Agent = AGN;
43
44    fn spawn_or_join(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
45        let bridge = REMOTE_AGENTS_POOL.with(|pool| {
46            let mut pool = pool.borrow_mut();
47            match pool.entry::<RemoteAgent<AGN>>() {
48                anymap::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback),
49                anymap::Entry::Vacant(entry) => {
50                    let slab: Shared<Slab<Option<Callback<AGN::Output>>>> =
51                        Rc::new(RefCell::new(Slab::new()));
52                    let handler = {
53                        let slab = slab.clone();
54                        move |data: Vec<u8>,
55                              #[cfg(feature = "std_web")] worker: Value,
56                              #[cfg(feature = "web_sys")] worker: &Worker| {
57                            let msg = FromWorker::<AGN::Output>::unpack(&data);
58                            match msg {
59                                FromWorker::WorkerLoaded => {
60                                    QUEUE.with(|queue| {
61                                        queue.insert_loaded_agent(TypeId::of::<AGN>());
62
63                                        if let Some(msgs) = queue.remove_msg_queue(&TypeId::of::<AGN>()) {
64                                            for msg in msgs {
65                                                cfg_match! {
66                                                    feature = "std_web" => ({
67                                                        let worker = &worker;
68                                                        js! {@{worker}.postMessage(@{msg});};
69                                                    }),
70                                                    feature = "web_sys" => worker.post_message_vec(msg),
71                                                }
72                                            }
73                                        }
74                                    });
75                                }
76                                FromWorker::ProcessOutput(id, output) => {
77                                    locate_callback_and_respond::<AGN>(&slab, id, output);
78                                }
79                            }
80                        }
81                    };
82                    let name_of_resource = AGN::name_of_resource();
83                    let worker = cfg_match! {
84                        feature = "std_web" => js! {
85                            var worker = new Worker(@{name_of_resource});
86                            var handler = @{handler};
87                            worker.onmessage = function(event) {
88                                handler(event.data, worker);
89                            };
90                            return worker;
91                        },
92                        feature = "web_sys" => ({
93                            let worker = worker_new(name_of_resource, AGN::is_module());
94                            let worker_clone = worker.clone();
95                            worker.set_onmessage_closure(move |data: Vec<u8>| {
96                                handler(data, &worker_clone);
97                            });
98                            worker
99                        }),
100                    };
101                    let launched = RemoteAgent::new(worker, slab);
102                    entry.insert(launched).create_bridge(callback)
103                }
104            }
105        });
106        Box::new(bridge)
107    }
108}
109
110impl<AGN> Dispatchable for Public<AGN>
111where
112    AGN: Agent,
113    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
114    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
115{
116}
117
118/// A connection manager for components interaction with workers.
119pub struct PublicBridge<AGN>
120where
121    AGN: Agent,
122    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
123    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
124{
125    #[cfg(feature = "std_web")]
126    worker: Value,
127    #[cfg(feature = "web_sys")]
128    worker: Worker,
129    id: HandlerId,
130    _agent: PhantomData<AGN>,
131}
132
133impl<AGN> fmt::Debug for PublicBridge<AGN>
134where
135    AGN: Agent,
136    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
137    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
138{
139    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
140        f.write_str("PublicBridge<_>")
141    }
142}
143
144impl<AGN> PublicBridge<AGN>
145where
146    AGN: Agent,
147    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
148    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
149{
150    /// Send a message to the worker, queuing the message if necessary
151    fn send_message(&self, msg: ToWorker<AGN::Input>) {
152        QUEUE.with(|queue| {
153            if queue.is_worker_loaded(&TypeId::of::<AGN>()) {
154                send_to_remote::<AGN>(&self.worker, msg);
155            } else {
156                queue.add_msg_to_queue(msg.pack(), TypeId::of::<AGN>());
157            }
158        });
159    }
160}
161
162impl<AGN> Bridge<AGN> for PublicBridge<AGN>
163where
164    AGN: Agent,
165    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
166    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
167{
168    fn send(&mut self, msg: AGN::Input) {
169        let msg = ToWorker::ProcessInput(self.id, msg);
170        self.send_message(msg);
171    }
172}
173
174impl<AGN> Drop for PublicBridge<AGN>
175where
176    AGN: Agent,
177    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
178    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
179{
180    fn drop(&mut self) {
181        let terminate_worker = REMOTE_AGENTS_POOL.with(|pool| {
182            let mut pool = pool.borrow_mut();
183            let terminate_worker = {
184                if let Some(launched) = pool.get_mut::<RemoteAgent<AGN>>() {
185                    launched.remove_bridge(self)
186                } else {
187                    false
188                }
189            };
190
191            if terminate_worker {
192                pool.remove::<RemoteAgent<AGN>>();
193            }
194
195            terminate_worker
196        });
197
198        let disconnected = ToWorker::Disconnected(self.id);
199        self.send_message(disconnected);
200
201        if terminate_worker {
202            let destroy = ToWorker::Destroy;
203            self.send_message(destroy);
204
205            QUEUE.with(|queue| {
206                queue.remove_agent(&TypeId::of::<AGN>());
207            });
208        }
209    }
210}
211
212struct WorkerResponder {}
213
214impl<AGN> Responder<AGN> for WorkerResponder
215where
216    AGN: Agent,
217    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
218    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
219{
220    fn respond(&self, id: HandlerId, output: AGN::Output) {
221        let msg = FromWorker::ProcessOutput(id, output);
222        let data = msg.pack();
223        cfg_match! {
224            feature = "std_web" => js! {
225                var data = @{data};
226                self.postMessage(data);
227            },
228            feature = "web_sys" => worker_self().post_message_vec(data),
229        };
230    }
231}
232
233impl<AGN> Threaded for AGN
234where
235    AGN: Agent<Reach = Public<AGN>>,
236    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
237    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
238{
239    fn register() {
240        let scope = AgentScope::<AGN>::new();
241        let responder = WorkerResponder {};
242        let link = AgentLink::connect(&scope, responder);
243        let upd = AgentLifecycleEvent::Create(link);
244        scope.send(upd);
245        let handler = move |data: Vec<u8>| {
246            let msg = ToWorker::<AGN::Input>::unpack(&data);
247            match msg {
248                ToWorker::Connected(id) => {
249                    let upd = AgentLifecycleEvent::Connected(id);
250                    scope.send(upd);
251                }
252                ToWorker::ProcessInput(id, value) => {
253                    let upd = AgentLifecycleEvent::Input(value, id);
254                    scope.send(upd);
255                }
256                ToWorker::Disconnected(id) => {
257                    let upd = AgentLifecycleEvent::Disconnected(id);
258                    scope.send(upd);
259                }
260                ToWorker::Destroy => {
261                    let upd = AgentLifecycleEvent::Destroy;
262                    scope.send(upd);
263                    // Terminates web worker
264                    cfg_match! {
265                        feature = "std_web" => js! { self.close(); },
266                        feature = "web_sys" => worker_self().close(),
267                    };
268                }
269            }
270        };
271        let loaded: FromWorker<AGN::Output> = FromWorker::WorkerLoaded;
272        let loaded = loaded.pack();
273        cfg_match! {
274            feature = "std_web" => js! {
275                    var handler = @{handler};
276                    self.onmessage = function(event) {
277                        handler(event.data);
278                    };
279                    self.postMessage(@{loaded});
280            },
281            feature = "web_sys" => ({
282                let worker = worker_self();
283                worker.set_onmessage_closure(handler);
284                worker.post_message_vec(loaded);
285            }),
286        };
287    }
288}
289
290struct RemoteAgent<AGN>
291where
292    AGN: Agent,
293    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
294    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
295{
296    #[cfg(feature = "std_web")]
297    worker: Value,
298    #[cfg(feature = "web_sys")]
299    worker: Worker,
300    slab: SharedOutputSlab<AGN>,
301}
302
303impl<AGN> RemoteAgent<AGN>
304where
305    AGN: Agent,
306    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
307    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
308{
309    pub fn new(
310        #[cfg(feature = "std_web")] worker: Value,
311        #[cfg(feature = "web_sys")] worker: Worker,
312        slab: SharedOutputSlab<AGN>,
313    ) -> Self {
314        RemoteAgent { worker, slab }
315    }
316
317    fn create_bridge(&mut self, callback: Option<Callback<AGN::Output>>) -> PublicBridge<AGN> {
318        let respondable = callback.is_some();
319        let mut slab = self.slab.borrow_mut();
320        let id: usize = slab.insert(callback);
321        let id = HandlerId::new(id, respondable);
322        let bridge = PublicBridge {
323            worker: self.worker.clone(),
324            id,
325            _agent: PhantomData,
326        };
327        bridge.send_message(ToWorker::Connected(bridge.id));
328
329        bridge
330    }
331
332    fn remove_bridge(&mut self, bridge: &PublicBridge<AGN>) -> Last {
333        let mut slab = self.slab.borrow_mut();
334        let _ = slab.remove(bridge.id.raw_id());
335        slab.is_empty()
336    }
337}