1use crate::{
2 document::init_document,
3 element::LiveviewElement,
4 events::SerializedHtmlEventConverter,
5 query::{QueryEngine, QueryResult},
6 LiveViewError,
7};
8use dioxus_core::prelude::*;
9use dioxus_html::{EventData, HtmlEvent, PlatformEventData};
10use dioxus_interpreter_js::MutationState;
11use futures_util::{pin_mut, SinkExt, StreamExt};
12use serde::Serialize;
13use std::{any::Any, rc::Rc, time::Duration};
14use tokio_util::task::LocalPoolHandle;
15
16#[derive(Clone)]
17pub struct LiveViewPool {
18 pub(crate) pool: LocalPoolHandle,
19}
20
21impl Default for LiveViewPool {
22 fn default() -> Self {
23 Self::new()
24 }
25}
26
27impl LiveViewPool {
28 pub fn new() -> Self {
29 dioxus_html::set_event_converter(Box::new(SerializedHtmlEventConverter));
31
32 LiveViewPool {
33 pool: LocalPoolHandle::new(16),
34 }
35 }
36
37 pub async fn launch(
38 &self,
39 ws: impl LiveViewSocket,
40 app: fn() -> Element,
41 ) -> Result<(), LiveViewError> {
42 self.launch_with_props(ws, |app| app(), app).await
43 }
44
45 pub async fn launch_with_props<T: Clone + Send + 'static>(
46 &self,
47 ws: impl LiveViewSocket,
48 app: fn(T) -> Element,
49 props: T,
50 ) -> Result<(), LiveViewError> {
51 self.launch_virtualdom(ws, move || VirtualDom::new_with_props(app, props))
52 .await
53 }
54
55 pub async fn launch_virtualdom<F: FnOnce() -> VirtualDom + Send + 'static>(
56 &self,
57 ws: impl LiveViewSocket,
58 make_app: F,
59 ) -> Result<(), LiveViewError> {
60 match self.pool.spawn_pinned(move || run(make_app(), ws)).await {
61 Ok(Ok(_)) => Ok(()),
62 Ok(Err(e)) => Err(e),
63 Err(_) => Err(LiveViewError::SendingFailed),
64 }
65 }
66}
67
68pub trait LiveViewSocket:
96 SinkExt<Vec<u8>, Error = LiveViewError>
97 + StreamExt<Item = Result<Vec<u8>, LiveViewError>>
98 + Send
99 + 'static
100{
101}
102
103impl<S> LiveViewSocket for S where
104 S: SinkExt<Vec<u8>, Error = LiveViewError>
105 + StreamExt<Item = Result<Vec<u8>, LiveViewError>>
106 + Send
107 + 'static
108{
109}
110
111pub async fn run(mut vdom: VirtualDom, ws: impl LiveViewSocket) -> Result<(), LiveViewError> {
119 #[cfg(all(feature = "devtools", debug_assertions))]
120 let mut hot_reload_rx = {
121 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
122 if let Some(endpoint) = dioxus_cli_config::devserver_ws_endpoint() {
123 dioxus_devtools::connect(endpoint, move |template| _ = tx.send(template));
124 }
125 rx
126 };
127
128 let mut mutations = MutationState::default();
129
130 let (query_tx, mut query_rx) = tokio::sync::mpsc::unbounded_channel();
132 let query_engine = QueryEngine::new(query_tx);
133 vdom.runtime().on_scope(ScopeId::ROOT, || {
134 provide_context(query_engine.clone());
135 init_document();
136 });
137
138 pin_mut!(ws);
140
141 if let Some(edits) = {
142 vdom.rebuild(&mut mutations);
143 take_edits(&mut mutations)
144 } {
145 ws.send(edits).await?;
147 }
148
149 #[derive(serde::Deserialize, Debug)]
152 #[serde(tag = "method", content = "params")]
153 enum IpcMessage {
154 #[serde(rename = "user_event")]
155 Event(Box<HtmlEvent>),
156 #[serde(rename = "query")]
157 Query(QueryResult),
158 }
159
160 loop {
161 #[cfg(all(feature = "devtools", debug_assertions))]
162 let hot_reload_wait = hot_reload_rx.recv();
163 #[cfg(not(all(feature = "devtools", debug_assertions)))]
164 let hot_reload_wait: std::future::Pending<Option<()>> = std::future::pending();
165
166 tokio::select! {
167 _ = vdom.wait_for_work() => {}
169
170 evt = ws.next() => {
171 match evt.as_ref().map(|o| o.as_deref()) {
172 Some(Ok(b"__ping__")) => {
174 ws.send(text_frame("__pong__")).await?;
175 }
176 Some(Ok(evt)) => {
177 if let Ok(message) = serde_json::from_str::<IpcMessage>(&String::from_utf8_lossy(evt)) {
178 match message {
179 IpcMessage::Event(evt) => {
180 let event = if let EventData::Mounted = &evt.data {
182 let element = LiveviewElement::new(evt.element, query_engine.clone());
183 Event::new(
184 Rc::new(PlatformEventData::new(Box::new(element))) as Rc<dyn Any>,
185 evt.bubbles,
186 )
187 } else {
188 Event::new(
189 evt.data.into_any(),
190 evt.bubbles,
191 )
192 };
193 vdom.runtime().handle_event(
194 &evt.name,
195 event,
196 evt.element,
197 );
198 }
199 IpcMessage::Query(result) => {
200 query_engine.send(result);
201 },
202 }
203 }
204 }
205 Some(Err(_e)) => {}
207 None => return Ok(()),
208 }
209 }
210
211 Some(query) = query_rx.recv() => {
213 ws.send(text_frame(&serde_json::to_string(&ClientUpdate::Query(query)).unwrap())).await?;
214 }
215
216 Some(msg) = hot_reload_wait => {
217 #[cfg(all(feature = "devtools", debug_assertions))]
218 match msg{
219 dioxus_devtools::DevserverMsg::HotReload(msg)=> {
220 dioxus_devtools::apply_changes(&vdom, &msg);
221 }
222 dioxus_devtools::DevserverMsg::Shutdown => {
223 std::process::exit(0);
224 },
225 dioxus_devtools::DevserverMsg::FullReloadCommand
226 | dioxus_devtools::DevserverMsg::FullReloadStart
227 | dioxus_devtools::DevserverMsg::FullReloadFailed => {
228 },
231 }
232 #[cfg(not(all(feature = "devtools", debug_assertions)))]
233 let () = msg;
234 }
235 }
236
237 tokio::select! {
239 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
240 _ = vdom.wait_for_suspense() => {}
241 }
242
243 vdom.render_immediate(&mut mutations);
245
246 if let Some(edits) = take_edits(&mut mutations) {
247 ws.send(edits).await?;
248 }
249 }
250}
251
/// Wraps `text` into an outgoing frame: a single leading `0` byte followed by
/// the UTF-8 bytes of `text`.
///
/// The leading byte differs from the `1` written by `take_edits`; presumably
/// the client uses it to tell text frames from edit frames (confirm against
/// the client-side code).
fn text_frame(text: &str) -> Vec<u8> {
    let payload = text.as_bytes();
    let mut frame = Vec::with_capacity(payload.len() + 1);
    frame.push(0);
    frame.extend_from_slice(payload);
    frame
}
257
258fn take_edits(mutations: &mut MutationState) -> Option<Vec<u8>> {
259 let mut bytes = vec![1];
261 mutations.write_memory_into(&mut bytes);
262 (bytes.len() > 1).then_some(bytes)
263}
264
265#[derive(Serialize)]
266#[serde(tag = "type", content = "data")]
267enum ClientUpdate {
268 #[serde(rename = "query")]
269 Query(String),
270}