1use crate::core::ConnectionReadyState;
2use crate::{js, sendwrap_fn, use_event_listener, ReconnectLimit};
3use codee::Decoder;
4use default_struct_builder::DefaultBuilder;
5use leptos::prelude::*;
6use std::marker::PhantomData;
7use std::sync::atomic::{AtomicBool, AtomicU32};
8use std::sync::Arc;
9use std::time::Duration;
10use thiserror::Error;
11
12pub fn use_event_source<T, C>(
118 url: &str,
119) -> UseEventSourceReturn<
120 T,
121 C::Error,
122 impl Fn() + Clone + Send + Sync + 'static,
123 impl Fn() + Clone + Send + Sync + 'static,
124>
125where
126 T: Clone + PartialEq + Send + Sync + 'static,
127 C: Decoder<T, Encoded = str>,
128 C::Error: Send + Sync,
129{
130 use_event_source_with_options::<T, C>(url, UseEventSourceOptions::<T>::default())
131}
132
133pub fn use_event_source_with_options<T, C>(
135 url: &str,
136 options: UseEventSourceOptions<T>,
137) -> UseEventSourceReturn<
138 T,
139 C::Error,
140 impl Fn() + Clone + Send + Sync + 'static,
141 impl Fn() + Clone + Send + Sync + 'static,
142>
143where
144 T: Clone + PartialEq + Send + Sync + 'static,
145 C: Decoder<T, Encoded = str>,
146 C::Error: Send + Sync,
147{
148 let UseEventSourceOptions {
149 reconnect_limit,
150 reconnect_interval,
151 on_failed,
152 immediate,
153 named_events,
154 with_credentials,
155 _marker,
156 } = options;
157
158 let url = url.to_owned();
159
160 let (event, set_event) = signal_local(None::<web_sys::Event>);
161 let (data, set_data) = signal(None::<T>);
162 let (ready_state, set_ready_state) = signal(ConnectionReadyState::Closed);
163 let (event_source, set_event_source) = signal_local(None::<web_sys::EventSource>);
164 let (error, set_error) = signal_local(None::<UseEventSourceError<C::Error>>);
165
166 let explicitly_closed = Arc::new(AtomicBool::new(false));
167 let retried = Arc::new(AtomicU32::new(0));
168
169 let set_data_from_string = move |data_string: Option<String>| {
170 if let Some(data_string) = data_string {
171 match C::decode(&data_string) {
172 Ok(data) => set_data.set(Some(data)),
173 Err(err) => set_error.set(Some(UseEventSourceError::Deserialize(err))),
174 }
175 }
176 };
177
178 let close = {
179 let explicitly_closed = Arc::clone(&explicitly_closed);
180
181 sendwrap_fn!(move || {
182 if let Some(event_source) = event_source.get_untracked() {
183 event_source.close();
184 set_event_source.set(None);
185 set_ready_state.set(ConnectionReadyState::Closed);
186 explicitly_closed.store(true, std::sync::atomic::Ordering::Relaxed);
187 }
188 })
189 };
190
191 let init = StoredValue::new(None::<Arc<dyn Fn() + Send + Sync>>);
192
193 init.set_value(Some(Arc::new({
194 let explicitly_closed = Arc::clone(&explicitly_closed);
195 let retried = Arc::clone(&retried);
196
197 move || {
198 use wasm_bindgen::prelude::*;
199
200 if explicitly_closed.load(std::sync::atomic::Ordering::Relaxed) {
201 return;
202 }
203
204 let event_src_opts = web_sys::EventSourceInit::new();
205 event_src_opts.set_with_credentials(with_credentials);
206
207 let es = web_sys::EventSource::new_with_event_source_init_dict(&url, &event_src_opts)
208 .unwrap_throw();
209
210 set_ready_state.set(ConnectionReadyState::Connecting);
211
212 set_event_source.set(Some(es.clone()));
213
214 let on_open = Closure::wrap(Box::new(move |_: web_sys::Event| {
215 set_ready_state.set(ConnectionReadyState::Open);
216 set_error.set(None);
217 }) as Box<dyn FnMut(web_sys::Event)>);
218 es.set_onopen(Some(on_open.as_ref().unchecked_ref()));
219 on_open.forget();
220
221 let on_error = Closure::wrap(Box::new({
222 let explicitly_closed = Arc::clone(&explicitly_closed);
223 let retried = Arc::clone(&retried);
224 let on_failed = Arc::clone(&on_failed);
225 let es = es.clone();
226
227 move |e: web_sys::Event| {
228 set_ready_state.set(ConnectionReadyState::Closed);
229 set_error.set(Some(UseEventSourceError::Event(e)));
230
231 if es.ready_state() == 2
234 && !explicitly_closed.load(std::sync::atomic::Ordering::Relaxed)
235 && matches!(reconnect_limit, ReconnectLimit::Limited(_))
236 {
237 es.close();
238
239 let retried_value =
240 retried.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
241
242 if reconnect_limit.is_exceeded_by(retried_value as u64) {
243 set_timeout(
244 move || {
245 if let Some(init) = init.get_value() {
246 init();
247 }
248 },
249 Duration::from_millis(reconnect_interval),
250 );
251 } else {
252 #[cfg(debug_assertions)]
253 let _z = leptos::reactive::diagnostics::SpecialNonReactiveZone::enter();
254
255 on_failed();
256 }
257 }
258 }
259 }) as Box<dyn FnMut(web_sys::Event)>);
260 es.set_onerror(Some(on_error.as_ref().unchecked_ref()));
261 on_error.forget();
262
263 let on_message = Closure::wrap(Box::new(move |e: web_sys::MessageEvent| {
264 set_data_from_string(e.data().as_string());
265 }) as Box<dyn FnMut(web_sys::MessageEvent)>);
266 es.set_onmessage(Some(on_message.as_ref().unchecked_ref()));
267 on_message.forget();
268
269 for event_name in named_events.clone() {
270 let _ = use_event_listener(
271 es.clone(),
272 leptos::ev::Custom::<leptos::ev::Event>::new(event_name),
273 move |e| {
274 set_event.set(Some(e.clone()));
275 let data_string = js!(e["data"]).ok().and_then(|d| d.as_string());
276 set_data_from_string(data_string);
277 },
278 );
279 }
280 }
281 })));
282
283 let open;
284
285 #[cfg(not(feature = "ssr"))]
286 {
287 open = {
288 let close = close.clone();
289 let explicitly_closed = Arc::clone(&explicitly_closed);
290 let retried = Arc::clone(&retried);
291
292 sendwrap_fn!(move || {
293 close();
294 explicitly_closed.store(false, std::sync::atomic::Ordering::Relaxed);
295 retried.store(0, std::sync::atomic::Ordering::Relaxed);
296 if let Some(init) = init.get_value() {
297 init();
298 }
299 })
300 };
301 }
302
303 #[cfg(feature = "ssr")]
304 {
305 open = move || {};
306 }
307
308 if immediate {
309 open();
310 }
311
312 on_cleanup(close.clone());
313
314 UseEventSourceReturn {
315 event_source: event_source.into(),
316 event: event.into(),
317 data: data.into(),
318 ready_state: ready_state.into(),
319 error: error.into(),
320 open,
321 close,
322 }
323}
324
325#[derive(DefaultBuilder)]
327pub struct UseEventSourceOptions<T>
328where
329 T: 'static,
330{
331 reconnect_limit: ReconnectLimit,
334
335 reconnect_interval: u64,
337
338 on_failed: Arc<dyn Fn() + Send + Sync>,
340
341 immediate: bool,
345
346 #[builder(into)]
348 named_events: Vec<String>,
349
350 with_credentials: bool,
352
353 _marker: PhantomData<T>,
354}
355
356impl<T> Default for UseEventSourceOptions<T> {
357 fn default() -> Self {
358 Self {
359 reconnect_limit: ReconnectLimit::default(),
360 reconnect_interval: 3000,
361 on_failed: Arc::new(|| {}),
362 immediate: true,
363 named_events: vec![],
364 with_credentials: false,
365 _marker: PhantomData,
366 }
367 }
368}
369
370pub struct UseEventSourceReturn<T, Err, OpenFn, CloseFn>
372where
373 Err: Send + Sync + 'static,
374 T: Clone + Send + Sync + 'static,
375 OpenFn: Fn() + Clone + Send + Sync + 'static,
376 CloseFn: Fn() + Clone + Send + Sync + 'static,
377{
378 pub data: Signal<Option<T>>,
380
381 pub ready_state: Signal<ConnectionReadyState>,
383
384 pub event: Signal<Option<web_sys::Event>, LocalStorage>,
386
387 pub error: Signal<Option<UseEventSourceError<Err>>, LocalStorage>,
389
390 pub open: OpenFn,
393
394 pub close: CloseFn,
396
397 pub event_source: Signal<Option<web_sys::EventSource>, LocalStorage>,
399}
400
401#[derive(Error, Debug)]
402pub enum UseEventSourceError<Err> {
403 #[error("Error event: {0:?}")]
404 Event(web_sys::Event),
405
406 #[error("Error decoding value")]
407 Deserialize(Err),
408}