async_sse/decoder.rs
1use crate::Lines;
2use futures_lite::prelude::*;
3use futures_lite::ready;
4use std::task::{Context, Poll};
5
6use std::pin::Pin;
7
8use crate::Event;
9
10/// Decode a new incoming SSE connection.
11pub fn decode<R>(reader: R) -> Decoder<R>
12where
13 R: AsyncBufRead + Unpin,
14{
15 Decoder {
16 lines: Lines::new(reader),
17 processed_bom: false,
18 buffer: vec![],
19 last_event_id: None,
20 event_type: None,
21 data: vec![],
22 }
23}
24
25/// An SSE protocol decoder.
26#[derive(Debug)]
27pub struct Decoder<R: AsyncBufRead + Unpin> {
28 /// The lines decoder.
29 lines: Lines<R>,
30 /// Have we processed the optional Byte Order Marker on the first line?
31 processed_bom: bool,
32 /// Was the last character of the previous line a \r?
33 /// Bytes that were fed to the decoder but do not yet form a message.
34 buffer: Vec<u8>,
35 /// The _last event ID_ buffer.
36 last_event_id: Option<String>,
37 /// The _event type_ buffer.
38 event_type: Option<String>,
39 /// The _data_ buffer.
40 data: Vec<u8>,
41}
42
43impl<R: AsyncBufRead + Unpin> Decoder<R> {
44 fn take_message(&mut self) -> Option<Event> {
45 if self.data.is_empty() {
46 // If the data buffer is an empty string, set the data buffer and
47 // the event type buffer to the empty string [and return.]
48 self.event_type.take();
49 None
50 } else {
51 // Removing tailing newlines
52 if self.data.ends_with(&[b'\n']) {
53 self.data.pop();
54 }
55 let name = self.event_type.take().unwrap_or("message".to_string());
56 let data = std::mem::replace(&mut self.data, vec![]);
57 // The _last event ID_ buffer persists between messages.
58 let id = self.last_event_id.clone();
59 Some(Event::new_msg(name, data, id))
60 }
61 }
62}
63
64impl<R: AsyncBufRead + Unpin> Stream for Decoder<R> {
65 type Item = http_types::Result<Event>;
66
67 // This function uses two loops: one to get lines from the reader.
68 // And one to parse each line delimited by `:`.
69 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70 loop {
71 // Get the next line, if available.
72 let line = match ready!(Pin::new(&mut self.lines).poll_next(cx)) {
73 None => return Poll::Ready(None),
74 Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
75 Some(Ok(line)) => line,
76 };
77
78 // Get rid of the BOM at the start
79 let line = if !self.processed_bom && line.starts_with("\u{feff}") {
80 self.processed_bom = true;
81 &line[3..]
82 } else {
83 &line
84 };
85
86 log::trace!("> new line: {:?}", line);
87 let mut parts = line.splitn(2, ':');
88 loop {
89 match (parts.next(), parts.next()) {
90 // If the field name is "retry":
91 (Some("retry"), Some(value)) if value.chars().all(|c| c.is_ascii_digit()) => {
92 log::trace!("> retry");
93 // If the field value consists of only ASCII digits, then interpret the field value
94 // as an integer in base ten, and set the event stream's reconnection time to that
95 // integer. Otherwise, ignore the field.
96 if let Ok(time) = value.parse::<u64>() {
97 return Poll::Ready(Some(Ok(Event::new_retry(time))));
98 }
99 }
100 // If the field name is "event":
101 (Some("event"), Some(value)) => {
102 log::trace!("> event");
103 // Set the event type buffer to field value.
104 self.event_type = Some(strip_leading_space(value).to_string());
105 }
106 // If the field name is "data":
107 (Some("data"), value) => {
108 log::trace!("> data: {:?}", &value);
109 // Append the field value to the data buffer,
110 if let Some(value) = value {
111 self.data.extend(strip_leading_space_b(value.as_bytes()));
112 // then append a single U+000A LINE FEED (LF) character to the data buffer.
113 }
114 self.data.push(b'\n');
115 }
116 // If the field name is "id":
117 (Some("id"), Some(id_str)) if !id_str.contains(char::from(0)) => {
118 log::trace!("> id");
119 // If the field value does not contain U+0000 NULL, then set the last event ID buffer to the field value.
120 // Otherwise, ignore the field.
121 self.last_event_id = Some(strip_leading_space(id_str).to_string());
122 // return Poll::Ready(Ok(self.take_message()).transpose());
123 }
124 // Comment
125 (Some(""), Some(_)) => (log::trace!("> comment")),
126 // End of frame
127 (Some(""), None) => {
128 log::trace!("> end of frame");
129 match self.take_message() {
130 Some(event) => {
131 log::trace!("> end of frame [event]: {:?}", event);
132 return Poll::Ready(Some(Ok(event)));
133 }
134 None => {
135 log::trace!("> end of frame, break");
136 break;
137 }
138 };
139 }
140 (_, _) => {
141 break;
142 }
143 };
144 }
145 }
146 }
147}
148
149/// Remove a leading space (code point 0x20) from a string slice.
150fn strip_leading_space(input: &str) -> &str {
151 if input.starts_with(' ') {
152 &input[1..]
153 } else {
154 input
155 }
156}
157
158fn strip_leading_space_b(input: &[u8]) -> &[u8] {
159 if input.starts_with(&[b' ']) {
160 &input[1..]
161 } else {
162 input
163 }
164}