gloo_worker/actor/
bridge.rs1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::fmt;
4use std::marker::PhantomData;
5use std::rc::Rc;
6use std::rc::Weak;
7
8use serde::{Deserialize, Serialize};
9
10use super::handler_id::HandlerId;
11use super::messages::ToWorker;
12use super::native_worker::NativeWorkerExt;
13use super::traits::Worker;
14use super::{Callback, Shared};
15use crate::codec::Codec;
16
17pub(crate) type ToWorkerQueue<W> = Vec<ToWorker<W>>;
18pub(crate) type CallbackMap<W> = HashMap<HandlerId, Weak<dyn Fn(<W as Worker>::Output)>>;
19
20struct WorkerBridgeInner<W>
21where
22 W: Worker,
23{
24 pending_queue: Shared<Option<ToWorkerQueue<W>>>,
26 callbacks: Shared<CallbackMap<W>>,
27 post_msg: Rc<dyn Fn(ToWorker<W>)>,
28}
29
30impl<W> fmt::Debug for WorkerBridgeInner<W>
31where
32 W: Worker,
33{
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 f.write_str("WorkerBridgeInner<_>")
36 }
37}
38
39impl<W> WorkerBridgeInner<W>
40where
41 W: Worker,
42{
43 fn send_message(&self, msg: ToWorker<W>) {
45 let mut pending_queue = self.pending_queue.borrow_mut();
46
47 match pending_queue.as_mut() {
48 Some(m) => {
49 m.push(msg);
50 }
51 None => {
52 (self.post_msg)(msg);
53 }
54 }
55 }
56}
57
58impl<W> Drop for WorkerBridgeInner<W>
59where
60 W: Worker,
61{
62 fn drop(&mut self) {
63 let destroy = ToWorker::Destroy;
64 self.send_message(destroy);
65 }
66}
67
68pub struct WorkerBridge<W>
70where
71 W: Worker,
72{
73 inner: Rc<WorkerBridgeInner<W>>,
74 id: HandlerId,
75 _worker: PhantomData<W>,
76 _cb: Option<Rc<dyn Fn(W::Output)>>,
77}
78
79impl<W> WorkerBridge<W>
80where
81 W: Worker,
82{
83 fn init(&self) {
84 self.inner.send_message(ToWorker::Connected(self.id));
85 }
86
87 pub(crate) fn new<CODEC>(
88 id: HandlerId,
89 native_worker: web_sys::Worker,
90 pending_queue: Rc<RefCell<Option<ToWorkerQueue<W>>>>,
91 callbacks: Rc<RefCell<CallbackMap<W>>>,
92 callback: Option<Callback<W::Output>>,
93 ) -> Self
94 where
95 CODEC: Codec,
96 W::Input: Serialize + for<'de> Deserialize<'de>,
97 {
98 let post_msg = move |msg: ToWorker<W>| native_worker.post_packed_message::<_, CODEC>(msg);
99
100 let self_ = Self {
101 inner: WorkerBridgeInner {
102 pending_queue,
103 callbacks,
104 post_msg: Rc::new(post_msg),
105 }
106 .into(),
107 id,
108 _worker: PhantomData,
109 _cb: callback,
110 };
111 self_.init();
112
113 self_
114 }
115
116 pub fn send(&self, msg: W::Input) {
118 let msg = ToWorker::ProcessInput(self.id, msg);
119 self.inner.send_message(msg);
120 }
121
122 pub fn fork<F>(&self, cb: Option<F>) -> Self
126 where
127 F: 'static + Fn(W::Output),
128 {
129 let cb = cb.map(|m| Rc::new(m) as Rc<dyn Fn(W::Output)>);
130 let handler_id = HandlerId::new();
131
132 if let Some(cb_weak) = cb.as_ref().map(Rc::downgrade) {
133 self.inner
134 .callbacks
135 .borrow_mut()
136 .insert(handler_id, cb_weak);
137 }
138
139 let self_ = Self {
140 inner: self.inner.clone(),
141 id: handler_id,
142 _worker: PhantomData,
143 _cb: cb,
144 };
145 self_.init();
146
147 self_
148 }
149}
150
151impl<W> Drop for WorkerBridge<W>
152where
153 W: Worker,
154{
155 fn drop(&mut self) {
156 let disconnected = ToWorker::Disconnected(self.id);
157 self.inner.send_message(disconnected);
158 }
159}
160
161impl<W> fmt::Debug for WorkerBridge<W>
162where
163 W: Worker,
164{
165 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
166 f.write_str("WorkerBridge<_>")
167 }
168}
169
170impl<W> PartialEq for WorkerBridge<W>
171where
172 W: Worker,
173{
174 fn eq(&self, rhs: &Self) -> bool {
175 self.id == rhs.id
176 }
177}