gloo_net/eventsource/
futures.rs

1//! A wrapper around the `EventSource` API using the Futures API to be used with async rust.
2//!
3//! EventSource is similar to WebSocket with the major differences being:
4//!
5//! * they are a one-way stream of server generated events
6//! * their connection is managed entirely by the browser
7//! * their data is slightly more structured including an id, type and data
8//!
9//! EventSource is therefore suitable for simpler scenarios than WebSocket.
10//!
11//! See the [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) to learn more.
12//!
13//! # Example
14//!
15//! ```rust
16//! use gloo_net::eventsource::futures::EventSource;
17//! use wasm_bindgen_futures::spawn_local;
18//! use futures::{stream, StreamExt};
19//!
20//! # macro_rules! console_log {
21//! #    ($($expr:expr),*) => {{}};
22//! # }
23//! # fn no_run() {
24//! let mut es = EventSource::new("http://api.example.com/ssedemo.php").unwrap();
25//! let stream_1 = es.subscribe("some-event-type").unwrap();
26//! let stream_2 = es.subscribe("another-event-type").unwrap();
27//!
28//! spawn_local(async move {
29//!     let mut all_streams = stream::select(stream_1, stream_2);
30//!     while let Some(Ok((event_type, msg))) = all_streams.next().await {
31//!         console_log!(format!("1. {}: {:?}", event_type, msg))
32//!     }
33//!     console_log!("EventSource Closed");
34//! })
35//! # }
36//! ```
37use crate::eventsource::{EventSourceError, State};
38use crate::js_to_js_error;
39use futures_channel::mpsc;
40use futures_core::{ready, Stream};
41use gloo_utils::errors::JsError;
42use pin_project::{pin_project, pinned_drop};
43use std::fmt;
44use std::fmt::Formatter;
45use std::pin::Pin;
46use std::task::{Context, Poll};
47use wasm_bindgen::prelude::*;
48use wasm_bindgen::JsCast;
49use web_sys::MessageEvent;
50
51/// Wrapper around browser's EventSource API. Dropping
52/// this will close the underlying event source.
53#[derive(Clone)]
54pub struct EventSource {
55    es: web_sys::EventSource,
56}
57
58impl fmt::Debug for EventSource {
59    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
60        f.debug_struct("EventSource")
61            .field("url", &self.es.url())
62            .field("with_credentials", &self.es.with_credentials())
63            .field("ready_state", &self.state())
64            .finish_non_exhaustive()
65    }
66}
67
68/// Wrapper around browser's EventSource API.
69#[pin_project(PinnedDrop)]
70pub struct EventSourceSubscription {
71    #[allow(clippy::type_complexity)]
72    error_callback: Closure<dyn FnMut(web_sys::Event)>,
73    es: web_sys::EventSource,
74    event_type: String,
75    message_callback: Closure<dyn FnMut(MessageEvent)>,
76    #[pin]
77    message_receiver: mpsc::UnboundedReceiver<StreamMessage>,
78}
79
80impl fmt::Debug for EventSourceSubscription {
81    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
82        f.debug_struct("EventSourceSubscription")
83            .field("event_source", &self.es)
84            .field("event_type", &self.event_type)
85            .finish_non_exhaustive()
86    }
87}
88
89impl EventSource {
90    /// Establish an EventSource.
91    ///
92    /// This function may error in the following cases:
93    /// - The connection url is invalid
94    ///
95    /// The error returned is [`JsError`]. See the
96    /// [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/EventSource/EventSource#exceptions_thrown)
97    /// to learn more.
98    pub fn new(url: &str) -> Result<Self, JsError> {
99        let es = web_sys::EventSource::new(url).map_err(js_to_js_error)?;
100
101        Ok(Self { es })
102    }
103
104    /// Subscribes to listening for a specific type of event.
105    ///
106    /// All events for this type are streamed back given the subscription
107    /// returned.
108    ///
109    /// The event type of "message" is a special case, as it will capture
110    /// events without an event field as well as events that have the
111    /// specific type `event: message`. It will not trigger on any
112    /// other event type.
113    pub fn subscribe(
114        &mut self,
115        event_type: impl Into<String>,
116    ) -> Result<EventSourceSubscription, JsError> {
117        let event_type = event_type.into();
118        let (message_sender, message_receiver) = mpsc::unbounded();
119
120        let message_callback: Closure<dyn FnMut(MessageEvent)> = {
121            let event_type = event_type.clone();
122            let sender = message_sender.clone();
123            Closure::wrap(Box::new(move |e: MessageEvent| {
124                let event_type = event_type.clone();
125                let _ = sender.unbounded_send(StreamMessage::Message(event_type, e));
126            }) as Box<dyn FnMut(MessageEvent)>)
127        };
128
129        self.es
130            .add_event_listener_with_callback(
131                &event_type,
132                message_callback.as_ref().unchecked_ref(),
133            )
134            .map_err(js_to_js_error)?;
135
136        let error_callback: Closure<dyn FnMut(web_sys::Event)> = {
137            Closure::wrap(Box::new(move |e: web_sys::Event| {
138                let is_connecting = e
139                    .current_target()
140                    .map(|target| target.unchecked_into::<web_sys::EventSource>())
141                    .map(|es| es.ready_state() == web_sys::EventSource::CONNECTING)
142                    .unwrap_or(false);
143                if !is_connecting {
144                    let _ = message_sender.unbounded_send(StreamMessage::ErrorEvent);
145                };
146            }) as Box<dyn FnMut(web_sys::Event)>)
147        };
148
149        self.es
150            .add_event_listener_with_callback("error", error_callback.as_ref().unchecked_ref())
151            .map_err(js_to_js_error)?;
152
153        Ok(EventSourceSubscription {
154            error_callback,
155            es: self.es.clone(),
156            event_type,
157            message_callback,
158            message_receiver,
159        })
160    }
161
162    /// Closes the EventSource.
163    ///
164    /// See the [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/EventSource/close#parameters)
165    /// to learn about this function
166    pub fn close(mut self) {
167        self.close_and_notify();
168    }
169
170    fn close_and_notify(&mut self) {
171        self.es.close();
172        // Fire an error event to cause all subscriber
173        // streams to close down.
174        if let Ok(event) = web_sys::Event::new("error") {
175            let _ = self.es.dispatch_event(&event);
176        }
177    }
178
179    /// The current state of the EventSource.
180    pub fn state(&self) -> State {
181        let ready_state = self.es.ready_state();
182        match ready_state {
183            0 => State::Connecting,
184            1 => State::Open,
185            2 => State::Closed,
186            _ => unreachable!(),
187        }
188    }
189}
190
191impl Drop for EventSource {
192    fn drop(&mut self) {
193        self.close_and_notify();
194    }
195}
196
197#[derive(Clone)]
198enum StreamMessage {
199    ErrorEvent,
200    Message(String, MessageEvent),
201}
202
203impl Stream for EventSourceSubscription {
204    type Item = Result<(String, MessageEvent), EventSourceError>;
205
206    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
207        let msg = ready!(self.project().message_receiver.poll_next(cx));
208        match msg {
209            Some(StreamMessage::Message(event_type, msg)) => {
210                Poll::Ready(Some(Ok((event_type, msg))))
211            }
212            Some(StreamMessage::ErrorEvent) => {
213                Poll::Ready(Some(Err(EventSourceError::ConnectionError)))
214            }
215            None => Poll::Ready(None),
216        }
217    }
218}
219
220#[pinned_drop]
221impl PinnedDrop for EventSourceSubscription {
222    fn drop(self: Pin<&mut Self>) {
223        let _ = self.es.remove_event_listener_with_callback(
224            "error",
225            self.error_callback.as_ref().unchecked_ref(),
226        );
227
228        let _ = self.es.remove_event_listener_with_callback(
229            &self.event_type,
230            self.message_callback.as_ref().unchecked_ref(),
231        );
232    }
233}
234
235#[cfg(test)]
236mod tests {
237    use super::*;
238    use futures::StreamExt;
239    use wasm_bindgen_test::*;
240
241    wasm_bindgen_test_configure!(run_in_browser);
242
243    #[wasm_bindgen_test]
244    async fn eventsource_works() {
245        let sse_echo_server_url =
246            option_env!("SSE_ECHO_SERVER_URL").expect("Did you set SSE_ECHO_SERVER_URL?");
247
248        let mut es = EventSource::new(sse_echo_server_url).unwrap();
249        let mut servers = es.subscribe("server").unwrap();
250        let mut requests = es.subscribe("request").unwrap();
251
252        assert_eq!(servers.next().await.unwrap().unwrap().0, "server");
253        assert_eq!(requests.next().await.unwrap().unwrap().0, "request");
254    }
255
256    #[wasm_bindgen_test]
257    async fn eventsource_connect_failure_works() {
258        let mut es = EventSource::new("rubbish").unwrap();
259        let mut servers = es.subscribe("server").unwrap();
260
261        // we should expect an immediate failure
262
263        assert_eq!(
264            servers.next().await,
265            Some(Err(EventSourceError::ConnectionError))
266        );
267    }
268}