// yew_stdweb/agent/worker/private.rs

1use super::*;
2use crate::callback::Callback;
3use cfg_if::cfg_if;
4use cfg_match::cfg_match;
5use queue::Queue;
6use std::fmt;
7use std::marker::PhantomData;
8use std::sync::atomic::{AtomicUsize, Ordering};
9cfg_if! {
10    if #[cfg(feature = "std_web")] {
11        use stdweb::Value;
12        #[allow(unused_imports)]
13        use stdweb::{_js_impl, js};
14    } else if #[cfg(feature = "web_sys")] {
15        use web_sys::{Worker};
16    }
17}
18
19thread_local! {
20    static QUEUE: Queue<usize> = Queue::new();
21}
22
23static PRIVATE_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
24const SINGLETON_ID: HandlerId = HandlerId(0, true);
25
/// Discoverer that creates a new worker instance for every bridge.
///
/// The type carries no runtime state; the `AGN` parameter only selects which
/// agent the `Discoverer` implementation spawns.
pub struct Private<AGN> {
    _agent: PhantomData<AGN>,
}

// Written by hand (rather than derived) so that `AGN` itself does not have to
// implement `Debug`; mirrors the `Debug` impl of `PrivateBridge`.
impl<AGN> fmt::Debug for Private<AGN> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Private<_>")
    }
}
31
32impl<AGN> Discoverer for Private<AGN>
33where
34    AGN: Agent,
35    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
36    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
37{
38    type Agent = AGN;
39
40    fn spawn_or_join(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
41        let id = PRIVATE_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
42        let callback = callback.expect("Callback required for Private agents");
43        let handler = move |data: Vec<u8>,
44                            #[cfg(feature = "std_web")] worker: Value,
45                            #[cfg(feature = "web_sys")] worker: &Worker| {
46            let msg = FromWorker::<AGN::Output>::unpack(&data);
47            match msg {
48                FromWorker::WorkerLoaded => {
49                    QUEUE.with(|queue| {
50                        queue.insert_loaded_agent(id);
51
52                        if let Some(msgs) = queue.remove_msg_queue(&id) {
53                            for msg in msgs {
54                                cfg_match! {
55                                    feature = "std_web" => ({
56                                        let worker = &worker;
57                                        js! {@{worker}.postMessage(@{msg});};
58                                    }),
59                                    feature = "web_sys" => worker.post_message_vec(msg),
60                                }
61                            }
62                        }
63                    });
64                }
65                FromWorker::ProcessOutput(id, output) => {
66                    assert_eq!(id.raw_id(), SINGLETON_ID.raw_id());
67                    callback.emit(output);
68                }
69            }
70        };
71
72        // TODO(#947): Drop handler when bridge is dropped
73        let name_of_resource = AGN::name_of_resource();
74        let worker = cfg_match! {
75            feature = "std_web" => js! {
76                var worker = new Worker(@{name_of_resource});
77                var handler = @{handler};
78                worker.onmessage = function(event) {
79                    handler(event.data, worker);
80                };
81                return worker;
82            },
83            feature = "web_sys" => ({
84                let worker = worker_new(name_of_resource, AGN::is_module());
85                let worker_clone = worker.clone();
86                worker.set_onmessage_closure(move |data: Vec<u8>| handler(data, &worker_clone));
87                worker
88            }),
89        };
90        let bridge = PrivateBridge {
91            worker,
92            _agent: PhantomData,
93            id,
94        };
95        bridge.send_message(ToWorker::Connected(SINGLETON_ID));
96        Box::new(bridge)
97    }
98}
99
100/// A connection manager for components interaction with workers.
101pub struct PrivateBridge<AGN>
102where
103    AGN: Agent,
104    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
105    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
106{
107    #[cfg(feature = "std_web")]
108    worker: Value,
109    #[cfg(feature = "web_sys")]
110    worker: Worker,
111    _agent: PhantomData<AGN>,
112    id: usize,
113}
114
115impl<AGN> PrivateBridge<AGN>
116where
117    AGN: Agent,
118    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
119    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
120{
121    /// Send a message to the worker, queuing the message if necessary
122    fn send_message(&self, msg: ToWorker<AGN::Input>) {
123        QUEUE.with(|queue| {
124            if queue.is_worker_loaded(&self.id) {
125                send_to_remote::<AGN>(&self.worker, msg);
126            } else {
127                queue.add_msg_to_queue(msg.pack(), self.id);
128            }
129        });
130    }
131}
132impl<AGN> fmt::Debug for PrivateBridge<AGN>
133where
134    AGN: Agent,
135    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
136    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
137{
138    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139        f.write_str("PrivateBridge<_>")
140    }
141}
142
143impl<AGN> Bridge<AGN> for PrivateBridge<AGN>
144where
145    AGN: Agent,
146    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
147    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
148{
149    fn send(&mut self, msg: AGN::Input) {
150        let msg = ToWorker::ProcessInput(SINGLETON_ID, msg);
151        self.send_message(msg);
152    }
153}
154
155impl<AGN> Drop for PrivateBridge<AGN>
156where
157    AGN: Agent,
158    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
159    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
160{
161    fn drop(&mut self) {
162        let disconnected = ToWorker::Disconnected(SINGLETON_ID);
163        send_to_remote::<AGN>(&self.worker, disconnected);
164
165        let destroy = ToWorker::Destroy;
166        send_to_remote::<AGN>(&self.worker, destroy);
167
168        QUEUE.with(|queue| {
169            queue.remove_agent(&self.id);
170        });
171    }
172}