leptos_use/
use_event_source.rs

1use crate::core::ConnectionReadyState;
2use crate::{js, sendwrap_fn, use_event_listener, ReconnectLimit};
3use codee::Decoder;
4use default_struct_builder::DefaultBuilder;
5use leptos::prelude::*;
6use std::marker::PhantomData;
7use std::sync::atomic::{AtomicBool, AtomicU32};
8use std::sync::Arc;
9use std::time::Duration;
10use thiserror::Error;
11
12/// Reactive [EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource)
13///
14/// An [EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) or
15/// [Server-Sent-Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events)
16/// instance opens a persistent connection to an HTTP server,
17/// which sends events in text/event-stream format.
18///
19/// ## Usage
20///
21/// Values are decoded via the given decoder. You can use any of the string codecs or a
22/// binary codec wrapped in `Base64`.
23///
24/// > Please check [the codec chapter](https://leptos-use.rs/codecs.html) to see what codecs are
25/// > available and what feature flags they require.
26///
27/// ```
28/// # use leptos::prelude::*;
29/// # use leptos_use::{use_event_source, UseEventSourceReturn};
30/// # use codee::string::JsonSerdeCodec;
31/// # use serde::{Deserialize, Serialize};
32/// #
33/// #[derive(Serialize, Deserialize, Clone, PartialEq)]
34/// pub struct EventSourceData {
35///     pub message: String,
36///     pub priority: u8,
37/// }
38///
39/// # #[component]
40/// # fn Demo() -> impl IntoView {
41/// let UseEventSourceReturn {
42///     ready_state, data, error, close, ..
43/// } = use_event_source::<EventSourceData, JsonSerdeCodec>("https://event-source-url");
44/// #
45/// # view! { }
46/// # }
47/// ```
48///
49/// ### Named Events
50///
51/// You can define named events when using `use_event_source_with_options`.
52///
53/// ```
54/// # use leptos::prelude::*;
55/// # use leptos_use::{use_event_source_with_options, UseEventSourceReturn, UseEventSourceOptions};
56/// # use codee::string::FromToStringCodec;
57/// #
58/// # #[component]
59/// # fn Demo() -> impl IntoView {
60/// let UseEventSourceReturn {
61///     ready_state, data, error, close, ..
62/// } = use_event_source_with_options::<String, FromToStringCodec>(
63///     "https://event-source-url",
64///     UseEventSourceOptions::default()
65///         .named_events(["notice".to_string(), "update".to_string()])
66/// );
67/// #
68/// # view! { }
69/// # }
70/// ```
71///
72/// ### Immediate
73///
74/// Auto-connect (enabled by default).
75///
76/// This will call `open()` automatically for you, and you don't need to call it by yourself.
77///
78/// ### Auto-Reconnection
79///
80/// Reconnect on errors automatically (enabled by default).
81///
82/// You can control the number of reconnection attempts by setting `reconnect_limit` and the
83/// interval between them by setting `reconnect_interval`.
84///
85/// ```
86/// # use leptos::prelude::*;
87/// # use leptos_use::{use_event_source_with_options, UseEventSourceReturn, UseEventSourceOptions, ReconnectLimit};
88/// # use codee::string::FromToStringCodec;
89/// #
90/// # #[component]
91/// # fn Demo() -> impl IntoView {
92/// let UseEventSourceReturn {
93///     ready_state, data, error, close, ..
94/// } = use_event_source_with_options::<bool, FromToStringCodec>(
95///     "https://event-source-url",
96///     UseEventSourceOptions::default()
97///         .reconnect_limit(ReconnectLimit::Limited(5))         // at most 5 attempts
98///         .reconnect_interval(2000)   // wait for 2 seconds between attempts
99/// );
100/// #
101/// # view! { }
102/// # }
103/// ```
104///
105///
106/// ## SendWrapped Return
107///
108/// The returned closures `open` and `close` are sendwrapped functions. They can
109/// only be called from the same thread that called `use_event_source`.
110///
111/// To disable auto-reconnection, set `reconnect_limit` to `0`.
112///
113/// ## Server-Side Rendering
114///
115/// On the server-side, `use_event_source` will always return `ready_state` as `ConnectionReadyState::Closed`,
116/// `data`, `event` and `error` will always be `None`, and `open` and `close` will do nothing.
117pub fn use_event_source<T, C>(
118    url: &str,
119) -> UseEventSourceReturn<
120    T,
121    C::Error,
122    impl Fn() + Clone + Send + Sync + 'static,
123    impl Fn() + Clone + Send + Sync + 'static,
124>
125where
126    T: Clone + PartialEq + Send + Sync + 'static,
127    C: Decoder<T, Encoded = str>,
128    C::Error: Send + Sync,
129{
130    use_event_source_with_options::<T, C>(url, UseEventSourceOptions::<T>::default())
131}
132
133/// Version of [`use_event_source`] that takes a `UseEventSourceOptions`. See [`use_event_source`] for how to use.
134pub fn use_event_source_with_options<T, C>(
135    url: &str,
136    options: UseEventSourceOptions<T>,
137) -> UseEventSourceReturn<
138    T,
139    C::Error,
140    impl Fn() + Clone + Send + Sync + 'static,
141    impl Fn() + Clone + Send + Sync + 'static,
142>
143where
144    T: Clone + PartialEq + Send + Sync + 'static,
145    C: Decoder<T, Encoded = str>,
146    C::Error: Send + Sync,
147{
148    let UseEventSourceOptions {
149        reconnect_limit,
150        reconnect_interval,
151        on_failed,
152        immediate,
153        named_events,
154        with_credentials,
155        _marker,
156    } = options;
157
158    let url = url.to_owned();
159
160    let (event, set_event) = signal_local(None::<web_sys::Event>);
161    let (data, set_data) = signal(None::<T>);
162    let (ready_state, set_ready_state) = signal(ConnectionReadyState::Closed);
163    let (event_source, set_event_source) = signal_local(None::<web_sys::EventSource>);
164    let (error, set_error) = signal_local(None::<UseEventSourceError<C::Error>>);
165
166    let explicitly_closed = Arc::new(AtomicBool::new(false));
167    let retried = Arc::new(AtomicU32::new(0));
168
169    let set_data_from_string = move |data_string: Option<String>| {
170        if let Some(data_string) = data_string {
171            match C::decode(&data_string) {
172                Ok(data) => set_data.set(Some(data)),
173                Err(err) => set_error.set(Some(UseEventSourceError::Deserialize(err))),
174            }
175        }
176    };
177
178    let close = {
179        let explicitly_closed = Arc::clone(&explicitly_closed);
180
181        sendwrap_fn!(move || {
182            if let Some(event_source) = event_source.get_untracked() {
183                event_source.close();
184                set_event_source.set(None);
185                set_ready_state.set(ConnectionReadyState::Closed);
186                explicitly_closed.store(true, std::sync::atomic::Ordering::Relaxed);
187            }
188        })
189    };
190
191    let init = StoredValue::new(None::<Arc<dyn Fn() + Send + Sync>>);
192
193    init.set_value(Some(Arc::new({
194        let explicitly_closed = Arc::clone(&explicitly_closed);
195        let retried = Arc::clone(&retried);
196
197        move || {
198            use wasm_bindgen::prelude::*;
199
200            if explicitly_closed.load(std::sync::atomic::Ordering::Relaxed) {
201                return;
202            }
203
204            let event_src_opts = web_sys::EventSourceInit::new();
205            event_src_opts.set_with_credentials(with_credentials);
206
207            let es = web_sys::EventSource::new_with_event_source_init_dict(&url, &event_src_opts)
208                .unwrap_throw();
209
210            set_ready_state.set(ConnectionReadyState::Connecting);
211
212            set_event_source.set(Some(es.clone()));
213
214            let on_open = Closure::wrap(Box::new(move |_: web_sys::Event| {
215                set_ready_state.set(ConnectionReadyState::Open);
216                set_error.set(None);
217            }) as Box<dyn FnMut(web_sys::Event)>);
218            es.set_onopen(Some(on_open.as_ref().unchecked_ref()));
219            on_open.forget();
220
221            let on_error = Closure::wrap(Box::new({
222                let explicitly_closed = Arc::clone(&explicitly_closed);
223                let retried = Arc::clone(&retried);
224                let on_failed = Arc::clone(&on_failed);
225                let es = es.clone();
226
227                move |e: web_sys::Event| {
228                    set_ready_state.set(ConnectionReadyState::Closed);
229                    set_error.set(Some(UseEventSourceError::Event(e)));
230
231                    // only reconnect if EventSource isn't reconnecting by itself
232                    // this is the case when the connection is closed (readyState is 2)
233                    if es.ready_state() == 2
234                        && !explicitly_closed.load(std::sync::atomic::Ordering::Relaxed)
235                        && matches!(reconnect_limit, ReconnectLimit::Limited(_))
236                    {
237                        es.close();
238
239                        let retried_value =
240                            retried.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
241
242                        if reconnect_limit.is_exceeded_by(retried_value as u64) {
243                            set_timeout(
244                                move || {
245                                    if let Some(init) = init.get_value() {
246                                        init();
247                                    }
248                                },
249                                Duration::from_millis(reconnect_interval),
250                            );
251                        } else {
252                            #[cfg(debug_assertions)]
253                            let _z = leptos::reactive::diagnostics::SpecialNonReactiveZone::enter();
254
255                            on_failed();
256                        }
257                    }
258                }
259            }) as Box<dyn FnMut(web_sys::Event)>);
260            es.set_onerror(Some(on_error.as_ref().unchecked_ref()));
261            on_error.forget();
262
263            let on_message = Closure::wrap(Box::new(move |e: web_sys::MessageEvent| {
264                set_data_from_string(e.data().as_string());
265            }) as Box<dyn FnMut(web_sys::MessageEvent)>);
266            es.set_onmessage(Some(on_message.as_ref().unchecked_ref()));
267            on_message.forget();
268
269            for event_name in named_events.clone() {
270                let _ = use_event_listener(
271                    es.clone(),
272                    leptos::ev::Custom::<leptos::ev::Event>::new(event_name),
273                    move |e| {
274                        set_event.set(Some(e.clone()));
275                        let data_string = js!(e["data"]).ok().and_then(|d| d.as_string());
276                        set_data_from_string(data_string);
277                    },
278                );
279            }
280        }
281    })));
282
283    let open;
284
285    #[cfg(not(feature = "ssr"))]
286    {
287        open = {
288            let close = close.clone();
289            let explicitly_closed = Arc::clone(&explicitly_closed);
290            let retried = Arc::clone(&retried);
291
292            sendwrap_fn!(move || {
293                close();
294                explicitly_closed.store(false, std::sync::atomic::Ordering::Relaxed);
295                retried.store(0, std::sync::atomic::Ordering::Relaxed);
296                if let Some(init) = init.get_value() {
297                    init();
298                }
299            })
300        };
301    }
302
303    #[cfg(feature = "ssr")]
304    {
305        open = move || {};
306    }
307
308    if immediate {
309        open();
310    }
311
312    on_cleanup(close.clone());
313
314    UseEventSourceReturn {
315        event_source: event_source.into(),
316        event: event.into(),
317        data: data.into(),
318        ready_state: ready_state.into(),
319        error: error.into(),
320        open,
321        close,
322    }
323}
324
325/// Options for [`use_event_source_with_options`].
326#[derive(DefaultBuilder)]
327pub struct UseEventSourceOptions<T>
328where
329    T: 'static,
330{
331    /// Retry times. Defaults to `ReconnectLimit::Limited(3)`. Use `ReconnectLimit::Infinite` for
332    /// infinite retries.
333    reconnect_limit: ReconnectLimit,
334
335    /// Retry interval in ms. Defaults to 3000.
336    reconnect_interval: u64,
337
338    /// On maximum retry times reached.
339    on_failed: Arc<dyn Fn() + Send + Sync>,
340
341    /// If `true` the `EventSource` connection will immediately be opened when calling this function.
342    /// If `false` you have to manually call the `open` function.
343    /// Defaults to `true`.
344    immediate: bool,
345
346    /// List of named events to listen for on the `EventSource`.
347    #[builder(into)]
348    named_events: Vec<String>,
349
350    /// If CORS should be set to `include` credentials. Defaults to `false`.
351    with_credentials: bool,
352
353    _marker: PhantomData<T>,
354}
355
356impl<T> Default for UseEventSourceOptions<T> {
357    fn default() -> Self {
358        Self {
359            reconnect_limit: ReconnectLimit::default(),
360            reconnect_interval: 3000,
361            on_failed: Arc::new(|| {}),
362            immediate: true,
363            named_events: vec![],
364            with_credentials: false,
365            _marker: PhantomData,
366        }
367    }
368}
369
370/// Return type of [`use_event_source`].
371pub struct UseEventSourceReturn<T, Err, OpenFn, CloseFn>
372where
373    Err: Send + Sync + 'static,
374    T: Clone + Send + Sync + 'static,
375    OpenFn: Fn() + Clone + Send + Sync + 'static,
376    CloseFn: Fn() + Clone + Send + Sync + 'static,
377{
378    /// Latest data received via the `EventSource`
379    pub data: Signal<Option<T>>,
380
381    /// The current state of the connection,
382    pub ready_state: Signal<ConnectionReadyState>,
383
384    /// The latest named event
385    pub event: Signal<Option<web_sys::Event>, LocalStorage>,
386
387    /// The current error
388    pub error: Signal<Option<UseEventSourceError<Err>>, LocalStorage>,
389
390    /// (Re-)Opens the `EventSource` connection
391    /// If the current one is active, will close it before opening a new one.
392    pub open: OpenFn,
393
394    /// Closes the `EventSource` connection
395    pub close: CloseFn,
396
397    /// The `EventSource` instance
398    pub event_source: Signal<Option<web_sys::EventSource>, LocalStorage>,
399}
400
401#[derive(Error, Debug)]
402pub enum UseEventSourceError<Err> {
403    #[error("Error event: {0:?}")]
404    Event(web_sys::Event),
405
406    #[error("Error decoding value")]
407    Deserialize(Err),
408}