// async_sse/decoder.rs
use crate::Lines;
use futures_lite::prelude::*;
use futures_lite::ready;
use std::task::{Context, Poll};
use std::pin::Pin;
use crate::Event;
/// Start decoding a fresh SSE connection.
///
/// Wraps `reader` in a line splitter and returns a [`Decoder`] whose buffers
/// are all empty and whose byte-order-mark flag is still unset.
pub fn decode<R>(reader: R) -> Decoder<R>
where
    R: AsyncBufRead + Unpin,
{
    let lines = Lines::new(reader);
    Decoder {
        lines,
        processed_bom: false,
        buffer: Vec::new(),
        last_event_id: None,
        event_type: None,
        data: Vec::new(),
    }
}
/// An SSE protocol decoder.
///
/// Holds the line reader together with the buffers that accumulate field
/// values until they are handed out as an event.
#[derive(Debug)]
pub struct Decoder<R: AsyncBufRead + Unpin> {
/// The lines decoder.
lines: Lines<R>,
/// Have we processed the optional Byte Order Marker on the first line?
processed_bom: bool,
/// Bytes that were fed to the decoder but do not yet form a message.
// NOTE(review): nothing in the visible part of this file touches this field
// after construction — confirm whether it is still needed.
buffer: Vec<u8>,
/// The _last event ID_ buffer. It is copied into each message rather than
/// moved, so it persists between messages.
last_event_id: Option<String>,
/// The _event type_ buffer. It is emptied every time a message is taken.
event_type: Option<String>,
/// The _data_ buffer. Each `data` field appends its value followed by a
/// single line feed.
data: Vec<u8>,
}
impl<R: AsyncBufRead + Unpin> Decoder<R> {
    /// Dispatch the buffered fields as a message event.
    ///
    /// Returns `None` when the data buffer is empty (nothing to dispatch) and
    /// `Some(event)` otherwise. In both cases the event type buffer is reset;
    /// the data buffer is reset whenever an event is produced. The last event
    /// ID buffer is only copied, so it carries over to later events.
    fn take_message(&mut self) -> Option<Event> {
        // The event type buffer is cleared on every dispatch attempt, even
        // when there turns out to be no data to send.
        let event_type = self.event_type.take();

        if self.data.is_empty() {
            return None;
        }

        // Every `data` field appended a trailing line feed; the final one is
        // not part of the payload.
        if self.data.last() == Some(&b'\n') {
            self.data.pop();
        }

        // An absent *or empty* event type means the default type `message`.
        // The default string is only allocated when it is actually needed.
        let name = event_type
            .filter(|name| !name.is_empty())
            .unwrap_or_else(|| "message".to_string());
        let data = std::mem::take(&mut self.data);
        // The _last event ID_ buffer persists between messages.
        let id = self.last_event_id.clone();

        Some(Event::new_msg(name, data, id))
    }
}
impl<R: AsyncBufRead + Unpin> Stream for Decoder<R> {
type Item = http_types::Result<Event>;
// This function uses two loops: one to get lines from the reader.
// And one to parse each line delimited by `:`.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// Get the next line, if available.
let line = match ready!(Pin::new(&mut self.lines).poll_next(cx)) {
None => return Poll::Ready(None),
Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Some(Ok(line)) => line,
};
// Get rid of the BOM at the start
let line = if !self.processed_bom && line.starts_with("\u{feff}") {
self.processed_bom = true;
&line[3..]
} else {
&line
};
log::trace!("> new line: {:?}", line);
let mut parts = line.splitn(2, ':');
loop {
match (parts.next(), parts.next()) {
// If the field name is "retry":
(Some("retry"), Some(value)) if value.chars().all(|c| c.is_ascii_digit()) => {
log::trace!("> retry");
// If the field value consists of only ASCII digits, then interpret the field value
// as an integer in base ten, and set the event stream's reconnection time to that
// integer. Otherwise, ignore the field.
if let Ok(time) = value.parse::<u64>() {
return Poll::Ready(Some(Ok(Event::new_retry(time))));
}
}
// If the field name is "event":
(Some("event"), Some(value)) => {
log::trace!("> event");
// Set the event type buffer to field value.
self.event_type = Some(strip_leading_space(value).to_string());
}
// If the field name is "data":
(Some("data"), value) => {
log::trace!("> data: {:?}", &value);
// Append the field value to the data buffer,
if let Some(value) = value {
self.data.extend(strip_leading_space_b(value.as_bytes()));
// then append a single U+000A LINE FEED (LF) character to the data buffer.
}
self.data.push(b'\n');
}
// If the field name is "id":
(Some("id"), Some(id_str)) if !id_str.contains(char::from(0)) => {
log::trace!("> id");
// If the field value does not contain U+0000 NULL, then set the last event ID buffer to the field value.
// Otherwise, ignore the field.
self.last_event_id = Some(strip_leading_space(id_str).to_string());
// return Poll::Ready(Ok(self.take_message()).transpose());
}
// Comment
(Some(""), Some(_)) => (log::trace!("> comment")),
// End of frame
(Some(""), None) => {
log::trace!("> end of frame");
match self.take_message() {
Some(event) => {
log::trace!("> end of frame [event]: {:?}", event);
return Poll::Ready(Some(Ok(event)));
}
None => {
log::trace!("> end of frame, break");
break;
}
};
}
(_, _) => {
break;
}
};
}
}
}
}
/// Drop a single leading space (code point 0x20) from a string slice.
///
/// At most one space is removed; input that does not start with a space is
/// returned unchanged.
fn strip_leading_space(input: &str) -> &str {
    input.strip_prefix(' ').unwrap_or(input)
}
/// Drop a single leading space byte (0x20) from a byte slice.
///
/// At most one byte is removed; input that does not start with a space is
/// returned unchanged.
fn strip_leading_space_b(input: &[u8]) -> &[u8] {
    match input.split_first() {
        Some((&b' ', rest)) => rest,
        _ => input,
    }
}