yew_stdweb/agent/worker/
mod.rs

1mod private;
2mod public;
3mod queue;
4
5pub use private::Private;
6pub use public::Public;
7
8use super::*;
9use cfg_if::cfg_if;
10use cfg_match::cfg_match;
11use serde::{Deserialize, Serialize};
12cfg_if! {
13    if #[cfg(feature = "std_web")] {
14        use stdweb::Value;
15        #[allow(unused_imports)]
16        use stdweb::{_js_impl, js};
17    } else if #[cfg(feature = "web_sys")] {
18        use crate::utils;
19        use js_sys::{Array, Reflect, Uint8Array};
20        use wasm_bindgen::{closure::Closure, JsCast, JsValue};
21        use web_sys::{Blob, BlobPropertyBag, DedicatedWorkerGlobalScope, MessageEvent, Url, Worker, WorkerOptions};
22    }
23}
24
/// Describes how an agent is registered so that it can run as a worker in a
/// separate thread.
pub trait Threaded {
    /// Runs the agent in the current environment.
    ///
    /// Intended to be called from the `main` function of a worker.
    fn register();
}
31
/// Converts messages to and from raw bytes; the provided blanket
/// implementation relies on `serde::Serialize` / `serde::Deserialize`.
pub trait Packed {
    /// Encodes this message as a `Vec<u8>`.
    fn pack(&self) -> Vec<u8>;

    /// Decodes a message from the given byte slice.
    fn unpack(data: &[u8]) -> Self;
}
39
40impl<T: Serialize + for<'de> Deserialize<'de>> Packed for T {
41    fn pack(&self) -> Vec<u8> {
42        bincode::serialize(&self).expect("can't serialize an agent message")
43    }
44
45    fn unpack(data: &[u8]) -> Self {
46        bincode::deserialize(&data).expect("can't deserialize an agent message")
47    }
48}
49
50/// Serializable messages to worker
51#[derive(Serialize, Deserialize, Debug)]
52enum ToWorker<T> {
53    /// Client is connected
54    Connected(HandlerId),
55    /// Incoming message to Worker
56    ProcessInput(HandlerId, T),
57    /// Client is disconnected
58    Disconnected(HandlerId),
59    /// Worker should be terminated
60    Destroy,
61}
62
63/// Serializable messages sent by worker to consumer
64#[derive(Serialize, Deserialize, Debug)]
65enum FromWorker<T> {
66    /// Worker sends this message when `wasm` bundle has loaded.
67    WorkerLoaded,
68    /// Outgoing message to consumer
69    ProcessOutput(HandlerId, T),
70}
71
72fn send_to_remote<AGN>(
73    #[cfg(feature = "std_web")] worker: &Value,
74    #[cfg(feature = "web_sys")] worker: &Worker,
75    msg: ToWorker<AGN::Input>,
76) where
77    AGN: Agent,
78    <AGN as Agent>::Input: Serialize + for<'de> Deserialize<'de>,
79    <AGN as Agent>::Output: Serialize + for<'de> Deserialize<'de>,
80{
81    let msg = msg.pack();
82    cfg_match! {
83        feature = "std_web" => js! {
84            var worker = @{worker};
85            var bytes = @{msg};
86            worker.postMessage(bytes);
87        },
88        feature = "web_sys" => worker.post_message_vec(msg),
89    };
90}
91
/// Spawns a `Worker` that loads the given script together with its wasm bundle.
///
/// A one-line bootstrap script is generated which calls `importScripts` on
/// `<origin>/<name_of_resource>` and then `wasm_bindgen` on the same path with
/// every `.js` replaced by `_bg.wasm`. The bootstrap is placed in a
/// `application/javascript` blob, and the blob's object URL is what the worker
/// is created from. When `is_module` is `true` the worker is created with the
/// `type` option set to `"module"`.
///
/// # Panics
///
/// Panics if `utils::origin()` does not produce a value, if the blob or its
/// object URL cannot be created, if the `type` option cannot be set, or — with
/// the message "failed to spawn worker" — if the worker cannot be constructed.
#[cfg(feature = "web_sys")]
fn worker_new(name_of_resource: &str, is_module: bool) -> Worker {
    let origin = utils::origin().unwrap();
    let wasm_resource = name_of_resource.replace(".js", "_bg.wasm");
    let bootstrap = format!(
        r#"importScripts("{}/{}");wasm_bindgen("{}/{}");"#,
        origin, name_of_resource, origin, wasm_resource
    );

    // The blob constructor takes a sequence of string parts; a single part is enough.
    let parts = Array::new();
    parts.push(&JsValue::from_str(&bootstrap));

    let mut blob_options = BlobPropertyBag::new();
    blob_options.type_("application/javascript");
    let blob = Blob::new_with_str_sequence_and_options(&parts, &blob_options).unwrap();
    let blob_url = Url::create_object_url_with_blob(&blob).unwrap();

    let spawned = if is_module {
        let options = WorkerOptions::new();
        // Set `type: "module"` reflectively on the options object.
        Reflect::set(
            options.as_ref(),
            &JsValue::from_str("type"),
            &JsValue::from_str("module"),
        )
        .unwrap();
        Worker::new_with_options(&blob_url, &options)
    } else {
        Worker::new(&blob_url)
    };
    spawned.expect("failed to spawn worker")
}
125
/// Returns the JavaScript global object viewed as a `DedicatedWorkerGlobalScope`.
///
/// The cast is unchecked: no runtime type test is performed on the global
/// object. Presumably this is only called from code running inside a dedicated
/// worker — confirm against callers.
#[cfg(feature = "web_sys")]
fn worker_self() -> DedicatedWorkerGlobalScope {
    js_sys::global().unchecked_into()
}
130
/// Byte-oriented messaging helpers for worker-like handles.
#[cfg(feature = "web_sys")]
trait WorkerExt {
    /// Installs `handler` so that it receives the payload of each incoming
    /// message as a byte vector.
    fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec<u8>));

    /// Posts `data` as a message.
    fn post_message_vec(&self, data: Vec<u8>);
}
137
/// Generates a `WorkerExt` implementation for every listed type.
///
/// Each type must expose `set_onmessage` and `post_message` methods with the
/// shapes used in the generated code below.
#[cfg(feature = "web_sys")]
macro_rules! worker_ext_impl {
    ($($target:ident),+) => {$(
        impl WorkerExt for $target {
            fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec<u8>)) {
                // Adapt the byte-oriented handler to a `MessageEvent` callback:
                // the event data is treated as a `Uint8Array` and copied into a `Vec<u8>`.
                let on_message = Closure::wrap(Box::new(move |event: MessageEvent| {
                    handler(Uint8Array::from(event.data()).to_vec())
                }) as Box<dyn Fn(MessageEvent)>);
                self.set_onmessage(Some(on_message.as_ref().unchecked_ref()));
                // Hand ownership over to the JS side so the callback is not
                // dropped when this function returns.
                on_message.forget();
            }

            fn post_message_vec(&self, data: Vec<u8>) {
                let bytes = Uint8Array::from(data.as_slice());
                self.post_message(&bytes).expect("failed to post message");
            }
        }
    )+};
}
159
// Provide the byte-message helpers for both `Worker` and `DedicatedWorkerGlobalScope`.
#[cfg(feature = "web_sys")]
worker_ext_impl!(Worker, DedicatedWorkerGlobalScope);