gloo_worker/actor/
spawner.rs

1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::fmt;
4use std::marker::PhantomData;
5use std::rc::{Rc, Weak};
6
7use gloo_utils::window;
8use js_sys::Array;
9use serde::de::Deserialize;
10use serde::ser::Serialize;
11use web_sys::{Blob, BlobPropertyBag, Url};
12
13use super::bridge::{CallbackMap, WorkerBridge};
14use super::handler_id::HandlerId;
15use super::messages::FromWorker;
16use super::native_worker::{DedicatedWorker, NativeWorkerExt};
17use super::traits::Worker;
18use super::{Callback, Shared};
19use crate::codec::{Bincode, Codec};
20
21fn create_worker(path: &str) -> DedicatedWorker {
22    let js_shim_url = Url::new_with_base(
23        path,
24        &window().location().href().expect("failed to read href."),
25    )
26    .expect("failed to create url for javascript entrypoint")
27    .to_string();
28
29    let wasm_url = js_shim_url.replace(".js", "_bg.wasm");
30
31    let array = Array::new();
32    array.push(&format!(r#"importScripts("{js_shim_url}");wasm_bindgen("{wasm_url}");"#).into());
33    let blob = Blob::new_with_str_sequence_and_options(
34        &array,
35        BlobPropertyBag::new().type_("application/javascript"),
36    )
37    .unwrap();
38    let url = Url::create_object_url_with_blob(&blob).unwrap();
39
40    DedicatedWorker::new(&url).expect("failed to spawn worker")
41}
42
43/// A spawner to create workers.
44#[derive(Clone)]
45pub struct WorkerSpawner<W, CODEC = Bincode>
46where
47    W: Worker,
48    CODEC: Codec,
49{
50    _marker: PhantomData<(W, CODEC)>,
51    callback: Option<Callback<W::Output>>,
52}
53
54impl<W, CODEC> fmt::Debug for WorkerSpawner<W, CODEC>
55where
56    W: Worker,
57    CODEC: Codec,
58{
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        f.write_str("WorkerScope<_>")
61    }
62}
63
64impl<W, CODEC> Default for WorkerSpawner<W, CODEC>
65where
66    W: Worker + 'static,
67    CODEC: Codec,
68{
69    fn default() -> Self {
70        Self::new()
71    }
72}
73
74impl<W, CODEC> WorkerSpawner<W, CODEC>
75where
76    W: Worker + 'static,
77    CODEC: Codec,
78{
79    /// Creates a [WorkerSpawner].
80    pub const fn new() -> Self {
81        Self {
82            _marker: PhantomData,
83            callback: None,
84        }
85    }
86
87    /// Sets a new message encoding.
88    pub fn encoding<C>(&mut self) -> WorkerSpawner<W, C>
89    where
90        C: Codec,
91    {
92        WorkerSpawner {
93            _marker: PhantomData,
94            callback: self.callback.clone(),
95        }
96    }
97
98    /// Sets a callback.
99    pub fn callback<F>(&mut self, cb: F) -> &mut Self
100    where
101        F: 'static + Fn(W::Output),
102    {
103        self.callback = Some(Rc::new(cb));
104
105        self
106    }
107
108    fn spawn_inner(&self, worker: DedicatedWorker) -> WorkerBridge<W>
109    where
110        W::Input: Serialize + for<'de> Deserialize<'de>,
111        W::Output: Serialize + for<'de> Deserialize<'de>,
112    {
113        let pending_queue = Rc::new(RefCell::new(Some(Vec::new())));
114        let handler_id = HandlerId::new();
115        let mut callbacks = HashMap::new();
116
117        if let Some(m) = self.callback.as_ref().map(Rc::downgrade) {
118            callbacks.insert(handler_id, m);
119        }
120
121        let callbacks: Shared<CallbackMap<W>> = Rc::new(RefCell::new(callbacks));
122
123        let handler = {
124            let pending_queue = pending_queue.clone();
125            let callbacks = callbacks.clone();
126
127            let worker = worker.clone();
128
129            move |msg: FromWorker<W>| match msg {
130                FromWorker::WorkerLoaded => {
131                    if let Some(pending_queue) = pending_queue.borrow_mut().take() {
132                        for to_worker in pending_queue.into_iter() {
133                            worker.post_packed_message::<_, CODEC>(to_worker);
134                        }
135                    }
136                }
137                FromWorker::ProcessOutput(id, output) => {
138                    let mut callbacks = callbacks.borrow_mut();
139
140                    if let Some(m) = callbacks.get(&id) {
141                        if let Some(m) = Weak::upgrade(m) {
142                            m(output);
143                        } else {
144                            callbacks.remove(&id);
145                        }
146                    }
147                }
148            }
149        };
150
151        worker.set_on_packed_message::<_, CODEC, _>(handler);
152
153        WorkerBridge::<W>::new::<CODEC>(
154            handler_id,
155            worker,
156            pending_queue,
157            callbacks,
158            self.callback.clone(),
159        )
160    }
161
162    /// Spawns a Worker.
163    pub fn spawn(&self, path: &str) -> WorkerBridge<W>
164    where
165        W::Input: Serialize + for<'de> Deserialize<'de>,
166        W::Output: Serialize + for<'de> Deserialize<'de>,
167    {
168        let worker = create_worker(path);
169
170        self.spawn_inner(worker)
171    }
172
173    /// Spawns a Worker with a loader shim script.
174    pub fn spawn_with_loader(&self, loader_path: &str) -> WorkerBridge<W>
175    where
176        W::Input: Serialize + for<'de> Deserialize<'de>,
177        W::Output: Serialize + for<'de> Deserialize<'de>,
178    {
179        let worker = DedicatedWorker::new(loader_path).expect("failed to spawn worker");
180
181        self.spawn_inner(worker)
182    }
183}