dioxus_liveview/
pool.rs

1use crate::{
2    document::init_document,
3    element::LiveviewElement,
4    events::SerializedHtmlEventConverter,
5    query::{QueryEngine, QueryResult},
6    LiveViewError,
7};
8use dioxus_core::prelude::*;
9use dioxus_html::{EventData, HtmlEvent, PlatformEventData};
10use dioxus_interpreter_js::MutationState;
11use futures_util::{pin_mut, SinkExt, StreamExt};
12use serde::Serialize;
13use std::{any::Any, rc::Rc, time::Duration};
14use tokio_util::task::LocalPoolHandle;
15
16#[derive(Clone)]
17pub struct LiveViewPool {
18    pub(crate) pool: LocalPoolHandle,
19}
20
21impl Default for LiveViewPool {
22    fn default() -> Self {
23        Self::new()
24    }
25}
26
27impl LiveViewPool {
28    pub fn new() -> Self {
29        // Set the event converter
30        dioxus_html::set_event_converter(Box::new(SerializedHtmlEventConverter));
31
32        LiveViewPool {
33            pool: LocalPoolHandle::new(16),
34        }
35    }
36
37    pub async fn launch(
38        &self,
39        ws: impl LiveViewSocket,
40        app: fn() -> Element,
41    ) -> Result<(), LiveViewError> {
42        self.launch_with_props(ws, |app| app(), app).await
43    }
44
45    pub async fn launch_with_props<T: Clone + Send + 'static>(
46        &self,
47        ws: impl LiveViewSocket,
48        app: fn(T) -> Element,
49        props: T,
50    ) -> Result<(), LiveViewError> {
51        self.launch_virtualdom(ws, move || VirtualDom::new_with_props(app, props))
52            .await
53    }
54
55    pub async fn launch_virtualdom<F: FnOnce() -> VirtualDom + Send + 'static>(
56        &self,
57        ws: impl LiveViewSocket,
58        make_app: F,
59    ) -> Result<(), LiveViewError> {
60        match self.pool.spawn_pinned(move || run(make_app(), ws)).await {
61            Ok(Ok(_)) => Ok(()),
62            Ok(Err(e)) => Err(e),
63            Err(_) => Err(LiveViewError::SendingFailed),
64        }
65    }
66}
67
68/// A LiveViewSocket is a Sink and Stream of Strings that Dioxus uses to communicate with the client
69///
70/// Most websockets from most HTTP frameworks can be converted into a LiveViewSocket using the appropriate adapter.
71///
72/// You can also convert your own socket into a LiveViewSocket by implementing this trait. This trait is an auto trait,
73/// meaning that as long as your type implements Stream and Sink, you can use it as a LiveViewSocket.
74///
75/// For example, the axum implementation is a really small transform:
76///
77/// ```rust, ignore
78/// pub fn axum_socket(ws: WebSocket) -> impl LiveViewSocket {
79///     ws.map(transform_rx)
80///         .with(transform_tx)
81///         .sink_map_err(|_| LiveViewError::SendingFailed)
82/// }
83///
84/// fn transform_rx(message: Result<Message, axum::Error>) -> Result<String, LiveViewError> {
85///     message
86///         .map_err(|_| LiveViewError::SendingFailed)?
87///         .into_text()
88///         .map_err(|_| LiveViewError::SendingFailed)
89/// }
90///
91/// async fn transform_tx(message: String) -> Result<Message, axum::Error> {
92///     Ok(Message::Text(message))
93/// }
94/// ```
95pub trait LiveViewSocket:
96    SinkExt<Vec<u8>, Error = LiveViewError>
97    + StreamExt<Item = Result<Vec<u8>, LiveViewError>>
98    + Send
99    + 'static
100{
101}
102
103impl<S> LiveViewSocket for S where
104    S: SinkExt<Vec<u8>, Error = LiveViewError>
105        + StreamExt<Item = Result<Vec<u8>, LiveViewError>>
106        + Send
107        + 'static
108{
109}
110
111/// The primary event loop for the VirtualDom waiting for user input
112///
113/// This function makes it easy to integrate Dioxus LiveView with any socket-based framework.
114///
115/// As long as your framework can provide a Sink and Stream of Bytes, you can use this function.
116///
117/// You might need to transform the error types of the web backend into the LiveView error type.
118pub async fn run(mut vdom: VirtualDom, ws: impl LiveViewSocket) -> Result<(), LiveViewError> {
119    #[cfg(all(feature = "devtools", debug_assertions))]
120    let mut hot_reload_rx = {
121        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
122        if let Some(endpoint) = dioxus_cli_config::devserver_ws_endpoint() {
123            dioxus_devtools::connect(endpoint, move |template| _ = tx.send(template));
124        }
125        rx
126    };
127
128    let mut mutations = MutationState::default();
129
130    // Create the a proxy for query engine
131    let (query_tx, mut query_rx) = tokio::sync::mpsc::unbounded_channel();
132    let query_engine = QueryEngine::new(query_tx);
133    vdom.runtime().on_scope(ScopeId::ROOT, || {
134        provide_context(query_engine.clone());
135        init_document();
136    });
137
138    // pin the futures so we can use select!
139    pin_mut!(ws);
140
141    if let Some(edits) = {
142        vdom.rebuild(&mut mutations);
143        take_edits(&mut mutations)
144    } {
145        // send the initial render to the client
146        ws.send(edits).await?;
147    }
148
149    // desktop uses this wrapper struct thing around the actual event itself
150    // this is sorta driven by tao/wry
151    #[derive(serde::Deserialize, Debug)]
152    #[serde(tag = "method", content = "params")]
153    enum IpcMessage {
154        #[serde(rename = "user_event")]
155        Event(Box<HtmlEvent>),
156        #[serde(rename = "query")]
157        Query(QueryResult),
158    }
159
160    loop {
161        #[cfg(all(feature = "devtools", debug_assertions))]
162        let hot_reload_wait = hot_reload_rx.recv();
163        #[cfg(not(all(feature = "devtools", debug_assertions)))]
164        let hot_reload_wait: std::future::Pending<Option<()>> = std::future::pending();
165
166        tokio::select! {
167            // poll any futures or suspense
168            _ = vdom.wait_for_work() => {}
169
170            evt = ws.next() => {
171                match evt.as_ref().map(|o| o.as_deref()) {
172                    // respond with a pong every ping to keep the websocket alive
173                    Some(Ok(b"__ping__")) => {
174                        ws.send(text_frame("__pong__")).await?;
175                    }
176                    Some(Ok(evt)) => {
177                        if let Ok(message) = serde_json::from_str::<IpcMessage>(&String::from_utf8_lossy(evt)) {
178                            match message {
179                                IpcMessage::Event(evt) => {
180                                    // Intercept the mounted event and insert a custom element type
181                                    let event = if let EventData::Mounted = &evt.data {
182                                        let element = LiveviewElement::new(evt.element, query_engine.clone());
183                                        Event::new(
184                                            Rc::new(PlatformEventData::new(Box::new(element))) as Rc<dyn Any>,
185                                            evt.bubbles,
186                                        )
187                                    } else {
188                                        Event::new(
189                                            evt.data.into_any(),
190                                            evt.bubbles,
191                                        )
192                                    };
193                                    vdom.runtime().handle_event(
194                                        &evt.name,
195                                        event,
196                                        evt.element,
197                                    );
198                                }
199                                IpcMessage::Query(result) => {
200                                    query_engine.send(result);
201                                },
202                            }
203                        }
204                    }
205                    // log this I guess? when would we get an error here?
206                    Some(Err(_e)) => {}
207                    None => return Ok(()),
208                }
209            }
210
211            // handle any new queries
212            Some(query) = query_rx.recv() => {
213                ws.send(text_frame(&serde_json::to_string(&ClientUpdate::Query(query)).unwrap())).await?;
214            }
215
216            Some(msg) = hot_reload_wait => {
217                #[cfg(all(feature = "devtools", debug_assertions))]
218                match msg{
219                    dioxus_devtools::DevserverMsg::HotReload(msg)=> {
220                        dioxus_devtools::apply_changes(&vdom, &msg);
221                    }
222                    dioxus_devtools::DevserverMsg::Shutdown => {
223                        std::process::exit(0);
224                    },
225                    dioxus_devtools::DevserverMsg::FullReloadCommand
226                    | dioxus_devtools::DevserverMsg::FullReloadStart
227                    | dioxus_devtools::DevserverMsg::FullReloadFailed => {
228                        // usually only web gets this message - what are we supposed to do?
229                        // Maybe we could just binary patch ourselves in place without losing window state?
230                    },
231                }
232                #[cfg(not(all(feature = "devtools", debug_assertions)))]
233                let () = msg;
234            }
235        }
236
237        // wait for suspense to resolve in a 10ms window
238        tokio::select! {
239            _ = tokio::time::sleep(Duration::from_millis(10)) => {}
240            _ = vdom.wait_for_suspense() => {}
241        }
242
243        // render the vdom
244        vdom.render_immediate(&mut mutations);
245
246        if let Some(edits) = take_edits(&mut mutations) {
247            ws.send(edits).await?;
248        }
249    }
250}
251
252fn text_frame(text: &str) -> Vec<u8> {
253    let mut bytes = vec![0];
254    bytes.extend(text.as_bytes());
255    bytes
256}
257
258fn take_edits(mutations: &mut MutationState) -> Option<Vec<u8>> {
259    // Add an extra one at the beginning to tell the shim this is a binary frame
260    let mut bytes = vec![1];
261    mutations.write_memory_into(&mut bytes);
262    (bytes.len() > 1).then_some(bytes)
263}
264
265#[derive(Serialize)]
266#[serde(tag = "type", content = "data")]
267enum ClientUpdate {
268    #[serde(rename = "query")]
269    Query(String),
270}