// eventsource_stream/event_stream.rs

1#[cfg(not(feature = "std"))]
2use alloc::string::{FromUtf8Error, String, ToString};
3
4#[cfg(feature = "std")]
5use std::string::FromUtf8Error;
6
7use crate::event::Event;
8use crate::parser::{is_bom, is_lf, line, RawEventLine};
9use crate::utf8_stream::{Utf8Stream, Utf8StreamError};
10use core::fmt;
11use core::pin::Pin;
12use core::time::Duration;
13use futures_core::stream::Stream;
14use futures_core::task::{Context, Poll};
15use nom::error::Error as NomError;
16use pin_project_lite::pin_project;
17
18#[derive(Default, Debug)]
19struct EventBuilder {
20    event: Event,
21    is_complete: bool,
22}
23
24impl EventBuilder {
25    /// From the HTML spec
26    ///
27    /// -> If the field name is "event"
28    ///    Set the event type buffer to field value.
29    ///
30    /// -> If the field name is "data"
31    ///    Append the field value to the data buffer, then append a single U+000A LINE FEED (LF)
32    ///    character to the data buffer.
33    ///
34    /// -> If the field name is "id"
35    ///    If the field value does not contain U+0000 NULL, then set the last event ID buffer
36    ///    to the field value. Otherwise, ignore the field.
37    ///
38    /// -> If the field name is "retry"
39    ///    If the field value consists of only ASCII digits, then interpret the field value as
40    ///    an integer in base ten, and set the event stream's reconnection time to that integer.
41    ///    Otherwise, ignore the field.
42    ///
43    /// -> Otherwise
44    ///    The field is ignored.
45    fn add(&mut self, line: RawEventLine) {
46        match line {
47            RawEventLine::Field(field, val) => {
48                let val = val.unwrap_or("");
49                match field {
50                    "event" => {
51                        self.event.event = val.to_string();
52                    }
53                    "data" => {
54                        self.event.data.push_str(val);
55                        self.event.data.push('\u{000A}');
56                    }
57                    "id" => {
58                        if !val.contains('\u{0000}') {
59                            self.event.id = val.to_string()
60                        }
61                    }
62                    "retry" => {
63                        if let Ok(val) = val.parse::<u64>() {
64                            self.event.retry = Some(Duration::from_millis(val))
65                        }
66                    }
67                    _ => {}
68                }
69            }
70            RawEventLine::Comment(_) => {}
71            RawEventLine::Empty => self.is_complete = true,
72        }
73    }
74
75    /// From the HTML spec
76    ///
77    /// 1. Set the last event ID string of the event source to the value of the last event ID
78    /// buffer. The buffer does not get reset, so the last event ID string of the event source
79    /// remains set to this value until the next time it is set by the server.
80    /// 2. If the data buffer is an empty string, set the data buffer and the event type buffer
81    /// to the empty string and return.
82    /// 3. If the data buffer's last character is a U+000A LINE FEED (LF) character, then remove
83    /// the last character from the data buffer.
84    /// 4. Let event be the result of creating an event using MessageEvent, in the relevant Realm
85    /// of the EventSource object.
86    /// 5. Initialize event's type attribute to message, its data attribute to data, its origin
87    /// attribute to the serialization of the origin of the event stream's final URL (i.e., the
88    /// URL after redirects), and its lastEventId attribute to the last event ID string of the
89    /// event source.
90    /// 6. If the event type buffer has a value other than the empty string, change the type of
91    /// the newly created event to equal the value of the event type buffer.
92    /// 7. Set the data buffer and the event type buffer to the empty string.
93    /// 8. Queue a task which, if the readyState attribute is set to a value other than CLOSED,
94    /// dispatches the newly created event at the EventSource object.
95    fn dispatch(&mut self) -> Option<Event> {
96        let builder = core::mem::take(self);
97        let mut event = builder.event;
98        self.event.id = event.id.clone();
99
100        if event.data.is_empty() {
101            return None;
102        }
103
104        if is_lf(event.data.chars().next_back().unwrap()) {
105            event.data.pop();
106        }
107
108        if event.event.is_empty() {
109            event.event = "message".to_string();
110        }
111
112        Some(event)
113    }
114}
115
/// Lifecycle phase of an event stream
#[derive(Debug, Clone, Copy)]
pub enum EventStreamState {
    /// No non-empty chunk has been received yet
    NotStarted,
    /// At least one non-empty chunk has been received
    Started,
    /// The underlying stream has ended
    Terminated,
}

impl EventStreamState {
    /// `true` only for [`EventStreamState::Terminated`]
    fn is_terminated(self) -> bool {
        match self {
            Self::Terminated => true,
            Self::NotStarted | Self::Started => false,
        }
    }

    /// `true` only for [`EventStreamState::Started`]
    fn is_started(self) -> bool {
        match self {
            Self::Started => true,
            Self::NotStarted | Self::Terminated => false,
        }
    }
}
131
132pin_project! {
133/// A Stream of events
134pub struct EventStream<S> {
135    #[pin]
136    stream: Utf8Stream<S>,
137    buffer: String,
138    builder: EventBuilder,
139    state: EventStreamState,
140    last_event_id: String,
141}
142}
143
144impl<S> EventStream<S> {
145    /// Initialize the EventStream with a Stream
146    pub fn new(stream: S) -> Self {
147        Self {
148            stream: Utf8Stream::new(stream),
149            buffer: String::new(),
150            builder: EventBuilder::default(),
151            state: EventStreamState::NotStarted,
152            last_event_id: String::new(),
153        }
154    }
155
156    /// Set the last event ID of the stream. Useful for initializing the stream with a previous
157    /// last event ID
158    pub fn set_last_event_id(&mut self, id: impl Into<String>) {
159        self.last_event_id = id.into();
160    }
161
162    /// Get the last event ID of the stream
163    pub fn last_event_id(&self) -> &str {
164        &self.last_event_id
165    }
166}
167
168/// Error thrown while parsing an event line
169#[derive(Debug, PartialEq)]
170pub enum EventStreamError<E> {
171    /// Source stream is not valid UTF8
172    Utf8(FromUtf8Error),
173    /// Source stream is not a valid EventStream
174    Parser(NomError<String>),
175    /// Underlying source stream error
176    Transport(E),
177}
178
179impl<E> From<Utf8StreamError<E>> for EventStreamError<E> {
180    fn from(err: Utf8StreamError<E>) -> Self {
181        match err {
182            Utf8StreamError::Utf8(err) => Self::Utf8(err),
183            Utf8StreamError::Transport(err) => Self::Transport(err),
184        }
185    }
186}
187
188impl<E> From<NomError<&str>> for EventStreamError<E> {
189    fn from(err: NomError<&str>) -> Self {
190        EventStreamError::Parser(NomError::new(err.input.to_string(), err.code))
191    }
192}
193
194impl<E> fmt::Display for EventStreamError<E>
195where
196    E: fmt::Display,
197{
198    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199        match self {
200            Self::Utf8(err) => f.write_fmt(format_args!("UTF8 error: {}", err)),
201            Self::Parser(err) => f.write_fmt(format_args!("Parse error: {}", err)),
202            Self::Transport(err) => f.write_fmt(format_args!("Transport error: {}", err)),
203        }
204    }
205}
206
/// Marker implementation, only available with the `std` feature; all methods keep their
/// default behaviour.
#[cfg(feature = "std")]
impl<E> std::error::Error for EventStreamError<E>
where
    E: fmt::Display + fmt::Debug + Send + Sync,
{
}
209
210fn parse_event<E>(
211    buffer: &mut String,
212    builder: &mut EventBuilder,
213) -> Result<Option<Event>, EventStreamError<E>> {
214    if buffer.is_empty() {
215        return Ok(None);
216    }
217    loop {
218        match line(buffer.as_ref()) {
219            Ok((rem, next_line)) => {
220                builder.add(next_line);
221                let consumed = buffer.len() - rem.len();
222                let rem = buffer.split_off(consumed);
223                *buffer = rem;
224                if builder.is_complete {
225                    if let Some(event) = builder.dispatch() {
226                        return Ok(Some(event));
227                    }
228                }
229            }
230            Err(nom::Err::Incomplete(_)) => return Ok(None),
231            Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => return Err(err.into()),
232        }
233    }
234}
235
236impl<S, B, E> Stream for EventStream<S>
237where
238    S: Stream<Item = Result<B, E>>,
239    B: AsRef<[u8]>,
240{
241    type Item = Result<Event, EventStreamError<E>>;
242
243    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
244        let mut this = self.project();
245
246        match parse_event(this.buffer, this.builder) {
247            Ok(Some(event)) => {
248                *this.last_event_id = event.id.clone();
249                return Poll::Ready(Some(Ok(event)));
250            }
251            Err(err) => return Poll::Ready(Some(Err(err))),
252            _ => {}
253        }
254
255        if this.state.is_terminated() {
256            return Poll::Ready(None);
257        }
258
259        loop {
260            match this.stream.as_mut().poll_next(cx) {
261                Poll::Ready(Some(Ok(string))) => {
262                    if string.is_empty() {
263                        continue;
264                    }
265
266                    let slice = if this.state.is_started() {
267                        &string
268                    } else {
269                        *this.state = EventStreamState::Started;
270                        if is_bom(string.chars().next().unwrap()) {
271                            &string[1..]
272                        } else {
273                            &string
274                        }
275                    };
276                    this.buffer.push_str(slice);
277
278                    match parse_event(this.buffer, this.builder) {
279                        Ok(Some(event)) => {
280                            *this.last_event_id = event.id.clone();
281                            return Poll::Ready(Some(Ok(event)));
282                        }
283                        Err(err) => return Poll::Ready(Some(Err(err))),
284                        _ => {}
285                    }
286                }
287                Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
288                Poll::Ready(None) => {
289                    *this.state = EventStreamState::Terminated;
290                    return Poll::Ready(None);
291                }
292                Poll::Pending => return Poll::Pending,
293            }
294        }
295    }
296}
297
#[cfg(test)]
mod tests {
    use super::*;
    use futures::prelude::*;

    /// Runs `chunks` through an `EventStream` and collects every yielded event, panicking if
    /// the stream yields an error.
    async fn collect_events(chunks: &[&str]) -> Vec<Event> {
        let source = futures::stream::iter(chunks.iter().map(|chunk| Ok::<_, ()>(*chunk)));
        EventStream::new(source)
            .try_collect::<Vec<_>>()
            .await
            .unwrap()
    }

    /// Expected event with the given type and data; every other field is left at its default.
    fn typed(kind: &str, data: &str) -> Event {
        Event {
            event: kind.to_string(),
            data: data.to_string(),
            ..Default::default()
        }
    }

    /// Expected event of the default "message" type.
    fn message(data: &str) -> Event {
        typed("message", data)
    }

    #[tokio::test]
    async fn valid_data_fields() {
        // One event delivered in a single chunk.
        assert_eq!(
            collect_events(&["data: Hello, world!\n\n"]).await,
            vec![message("Hello, world!")]
        );

        // The same event split across two chunks.
        assert_eq!(
            collect_events(&["data: Hello,", " world!\n\n"]).await,
            vec![message("Hello, world!")]
        );

        // An empty chunk in between does not disturb parsing.
        assert_eq!(
            collect_events(&["data: Hello,", "", " world!\n\n"]).await,
            vec![message("Hello, world!")]
        );

        // Without the terminating blank line no event is yielded.
        assert_eq!(
            collect_events(&["data: Hello, world!\n"]).await,
            Vec::<Event>::new()
        );

        // Consecutive data fields are joined with a line feed.
        assert_eq!(
            collect_events(&["data: Hello,\ndata: world!\n\n"]).await,
            vec![message("Hello,\nworld!")]
        );

        // A blank line separates two events.
        assert_eq!(
            collect_events(&["data: Hello,\n\ndata: world!\n\n"]).await,
            vec![message("Hello,"), message("world!")]
        );
    }

    #[tokio::test]
    async fn spec_examples() {
        assert_eq!(
            collect_events(&[concat!(
                "data: This is the first message.\n",
                "\n",
                "data: This is the second message, it\n",
                "data: has two lines.\n",
                "\n",
                "data: This is the third message.\n",
                "\n",
            )])
            .await,
            vec![
                message("This is the first message."),
                message("This is the second message, it\nhas two lines."),
                message("This is the third message."),
            ]
        );

        assert_eq!(
            collect_events(&[concat!(
                "event: add\n",
                "data: 73857293\n",
                "\n",
                "event: remove\n",
                "data: 2153\n",
                "\n",
                "event: add\n",
                "data: 113411\n",
                "\n",
            )])
            .await,
            vec![
                typed("add", "73857293"),
                typed("remove", "2153"),
                typed("add", "113411"),
            ]
        );

        assert_eq!(
            collect_events(&[concat!(
                "data: YHOO\n",
                "data: +2\n",
                "data: 10\n",
                "\n",
            )])
            .await,
            vec![message("YHOO\n+2\n10")]
        );

        assert_eq!(
            collect_events(&[concat!(
                ": test stream\n",
                "\n",
                "data: first event\n",
                "id: 1\n",
                "\n",
                "data:second event\n",
                "id\n",
                "\n",
                "data:  third event\n",
                "\n",
            )])
            .await,
            vec![
                Event {
                    id: "1".to_string(),
                    ..message("first event")
                },
                message("second event"),
                message(" third event"),
            ]
        );

        assert_eq!(
            collect_events(&[concat!(
                "data\n",
                "\n",
                "data\n",
                "data\n",
                "\n",
                "data:\n",
            )])
            .await,
            vec![message(""), message("\n")]
        );

        assert_eq!(
            collect_events(&[concat!(
                "data:test\n",
                "\n",
                "data: test\n",
                "\n",
            )])
            .await,
            vec![message("test"), message("test")]
        );
    }
}