gloo_net/eventsource/
futures.rs1use crate::eventsource::{EventSourceError, State};
38use crate::js_to_js_error;
39use futures_channel::mpsc;
40use futures_core::{ready, Stream};
41use gloo_utils::errors::JsError;
42use pin_project::{pin_project, pinned_drop};
43use std::fmt;
44use std::fmt::Formatter;
45use std::pin::Pin;
46use std::task::{Context, Poll};
47use wasm_bindgen::prelude::*;
48use wasm_bindgen::JsCast;
49use web_sys::MessageEvent;
50
51#[derive(Clone)]
54pub struct EventSource {
55 es: web_sys::EventSource,
56}
57
58impl fmt::Debug for EventSource {
59 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
60 f.debug_struct("EventSource")
61 .field("url", &self.es.url())
62 .field("with_credentials", &self.es.with_credentials())
63 .field("ready_state", &self.state())
64 .finish_non_exhaustive()
65 }
66}
67
68#[pin_project(PinnedDrop)]
70pub struct EventSourceSubscription {
71 #[allow(clippy::type_complexity)]
72 error_callback: Closure<dyn FnMut(web_sys::Event)>,
73 es: web_sys::EventSource,
74 event_type: String,
75 message_callback: Closure<dyn FnMut(MessageEvent)>,
76 #[pin]
77 message_receiver: mpsc::UnboundedReceiver<StreamMessage>,
78}
79
80impl fmt::Debug for EventSourceSubscription {
81 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
82 f.debug_struct("EventSourceSubscription")
83 .field("event_source", &self.es)
84 .field("event_type", &self.event_type)
85 .finish_non_exhaustive()
86 }
87}
88
89impl EventSource {
90 pub fn new(url: &str) -> Result<Self, JsError> {
99 let es = web_sys::EventSource::new(url).map_err(js_to_js_error)?;
100
101 Ok(Self { es })
102 }
103
104 pub fn subscribe(
114 &mut self,
115 event_type: impl Into<String>,
116 ) -> Result<EventSourceSubscription, JsError> {
117 let event_type = event_type.into();
118 let (message_sender, message_receiver) = mpsc::unbounded();
119
120 let message_callback: Closure<dyn FnMut(MessageEvent)> = {
121 let event_type = event_type.clone();
122 let sender = message_sender.clone();
123 Closure::wrap(Box::new(move |e: MessageEvent| {
124 let event_type = event_type.clone();
125 let _ = sender.unbounded_send(StreamMessage::Message(event_type, e));
126 }) as Box<dyn FnMut(MessageEvent)>)
127 };
128
129 self.es
130 .add_event_listener_with_callback(
131 &event_type,
132 message_callback.as_ref().unchecked_ref(),
133 )
134 .map_err(js_to_js_error)?;
135
136 let error_callback: Closure<dyn FnMut(web_sys::Event)> = {
137 Closure::wrap(Box::new(move |e: web_sys::Event| {
138 let is_connecting = e
139 .current_target()
140 .map(|target| target.unchecked_into::<web_sys::EventSource>())
141 .map(|es| es.ready_state() == web_sys::EventSource::CONNECTING)
142 .unwrap_or(false);
143 if !is_connecting {
144 let _ = message_sender.unbounded_send(StreamMessage::ErrorEvent);
145 };
146 }) as Box<dyn FnMut(web_sys::Event)>)
147 };
148
149 self.es
150 .add_event_listener_with_callback("error", error_callback.as_ref().unchecked_ref())
151 .map_err(js_to_js_error)?;
152
153 Ok(EventSourceSubscription {
154 error_callback,
155 es: self.es.clone(),
156 event_type,
157 message_callback,
158 message_receiver,
159 })
160 }
161
162 pub fn close(mut self) {
167 self.close_and_notify();
168 }
169
170 fn close_and_notify(&mut self) {
171 self.es.close();
172 if let Ok(event) = web_sys::Event::new("error") {
175 let _ = self.es.dispatch_event(&event);
176 }
177 }
178
179 pub fn state(&self) -> State {
181 let ready_state = self.es.ready_state();
182 match ready_state {
183 0 => State::Connecting,
184 1 => State::Open,
185 2 => State::Closed,
186 _ => unreachable!(),
187 }
188 }
189}
190
191impl Drop for EventSource {
192 fn drop(&mut self) {
193 self.close_and_notify();
194 }
195}
196
197#[derive(Clone)]
198enum StreamMessage {
199 ErrorEvent,
200 Message(String, MessageEvent),
201}
202
203impl Stream for EventSourceSubscription {
204 type Item = Result<(String, MessageEvent), EventSourceError>;
205
206 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
207 let msg = ready!(self.project().message_receiver.poll_next(cx));
208 match msg {
209 Some(StreamMessage::Message(event_type, msg)) => {
210 Poll::Ready(Some(Ok((event_type, msg))))
211 }
212 Some(StreamMessage::ErrorEvent) => {
213 Poll::Ready(Some(Err(EventSourceError::ConnectionError)))
214 }
215 None => Poll::Ready(None),
216 }
217 }
218}
219
220#[pinned_drop]
221impl PinnedDrop for EventSourceSubscription {
222 fn drop(self: Pin<&mut Self>) {
223 let _ = self.es.remove_event_listener_with_callback(
224 "error",
225 self.error_callback.as_ref().unchecked_ref(),
226 );
227
228 let _ = self.es.remove_event_listener_with_callback(
229 &self.event_type,
230 self.message_callback.as_ref().unchecked_ref(),
231 );
232 }
233}
234
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use wasm_bindgen_test::*;

    wasm_bindgen_test_configure!(run_in_browser);

    /// Subscribes to two event types on the echo server named by the
    /// `SSE_ECHO_SERVER_URL` compile-time environment variable and checks that
    /// each subscription yields an event tagged with its own type.
    #[wasm_bindgen_test]
    async fn eventsource_works() {
        let url = option_env!("SSE_ECHO_SERVER_URL").expect("Did you set SSE_ECHO_SERVER_URL?");

        let mut source = EventSource::new(url).unwrap();
        let mut server_events = source.subscribe("server").unwrap();
        let mut request_events = source.subscribe("request").unwrap();

        let (server_type, _) = server_events.next().await.unwrap().unwrap();
        assert_eq!(server_type, "server");

        let (request_type, _) = request_events.next().await.unwrap().unwrap();
        assert_eq!(request_type, "request");
    }

    /// Connecting to the URL "rubbish" is expected to make the first stream
    /// item a connection error.
    #[wasm_bindgen_test]
    async fn eventsource_connect_failure_works() {
        let mut source = EventSource::new("rubbish").unwrap();
        let mut server_events = source.subscribe("server").unwrap();

        let first = server_events.next().await;
        assert_eq!(first, Some(Err(EventSourceError::ConnectionError)));
    }
}