yew_stdweb/agent/worker/
private.rs1use super::*;
2use crate::callback::Callback;
3use cfg_if::cfg_if;
4use cfg_match::cfg_match;
5use queue::Queue;
6use std::fmt;
7use std::marker::PhantomData;
8use std::sync::atomic::{AtomicUsize, Ordering};
9cfg_if! {
10 if #[cfg(feature = "std_web")] {
11 use stdweb::Value;
12 #[allow(unused_imports)]
13 use stdweb::{_js_impl, js};
14 } else if #[cfg(feature = "web_sys")] {
15 use web_sys::{Worker};
16 }
17}
18
19thread_local! {
20 static QUEUE: Queue<usize> = Queue::new();
21}
22
23static PRIVATE_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
24const SINGLETON_ID: HandlerId = HandlerId(0, true);
25
26#[allow(missing_debug_implementations)]
28pub struct Private<AGN> {
29 _agent: PhantomData<AGN>,
30}
31
32impl<AGN> Discoverer for Private<AGN>
33where
34 AGN: Agent,
35 <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
36 <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
37{
38 type Agent = AGN;
39
40 fn spawn_or_join(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
41 let id = PRIVATE_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
42 let callback = callback.expect("Callback required for Private agents");
43 let handler = move |data: Vec<u8>,
44 #[cfg(feature = "std_web")] worker: Value,
45 #[cfg(feature = "web_sys")] worker: &Worker| {
46 let msg = FromWorker::<AGN::Output>::unpack(&data);
47 match msg {
48 FromWorker::WorkerLoaded => {
49 QUEUE.with(|queue| {
50 queue.insert_loaded_agent(id);
51
52 if let Some(msgs) = queue.remove_msg_queue(&id) {
53 for msg in msgs {
54 cfg_match! {
55 feature = "std_web" => ({
56 let worker = &worker;
57 js! {@{worker}.postMessage(@{msg});};
58 }),
59 feature = "web_sys" => worker.post_message_vec(msg),
60 }
61 }
62 }
63 });
64 }
65 FromWorker::ProcessOutput(id, output) => {
66 assert_eq!(id.raw_id(), SINGLETON_ID.raw_id());
67 callback.emit(output);
68 }
69 }
70 };
71
72 let name_of_resource = AGN::name_of_resource();
74 let worker = cfg_match! {
75 feature = "std_web" => js! {
76 var worker = new Worker(@{name_of_resource});
77 var handler = @{handler};
78 worker.onmessage = function(event) {
79 handler(event.data, worker);
80 };
81 return worker;
82 },
83 feature = "web_sys" => ({
84 let worker = worker_new(name_of_resource, AGN::is_module());
85 let worker_clone = worker.clone();
86 worker.set_onmessage_closure(move |data: Vec<u8>| handler(data, &worker_clone));
87 worker
88 }),
89 };
90 let bridge = PrivateBridge {
91 worker,
92 _agent: PhantomData,
93 id,
94 };
95 bridge.send_message(ToWorker::Connected(SINGLETON_ID));
96 Box::new(bridge)
97 }
98}
99
100pub struct PrivateBridge<AGN>
102where
103 AGN: Agent,
104 <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
105 <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
106{
107 #[cfg(feature = "std_web")]
108 worker: Value,
109 #[cfg(feature = "web_sys")]
110 worker: Worker,
111 _agent: PhantomData<AGN>,
112 id: usize,
113}
114
115impl<AGN> PrivateBridge<AGN>
116where
117 AGN: Agent,
118 <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
119 <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
120{
121 fn send_message(&self, msg: ToWorker<AGN::Input>) {
123 QUEUE.with(|queue| {
124 if queue.is_worker_loaded(&self.id) {
125 send_to_remote::<AGN>(&self.worker, msg);
126 } else {
127 queue.add_msg_to_queue(msg.pack(), self.id);
128 }
129 });
130 }
131}
132impl<AGN> fmt::Debug for PrivateBridge<AGN>
133where
134 AGN: Agent,
135 <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
136 <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
137{
138 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139 f.write_str("PrivateBridge<_>")
140 }
141}
142
143impl<AGN> Bridge<AGN> for PrivateBridge<AGN>
144where
145 AGN: Agent,
146 <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
147 <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
148{
149 fn send(&mut self, msg: AGN::Input) {
150 let msg = ToWorker::ProcessInput(SINGLETON_ID, msg);
151 self.send_message(msg);
152 }
153}
154
155impl<AGN> Drop for PrivateBridge<AGN>
156where
157 AGN: Agent,
158 <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
159 <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
160{
161 fn drop(&mut self) {
162 let disconnected = ToWorker::Disconnected(SINGLETON_ID);
163 send_to_remote::<AGN>(&self.worker, disconnected);
164
165 let destroy = ToWorker::Destroy;
166 send_to_remote::<AGN>(&self.worker, destroy);
167
168 QUEUE.with(|queue| {
169 queue.remove_agent(&self.id);
170 });
171 }
172}