async_sse/
decoder.rs

1use crate::Lines;
2use futures_lite::prelude::*;
3use futures_lite::ready;
4use std::task::{Context, Poll};
5
6use std::pin::Pin;
7
8use crate::Event;
9
10/// Decode a new incoming SSE connection.
11pub fn decode<R>(reader: R) -> Decoder<R>
12where
13    R: AsyncBufRead + Unpin,
14{
15    Decoder {
16        lines: Lines::new(reader),
17        processed_bom: false,
18        buffer: vec![],
19        last_event_id: None,
20        event_type: None,
21        data: vec![],
22    }
23}
24
25/// An SSE protocol decoder.
26#[derive(Debug)]
27pub struct Decoder<R: AsyncBufRead + Unpin> {
28    /// The lines decoder.
29    lines: Lines<R>,
30    /// Have we processed the optional Byte Order Marker on the first line?
31    processed_bom: bool,
32    /// Was the last character of the previous line a \r?
33    /// Bytes that were fed to the decoder but do not yet form a message.
34    buffer: Vec<u8>,
35    /// The _last event ID_ buffer.
36    last_event_id: Option<String>,
37    /// The _event type_ buffer.
38    event_type: Option<String>,
39    /// The _data_ buffer.
40    data: Vec<u8>,
41}
42
43impl<R: AsyncBufRead + Unpin> Decoder<R> {
44    fn take_message(&mut self) -> Option<Event> {
45        if self.data.is_empty() {
46            // If the data buffer is an empty string, set the data buffer and
47            // the event type buffer to the empty string [and return.]
48            self.event_type.take();
49            None
50        } else {
51            // Removing tailing newlines
52            if self.data.ends_with(&[b'\n']) {
53                self.data.pop();
54            }
55            let name = self.event_type.take().unwrap_or("message".to_string());
56            let data = std::mem::replace(&mut self.data, vec![]);
57            // The _last event ID_ buffer persists between messages.
58            let id = self.last_event_id.clone();
59            Some(Event::new_msg(name, data, id))
60        }
61    }
62}
63
64impl<R: AsyncBufRead + Unpin> Stream for Decoder<R> {
65    type Item = http_types::Result<Event>;
66
67    // This function uses two loops: one to get lines from the reader.
68    // And one to parse each line delimited by `:`.
69    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70        loop {
71            // Get the next line, if available.
72            let line = match ready!(Pin::new(&mut self.lines).poll_next(cx)) {
73                None => return Poll::Ready(None),
74                Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
75                Some(Ok(line)) => line,
76            };
77
78            // Get rid of the BOM at the start
79            let line = if !self.processed_bom && line.starts_with("\u{feff}") {
80                self.processed_bom = true;
81                &line[3..]
82            } else {
83                &line
84            };
85
86            log::trace!("> new line: {:?}", line);
87            let mut parts = line.splitn(2, ':');
88            loop {
89                match (parts.next(), parts.next()) {
90                    // If the field name is "retry":
91                    (Some("retry"), Some(value)) if value.chars().all(|c| c.is_ascii_digit()) => {
92                        log::trace!("> retry");
93                        // If the field value consists of only ASCII digits, then interpret the field value
94                        // as an integer in base ten, and set the event stream's reconnection time to that
95                        // integer. Otherwise, ignore the field.
96                        if let Ok(time) = value.parse::<u64>() {
97                            return Poll::Ready(Some(Ok(Event::new_retry(time))));
98                        }
99                    }
100                    // If the field name is "event":
101                    (Some("event"), Some(value)) => {
102                        log::trace!("> event");
103                        // Set the event type buffer to field value.
104                        self.event_type = Some(strip_leading_space(value).to_string());
105                    }
106                    // If the field name is "data":
107                    (Some("data"), value) => {
108                        log::trace!("> data: {:?}", &value);
109                        // Append the field value to the data buffer,
110                        if let Some(value) = value {
111                            self.data.extend(strip_leading_space_b(value.as_bytes()));
112                            // then append a single U+000A LINE FEED (LF) character to the data buffer.
113                        }
114                        self.data.push(b'\n');
115                    }
116                    // If the field name is "id":
117                    (Some("id"), Some(id_str)) if !id_str.contains(char::from(0)) => {
118                        log::trace!("> id");
119                        // If the field value does not contain U+0000 NULL, then set the last event ID buffer to the field value.
120                        // Otherwise, ignore the field.
121                        self.last_event_id = Some(strip_leading_space(id_str).to_string());
122                        // return Poll::Ready(Ok(self.take_message()).transpose());
123                    }
124                    // Comment
125                    (Some(""), Some(_)) => (log::trace!("> comment")),
126                    // End of frame
127                    (Some(""), None) => {
128                        log::trace!("> end of frame");
129                        match self.take_message() {
130                            Some(event) => {
131                                log::trace!("> end of frame [event]: {:?}", event);
132                                return Poll::Ready(Some(Ok(event)));
133                            }
134                            None => {
135                                log::trace!("> end of frame, break");
136                                break;
137                            }
138                        };
139                    }
140                    (_, _) => {
141                        break;
142                    }
143                };
144            }
145        }
146    }
147}
148
149/// Remove a leading space (code point 0x20) from a string slice.
150fn strip_leading_space(input: &str) -> &str {
151    if input.starts_with(' ') {
152        &input[1..]
153    } else {
154        input
155    }
156}
157
158fn strip_leading_space_b(input: &[u8]) -> &[u8] {
159    if input.starts_with(&[b' ']) {
160        &input[1..]
161    } else {
162        input
163    }
164}