1#[cfg(not(feature = "std"))]
2use alloc::string::{FromUtf8Error, String, ToString};
3
4#[cfg(feature = "std")]
5use std::string::FromUtf8Error;
6
7use crate::event::Event;
8use crate::parser::{is_bom, is_lf, line, RawEventLine};
9use crate::utf8_stream::{Utf8Stream, Utf8StreamError};
10use core::fmt;
11use core::pin::Pin;
12use core::time::Duration;
13use futures_core::stream::Stream;
14use futures_core::task::{Context, Poll};
15use nom::error::Error as NomError;
16use pin_project_lite::pin_project;
17
18#[derive(Default, Debug)]
19struct EventBuilder {
20 event: Event,
21 is_complete: bool,
22}
23
24impl EventBuilder {
25 fn add(&mut self, line: RawEventLine) {
46 match line {
47 RawEventLine::Field(field, val) => {
48 let val = val.unwrap_or("");
49 match field {
50 "event" => {
51 self.event.event = val.to_string();
52 }
53 "data" => {
54 self.event.data.push_str(val);
55 self.event.data.push('\u{000A}');
56 }
57 "id" => {
58 if !val.contains('\u{0000}') {
59 self.event.id = val.to_string()
60 }
61 }
62 "retry" => {
63 if let Ok(val) = val.parse::<u64>() {
64 self.event.retry = Some(Duration::from_millis(val))
65 }
66 }
67 _ => {}
68 }
69 }
70 RawEventLine::Comment(_) => {}
71 RawEventLine::Empty => self.is_complete = true,
72 }
73 }
74
75 fn dispatch(&mut self) -> Option<Event> {
96 let builder = core::mem::take(self);
97 let mut event = builder.event;
98 self.event.id = event.id.clone();
99
100 if event.data.is_empty() {
101 return None;
102 }
103
104 if is_lf(event.data.chars().next_back().unwrap()) {
105 event.data.pop();
106 }
107
108 if event.event.is_empty() {
109 event.event = "message".to_string();
110 }
111
112 Some(event)
113 }
114}
115
116#[derive(Debug, Clone, Copy)]
117pub enum EventStreamState {
118 NotStarted,
119 Started,
120 Terminated,
121}
122
123impl EventStreamState {
124 fn is_terminated(self) -> bool {
125 matches!(self, Self::Terminated)
126 }
127 fn is_started(self) -> bool {
128 matches!(self, Self::Started)
129 }
130}
131
132pin_project! {
133pub struct EventStream<S> {
135 #[pin]
136 stream: Utf8Stream<S>,
137 buffer: String,
138 builder: EventBuilder,
139 state: EventStreamState,
140 last_event_id: String,
141}
142}
143
144impl<S> EventStream<S> {
145 pub fn new(stream: S) -> Self {
147 Self {
148 stream: Utf8Stream::new(stream),
149 buffer: String::new(),
150 builder: EventBuilder::default(),
151 state: EventStreamState::NotStarted,
152 last_event_id: String::new(),
153 }
154 }
155
156 pub fn set_last_event_id(&mut self, id: impl Into<String>) {
159 self.last_event_id = id.into();
160 }
161
162 pub fn last_event_id(&self) -> &str {
164 &self.last_event_id
165 }
166}
167
168#[derive(Debug, PartialEq)]
170pub enum EventStreamError<E> {
171 Utf8(FromUtf8Error),
173 Parser(NomError<String>),
175 Transport(E),
177}
178
179impl<E> From<Utf8StreamError<E>> for EventStreamError<E> {
180 fn from(err: Utf8StreamError<E>) -> Self {
181 match err {
182 Utf8StreamError::Utf8(err) => Self::Utf8(err),
183 Utf8StreamError::Transport(err) => Self::Transport(err),
184 }
185 }
186}
187
188impl<E> From<NomError<&str>> for EventStreamError<E> {
189 fn from(err: NomError<&str>) -> Self {
190 EventStreamError::Parser(NomError::new(err.input.to_string(), err.code))
191 }
192}
193
194impl<E> fmt::Display for EventStreamError<E>
195where
196 E: fmt::Display,
197{
198 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199 match self {
200 Self::Utf8(err) => f.write_fmt(format_args!("UTF8 error: {}", err)),
201 Self::Parser(err) => f.write_fmt(format_args!("Parse error: {}", err)),
202 Self::Transport(err) => f.write_fmt(format_args!("Transport error: {}", err)),
203 }
204 }
205}
206
207#[cfg(feature = "std")]
208impl<E> std::error::Error for EventStreamError<E> where E: fmt::Display + fmt::Debug + Send + Sync {}
209
210fn parse_event<E>(
211 buffer: &mut String,
212 builder: &mut EventBuilder,
213) -> Result<Option<Event>, EventStreamError<E>> {
214 if buffer.is_empty() {
215 return Ok(None);
216 }
217 loop {
218 match line(buffer.as_ref()) {
219 Ok((rem, next_line)) => {
220 builder.add(next_line);
221 let consumed = buffer.len() - rem.len();
222 let rem = buffer.split_off(consumed);
223 *buffer = rem;
224 if builder.is_complete {
225 if let Some(event) = builder.dispatch() {
226 return Ok(Some(event));
227 }
228 }
229 }
230 Err(nom::Err::Incomplete(_)) => return Ok(None),
231 Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => return Err(err.into()),
232 }
233 }
234}
235
236impl<S, B, E> Stream for EventStream<S>
237where
238 S: Stream<Item = Result<B, E>>,
239 B: AsRef<[u8]>,
240{
241 type Item = Result<Event, EventStreamError<E>>;
242
243 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
244 let mut this = self.project();
245
246 match parse_event(this.buffer, this.builder) {
247 Ok(Some(event)) => {
248 *this.last_event_id = event.id.clone();
249 return Poll::Ready(Some(Ok(event)));
250 }
251 Err(err) => return Poll::Ready(Some(Err(err))),
252 _ => {}
253 }
254
255 if this.state.is_terminated() {
256 return Poll::Ready(None);
257 }
258
259 loop {
260 match this.stream.as_mut().poll_next(cx) {
261 Poll::Ready(Some(Ok(string))) => {
262 if string.is_empty() {
263 continue;
264 }
265
266 let slice = if this.state.is_started() {
267 &string
268 } else {
269 *this.state = EventStreamState::Started;
270 if is_bom(string.chars().next().unwrap()) {
271 &string[1..]
272 } else {
273 &string
274 }
275 };
276 this.buffer.push_str(slice);
277
278 match parse_event(this.buffer, this.builder) {
279 Ok(Some(event)) => {
280 *this.last_event_id = event.id.clone();
281 return Poll::Ready(Some(Ok(event)));
282 }
283 Err(err) => return Poll::Ready(Some(Err(err))),
284 _ => {}
285 }
286 }
287 Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
288 Poll::Ready(None) => {
289 *this.state = EventStreamState::Terminated;
290 return Poll::Ready(None);
291 }
292 Poll::Pending => return Poll::Pending,
293 }
294 }
295 }
296}
297
298#[cfg(test)]
299mod tests {
300 use super::*;
301 use futures::prelude::*;
302
303 #[tokio::test]
304 async fn valid_data_fields() {
305 assert_eq!(
306 EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
307 "data: Hello, world!\n\n"
308 )]))
309 .try_collect::<Vec<_>>()
310 .await
311 .unwrap(),
312 vec![Event {
313 event: "message".to_string(),
314 data: "Hello, world!".to_string(),
315 ..Default::default()
316 }]
317 );
318 assert_eq!(
319 EventStream::new(futures::stream::iter(vec![
320 Ok::<_, ()>("data: Hello,"),
321 Ok::<_, ()>(" world!\n\n")
322 ]))
323 .try_collect::<Vec<_>>()
324 .await
325 .unwrap(),
326 vec![Event {
327 event: "message".to_string(),
328 data: "Hello, world!".to_string(),
329 ..Default::default()
330 }]
331 );
332 assert_eq!(
333 EventStream::new(futures::stream::iter(vec![
334 Ok::<_, ()>("data: Hello,"),
335 Ok::<_, ()>(""),
336 Ok::<_, ()>(" world!\n\n")
337 ]))
338 .try_collect::<Vec<_>>()
339 .await
340 .unwrap(),
341 vec![Event {
342 event: "message".to_string(),
343 data: "Hello, world!".to_string(),
344 ..Default::default()
345 }]
346 );
347 assert_eq!(
348 EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
349 "data: Hello, world!\n"
350 )]))
351 .try_collect::<Vec<_>>()
352 .await
353 .unwrap(),
354 vec![]
355 );
356 assert_eq!(
357 EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
358 "data: Hello,\ndata: world!\n\n"
359 )]))
360 .try_collect::<Vec<_>>()
361 .await
362 .unwrap(),
363 vec![Event {
364 event: "message".to_string(),
365 data: "Hello,\nworld!".to_string(),
366 ..Default::default()
367 }]
368 );
369 assert_eq!(
370 EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
371 "data: Hello,\n\ndata: world!\n\n"
372 )]))
373 .try_collect::<Vec<_>>()
374 .await
375 .unwrap(),
376 vec![
377 Event {
378 event: "message".to_string(),
379 data: "Hello,".to_string(),
380 ..Default::default()
381 },
382 Event {
383 event: "message".to_string(),
384 data: "world!".to_string(),
385 ..Default::default()
386 }
387 ]
388 );
389 }
390
391 #[tokio::test]
392 async fn spec_examples() {
393 assert_eq!(
394 EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
395 "data: This is the first message.
396
397data: This is the second message, it
398data: has two lines.
399
400data: This is the third message.
401
402"
403 )]))
404 .try_collect::<Vec<_>>()
405 .await
406 .unwrap(),
407 vec![
408 Event {
409 event: "message".to_string(),
410 data: "This is the first message.".to_string(),
411 ..Default::default()
412 },
413 Event {
414 event: "message".to_string(),
415 data: "This is the second message, it\nhas two lines.".to_string(),
416 ..Default::default()
417 },
418 Event {
419 event: "message".to_string(),
420 data: "This is the third message.".to_string(),
421 ..Default::default()
422 }
423 ]
424 );
425 assert_eq!(
426 EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
427 "event: add
428data: 73857293
429
430event: remove
431data: 2153
432
433event: add
434data: 113411
435
436"
437 )]))
438 .try_collect::<Vec<_>>()
439 .await
440 .unwrap(),
441 vec![
442 Event {
443 event: "add".to_string(),
444 data: "73857293".to_string(),
445 ..Default::default()
446 },
447 Event {
448 event: "remove".to_string(),
449 data: "2153".to_string(),
450 ..Default::default()
451 },
452 Event {
453 event: "add".to_string(),
454 data: "113411".to_string(),
455 ..Default::default()
456 }
457 ]
458 );
459 assert_eq!(
460 EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
461 "data: YHOO
462data: +2
463data: 10
464
465"
466 )]))
467 .try_collect::<Vec<_>>()
468 .await
469 .unwrap(),
470 vec![Event {
471 event: "message".to_string(),
472 data: "YHOO\n+2\n10".to_string(),
473 ..Default::default()
474 },]
475 );
476 assert_eq!(
477 EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
478 ": test stream
479
480data: first event
481id: 1
482
483data:second event
484id
485
486data: third event
487
488"
489 )]))
490 .try_collect::<Vec<_>>()
491 .await
492 .unwrap(),
493 vec![
494 Event {
495 event: "message".to_string(),
496 id: "1".to_string(),
497 data: "first event".to_string(),
498 ..Default::default()
499 },
500 Event {
501 event: "message".to_string(),
502 data: "second event".to_string(),
503 ..Default::default()
504 },
505 Event {
506 event: "message".to_string(),
507 data: " third event".to_string(),
508 ..Default::default()
509 }
510 ]
511 );
512 assert_eq!(
513 EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
514 "data
515
516data
517data
518
519data:
520"
521 )]))
522 .try_collect::<Vec<_>>()
523 .await
524 .unwrap(),
525 vec![
526 Event {
527 event: "message".to_string(),
528 data: "".to_string(),
529 ..Default::default()
530 },
531 Event {
532 event: "message".to_string(),
533 data: "\n".to_string(),
534 ..Default::default()
535 },
536 ]
537 );
538 assert_eq!(
539 EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
540 "data:test
541
542data: test
543
544"
545 )]))
546 .try_collect::<Vec<_>>()
547 .await
548 .unwrap(),
549 vec![
550 Event {
551 event: "message".to_string(),
552 data: "test".to_string(),
553 ..Default::default()
554 },
555 Event {
556 event: "message".to_string(),
557 data: "test".to_string(),
558 ..Default::default()
559 },
560 ]
561 );
562 }
563}