gloo_worker/actor/
spawner.rs1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::fmt;
4use std::marker::PhantomData;
5use std::rc::{Rc, Weak};
6
7use gloo_utils::window;
8use js_sys::Array;
9use serde::de::Deserialize;
10use serde::ser::Serialize;
11use web_sys::{Blob, BlobPropertyBag, Url};
12
13use super::bridge::{CallbackMap, WorkerBridge};
14use super::handler_id::HandlerId;
15use super::messages::FromWorker;
16use super::native_worker::{DedicatedWorker, NativeWorkerExt};
17use super::traits::Worker;
18use super::{Callback, Shared};
19use crate::codec::{Bincode, Codec};
20
21fn create_worker(path: &str) -> DedicatedWorker {
22 let js_shim_url = Url::new_with_base(
23 path,
24 &window().location().href().expect("failed to read href."),
25 )
26 .expect("failed to create url for javascript entrypoint")
27 .to_string();
28
29 let wasm_url = js_shim_url.replace(".js", "_bg.wasm");
30
31 let array = Array::new();
32 array.push(&format!(r#"importScripts("{js_shim_url}");wasm_bindgen("{wasm_url}");"#).into());
33 let blob = Blob::new_with_str_sequence_and_options(
34 &array,
35 BlobPropertyBag::new().type_("application/javascript"),
36 )
37 .unwrap();
38 let url = Url::create_object_url_with_blob(&blob).unwrap();
39
40 DedicatedWorker::new(&url).expect("failed to spawn worker")
41}
42
43#[derive(Clone)]
45pub struct WorkerSpawner<W, CODEC = Bincode>
46where
47 W: Worker,
48 CODEC: Codec,
49{
50 _marker: PhantomData<(W, CODEC)>,
51 callback: Option<Callback<W::Output>>,
52}
53
54impl<W, CODEC> fmt::Debug for WorkerSpawner<W, CODEC>
55where
56 W: Worker,
57 CODEC: Codec,
58{
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 f.write_str("WorkerScope<_>")
61 }
62}
63
64impl<W, CODEC> Default for WorkerSpawner<W, CODEC>
65where
66 W: Worker + 'static,
67 CODEC: Codec,
68{
69 fn default() -> Self {
70 Self::new()
71 }
72}
73
74impl<W, CODEC> WorkerSpawner<W, CODEC>
75where
76 W: Worker + 'static,
77 CODEC: Codec,
78{
79 pub const fn new() -> Self {
81 Self {
82 _marker: PhantomData,
83 callback: None,
84 }
85 }
86
87 pub fn encoding<C>(&mut self) -> WorkerSpawner<W, C>
89 where
90 C: Codec,
91 {
92 WorkerSpawner {
93 _marker: PhantomData,
94 callback: self.callback.clone(),
95 }
96 }
97
98 pub fn callback<F>(&mut self, cb: F) -> &mut Self
100 where
101 F: 'static + Fn(W::Output),
102 {
103 self.callback = Some(Rc::new(cb));
104
105 self
106 }
107
108 fn spawn_inner(&self, worker: DedicatedWorker) -> WorkerBridge<W>
109 where
110 W::Input: Serialize + for<'de> Deserialize<'de>,
111 W::Output: Serialize + for<'de> Deserialize<'de>,
112 {
113 let pending_queue = Rc::new(RefCell::new(Some(Vec::new())));
114 let handler_id = HandlerId::new();
115 let mut callbacks = HashMap::new();
116
117 if let Some(m) = self.callback.as_ref().map(Rc::downgrade) {
118 callbacks.insert(handler_id, m);
119 }
120
121 let callbacks: Shared<CallbackMap<W>> = Rc::new(RefCell::new(callbacks));
122
123 let handler = {
124 let pending_queue = pending_queue.clone();
125 let callbacks = callbacks.clone();
126
127 let worker = worker.clone();
128
129 move |msg: FromWorker<W>| match msg {
130 FromWorker::WorkerLoaded => {
131 if let Some(pending_queue) = pending_queue.borrow_mut().take() {
132 for to_worker in pending_queue.into_iter() {
133 worker.post_packed_message::<_, CODEC>(to_worker);
134 }
135 }
136 }
137 FromWorker::ProcessOutput(id, output) => {
138 let mut callbacks = callbacks.borrow_mut();
139
140 if let Some(m) = callbacks.get(&id) {
141 if let Some(m) = Weak::upgrade(m) {
142 m(output);
143 } else {
144 callbacks.remove(&id);
145 }
146 }
147 }
148 }
149 };
150
151 worker.set_on_packed_message::<_, CODEC, _>(handler);
152
153 WorkerBridge::<W>::new::<CODEC>(
154 handler_id,
155 worker,
156 pending_queue,
157 callbacks,
158 self.callback.clone(),
159 )
160 }
161
162 pub fn spawn(&self, path: &str) -> WorkerBridge<W>
164 where
165 W::Input: Serialize + for<'de> Deserialize<'de>,
166 W::Output: Serialize + for<'de> Deserialize<'de>,
167 {
168 let worker = create_worker(path);
169
170 self.spawn_inner(worker)
171 }
172
173 pub fn spawn_with_loader(&self, loader_path: &str) -> WorkerBridge<W>
175 where
176 W::Input: Serialize + for<'de> Deserialize<'de>,
177 W::Output: Serialize + for<'de> Deserialize<'de>,
178 {
179 let worker = DedicatedWorker::new(loader_path).expect("failed to spawn worker");
180
181 self.spawn_inner(worker)
182 }
183}