gloo_worker/actor/
bridge.rs

1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::fmt;
4use std::marker::PhantomData;
5use std::rc::Rc;
6use std::rc::Weak;
7
8use serde::{Deserialize, Serialize};
9
10use super::handler_id::HandlerId;
11use super::messages::ToWorker;
12use super::native_worker::NativeWorkerExt;
13use super::traits::Worker;
14use super::{Callback, Shared};
15use crate::codec::Codec;
16
17pub(crate) type ToWorkerQueue<W> = Vec<ToWorker<W>>;
18pub(crate) type CallbackMap<W> = HashMap<HandlerId, Weak<dyn Fn(<W as Worker>::Output)>>;
19
20struct WorkerBridgeInner<W>
21where
22    W: Worker,
23{
24    // When worker is loaded, queue becomes None.
25    pending_queue: Shared<Option<ToWorkerQueue<W>>>,
26    callbacks: Shared<CallbackMap<W>>,
27    post_msg: Rc<dyn Fn(ToWorker<W>)>,
28}
29
30impl<W> fmt::Debug for WorkerBridgeInner<W>
31where
32    W: Worker,
33{
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        f.write_str("WorkerBridgeInner<_>")
36    }
37}
38
39impl<W> WorkerBridgeInner<W>
40where
41    W: Worker,
42{
43    /// Send a message to the worker, queuing the message if necessary
44    fn send_message(&self, msg: ToWorker<W>) {
45        let mut pending_queue = self.pending_queue.borrow_mut();
46
47        match pending_queue.as_mut() {
48            Some(m) => {
49                m.push(msg);
50            }
51            None => {
52                (self.post_msg)(msg);
53            }
54        }
55    }
56}
57
58impl<W> Drop for WorkerBridgeInner<W>
59where
60    W: Worker,
61{
62    fn drop(&mut self) {
63        let destroy = ToWorker::Destroy;
64        self.send_message(destroy);
65    }
66}
67
68/// A connection manager for components interaction with workers.
69pub struct WorkerBridge<W>
70where
71    W: Worker,
72{
73    inner: Rc<WorkerBridgeInner<W>>,
74    id: HandlerId,
75    _worker: PhantomData<W>,
76    _cb: Option<Rc<dyn Fn(W::Output)>>,
77}
78
79impl<W> WorkerBridge<W>
80where
81    W: Worker,
82{
83    fn init(&self) {
84        self.inner.send_message(ToWorker::Connected(self.id));
85    }
86
87    pub(crate) fn new<CODEC>(
88        id: HandlerId,
89        native_worker: web_sys::Worker,
90        pending_queue: Rc<RefCell<Option<ToWorkerQueue<W>>>>,
91        callbacks: Rc<RefCell<CallbackMap<W>>>,
92        callback: Option<Callback<W::Output>>,
93    ) -> Self
94    where
95        CODEC: Codec,
96        W::Input: Serialize + for<'de> Deserialize<'de>,
97    {
98        let post_msg = move |msg: ToWorker<W>| native_worker.post_packed_message::<_, CODEC>(msg);
99
100        let self_ = Self {
101            inner: WorkerBridgeInner {
102                pending_queue,
103                callbacks,
104                post_msg: Rc::new(post_msg),
105            }
106            .into(),
107            id,
108            _worker: PhantomData,
109            _cb: callback,
110        };
111        self_.init();
112
113        self_
114    }
115
116    /// Send a message to the current worker.
117    pub fn send(&self, msg: W::Input) {
118        let msg = ToWorker::ProcessInput(self.id, msg);
119        self.inner.send_message(msg);
120    }
121
122    /// Forks the bridge with a different callback.
123    ///
124    /// This creates a new [HandlerID] that helps the worker to differentiate bridges.
125    pub fn fork<F>(&self, cb: Option<F>) -> Self
126    where
127        F: 'static + Fn(W::Output),
128    {
129        let cb = cb.map(|m| Rc::new(m) as Rc<dyn Fn(W::Output)>);
130        let handler_id = HandlerId::new();
131
132        if let Some(cb_weak) = cb.as_ref().map(Rc::downgrade) {
133            self.inner
134                .callbacks
135                .borrow_mut()
136                .insert(handler_id, cb_weak);
137        }
138
139        let self_ = Self {
140            inner: self.inner.clone(),
141            id: handler_id,
142            _worker: PhantomData,
143            _cb: cb,
144        };
145        self_.init();
146
147        self_
148    }
149}
150
151impl<W> Drop for WorkerBridge<W>
152where
153    W: Worker,
154{
155    fn drop(&mut self) {
156        let disconnected = ToWorker::Disconnected(self.id);
157        self.inner.send_message(disconnected);
158    }
159}
160
161impl<W> fmt::Debug for WorkerBridge<W>
162where
163    W: Worker,
164{
165    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
166        f.write_str("WorkerBridge<_>")
167    }
168}
169
170impl<W> PartialEq for WorkerBridge<W>
171where
172    W: Worker,
173{
174    fn eq(&self, rhs: &Self) -> bool {
175        self.id == rhs.id
176    }
177}