use bytes::BytesMut;
use futures_codec::{Decoder, Encoder, FramedRead, FramedWrite};
use futures_io::{AsyncRead, AsyncWrite};
use memchr::memchr2;
use std::fmt::Write as _;
use std::{fmt, str::FromStr};
/// A single item of an event stream, as produced by the decoder and accepted
/// by the encoder in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum Event {
/// A dispatched message assembled from `id`, `event` and `data` lines.
Message {
/// The last event id in effect when the message was dispatched, if any.
id: Option<String>,
/// The event type; the decoder uses `"message"` when no `event` line was seen.
event: String,
/// The payload; the values of multiple `data` lines are joined with `\n`.
data: String,
},
/// A `retry` line whose value consisted only of ASCII digits.
Retry {
/// The parsed number.
/// NOTE(review): presumably a reconnection delay in milliseconds, as in the SSE spec — confirm.
retry: u64,
},
}
impl Event {
pub fn message<'a>(event: &str, data: &str, id: impl Into<Option<&'a str>>) -> Self {
Event::Message {
id: id.into().map(String::from),
event: event.to_string(),
data: data.to_string(),
}
}
pub fn retry(time: u64) -> Self {
Event::Retry { retry: time }
}
}
/// Errors reported by the codec and by `Event::from_str`.
#[derive(Debug)]
pub enum Error {
/// An underlying I/O error.
IoError(std::io::Error),
/// A received line was not valid UTF-8.
Utf8Error(std::str::Utf8Error),
/// Formatting an event into the output buffer failed.
FmtError(std::fmt::Error),
/// The parsed string ended before a complete message was seen.
IncompleteFrame,
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::IoError(inner) => inner.fmt(f),
Error::Utf8Error(inner) => inner.fmt(f),
Error::FmtError(inner) => inner.fmt(f),
Error::IncompleteFrame => write!(f, "incomplete frame"),
}
}
}
// Empty impl: only the trait's default methods are used, so no `source()` is
// provided; the wrapped error is surfaced through `Display` instead.
impl std::error::Error for Error {}
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Self::IoError(err)
}
}
impl From<std::fmt::Error> for Error {
fn from(err: std::fmt::Error) -> Self {
Self::FmtError(err)
}
}
impl From<std::str::Utf8Error> for Error {
fn from(err: std::str::Utf8Error) -> Self {
Self::Utf8Error(err)
}
}
/// Removes a single leading U+0020 SPACE from `input`, if there is one.
///
/// Only one space is dropped; further spaces and other whitespace are kept.
fn strip_leading_space(input: &str) -> &str {
    input.strip_prefix(' ').unwrap_or(input)
}
impl FromStr for Event {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut codec = SSECodec::default();
for line in s.lines() {
if let Some(message @ Event::Message { .. }) = codec.parse_line(line) {
return Ok(message);
}
}
Err(Error::IncompleteFrame)
}
}
impl fmt::Display for Event {
    /// Writes the event as event-stream fields, one field per line, each line
    /// terminated by `\n`.
    ///
    /// The blank line that terminates an event on the wire is not written
    /// here.
    ///
    /// For a message:
    /// * `id` is written as `id: <value>`, or as a bare `id` line when the id
    ///   is the empty string; nothing is written when it is `None`.
    /// * `event` is omitted when it equals the default type `"message"`.
    /// * `data` produces one `data: ` line per line of payload. An empty
    ///   payload still produces one (empty) `data` line, and trailing empty
    ///   lines are kept, so the payload survives a decode unchanged.
    ///   `\r\n` and lone `\r` inside the payload are treated as line breaks,
    ///   because a raw `\r` would terminate the field on the wire.
    ///
    /// `id` and `event` are written verbatim; callers must make sure they
    /// contain no line breaks.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Event::Message { id, event, data } => {
                if let Some(id) = id {
                    if id.is_empty() {
                        writeln!(f, "id")?;
                    } else {
                        writeln!(f, "id: {}", id)?;
                    }
                }
                if event != "message" {
                    writeln!(f, "event: {}", event)?;
                }
                // `str::lines` would yield nothing for an empty payload and
                // would drop a trailing empty line; splitting on '\n' keeps
                // exactly one `data` field per payload line.
                for line in data.split('\n') {
                    // The '\r' of a "\r\n" pair belongs to the line break.
                    let line = line.strip_suffix('\r').unwrap_or(line);
                    // Any remaining '\r' is a line break of its own.
                    for segment in line.split('\r') {
                        writeln!(f, "data: {}", segment)?;
                    }
                }
                Ok(())
            }
            Event::Retry { retry } => writeln!(f, "retry: {}", retry),
        }
    }
}
/// Encoder and decoder state for the event stream format.
///
/// All fields except `buffer` hold parser state that carries over between
/// lines; the encoder side does not use any of them.
#[derive(Debug, Default, Clone)]
pub struct SSECodec {
/// Tracks byte-order-mark handling so that at most one BOM is ever stripped.
processed_bom: bool,
/// Whether the previously decoded line ended in `\r`, so that a directly
/// following `\n` is treated as part of the same line break.
last_was_cr: bool,
/// Bytes handed to `decode` that have not been split into lines yet.
buffer: BytesMut,
/// Most recent `id` field value; it is copied into every dispatched message.
last_event_id: Option<String>,
/// Pending `event` field value; consumed when a message is dispatched.
event_type: Option<String>,
/// Accumulated `data` field values, each followed by `\n`.
data: String,
}
impl SSECodec {
    /// Dispatches the message accumulated so far and resets the per-event
    /// state.
    ///
    /// Returns `None` (and forgets any pending event type) when no `data`
    /// field was seen since the last dispatch. Otherwise the trailing `\n`
    /// added by the last `data` line is removed and a message is built from
    /// the pending event type (default `"message"`) and the current last
    /// event id. The last event id is kept for later messages.
    fn take_message(&mut self) -> Option<Event> {
        if self.data.is_empty() {
            self.event_type = None;
            return None;
        }
        if self.data.ends_with('\n') {
            self.data.pop();
        }
        Some(Event::Message {
            id: self.last_event_id.clone(),
            event: self
                .event_type
                .take()
                .unwrap_or_else(|| "message".to_string()),
            data: std::mem::take(&mut self.data),
        })
    }

    /// Interprets one line of the stream (without its line terminator).
    ///
    /// The line is split at the first `:` into a field name and a value, and
    /// one leading space of the value is ignored. An event is returned when
    /// the line completes one: a valid `retry` line yields [`Event::Retry`]
    /// immediately, and an empty line dispatches the pending message, if any.
    /// Every other line only updates internal state, or is ignored, and
    /// returns `None`.
    fn parse_line(&mut self, line: &str) -> Option<Event> {
        let mut parts = line.splitn(2, ':');
        match (parts.next(), parts.next()) {
            (Some("retry"), Some(value)) => {
                // The space after the colon is optional for `retry` just like
                // for the other fields; `Display` for `Event` emits it.
                let value = strip_leading_space(value);
                // Only plain ASCII digits are accepted (no sign, no spaces).
                if value.bytes().all(|b| b.is_ascii_digit()) {
                    // Parsing still fails for an empty value or on overflow;
                    // such lines are ignored.
                    if let Ok(time) = value.parse::<u64>() {
                        return Some(Event::Retry { retry: time });
                    }
                }
            }
            (Some("event"), Some(value)) => {
                self.event_type = Some(strip_leading_space(value).to_string());
            }
            (Some("data"), value) => {
                // A bare `data` line (no colon) contributes an empty line.
                self.data
                    .push_str(strip_leading_space(value.unwrap_or("")));
                self.data.push('\n');
            }
            (Some("id"), value) => {
                // A bare `id` line (no colon) carries the empty value, which
                // is also what `Display` for `Event` writes for an empty id.
                let id = strip_leading_space(value.unwrap_or(""));
                // Values containing NUL are ignored entirely.
                if !id.contains('\0') {
                    self.last_event_id = Some(id.to_string());
                }
            }
            // A line starting with `:` is a comment.
            (Some(""), Some(_)) => (),
            // An empty line ends the current event.
            (Some(""), None) => return self.take_message(),
            // Unknown field names are ignored.
            _ => (),
        }
        None
    }
}
impl Decoder for SSECodec {
    type Item = Event;
    type Error = Error;

    /// Consumes all of `src` and returns the next event, if a complete one is
    /// available.
    ///
    /// Incoming bytes are moved into an internal buffer, so `src` is always
    /// empty when this returns. The buffer is then split into lines at `\r`,
    /// `\n` or `\r\n` and each line is parsed. Bytes after the last line
    /// terminator stay buffered until more input arrives.
    ///
    /// # Errors
    ///
    /// Returns `Error::Utf8Error` when a line is not valid UTF-8. The
    /// offending line has already been removed from the buffer at that point.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        self.buffer.unsplit(src.split_to(src.len()));
        while let Some(pos) = memchr2(b'\r', b'\n', &self.buffer) {
            let line = self.buffer.split_to(pos + 1);
            // A '\n' directly after a '\r' completes a "\r\n" pair that may
            // have been split across reads; it is not an empty line.
            if pos == 0 && line == "\n" && self.last_was_cr {
                self.last_was_cr = false;
                continue;
            }
            self.last_was_cr = line.last() == Some(&b'\r');
            // A byte order mark is only meaningful at the very start of the
            // stream, so the check is spent on the first line whether or not
            // that line carries one.
            let at_stream_start = !self.processed_bom;
            self.processed_bom = true;
            let line = std::str::from_utf8(&line[..pos])?;
            let line = if at_stream_start {
                line.strip_prefix('\u{feff}').unwrap_or(line)
            } else {
                line
            };
            if let Some(event) = self.parse_line(line) {
                return Ok(Some(event));
            }
        }
        Ok(None)
    }

    /// Handles end of input exactly like [`decode`](Decoder::decode).
    ///
    /// `decode` always drains `src`, so there is nothing extra to flush here.
    /// Delegating instead of panicking keeps direct callers of this method
    /// safe. An event that was not terminated by an empty line is never
    /// dispatched.
    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        self.decode(src)
    }
}
impl Encoder for SSECodec {
    type Item = Event;
    type Error = Error;

    /// Appends `item` in its `Display` form to `dest`, followed by one extra
    /// `\n`.
    ///
    /// # Errors
    ///
    /// A formatting failure is returned as `Error::FmtError`.
    fn encode(&mut self, item: Self::Item, dest: &mut BytesMut) -> Result<(), Self::Error> {
        writeln!(dest, "{}", item)?;
        Ok(())
    }
}
/// A `FramedRead` over `R` that uses [`SSECodec`] as its decoder.
pub type DecodeStream<R> = FramedRead<R, SSECodec>;
/// A `FramedWrite` over `W` that uses [`SSECodec`] as its encoder.
pub type EncodeStream<W> = FramedWrite<W, SSECodec>;
/// Wraps `input` in a [`DecodeStream`] backed by a freshly initialised
/// [`SSECodec`].
pub fn decode_stream<R: AsyncRead>(input: R) -> DecodeStream<R> {
    let codec = SSECodec::default();
    FramedRead::new(input, codec)
}
/// Wraps `output` in an [`EncodeStream`] backed by a freshly initialised
/// [`SSECodec`].
pub fn encode_stream<W: AsyncWrite>(output: W) -> EncodeStream<W> {
    let codec = SSECodec::default();
    FramedWrite::new(output, codec)
}
// Tests for the encoding side: events are sent through `encode_stream` into an
// in-memory `Vec<u8>` and the produced bytes are compared exactly.
#[cfg(test)]
mod encode_tests {
use super::*;
use futures::SinkExt;
// A multi-line payload becomes one `data` line per payload line.
#[async_std::test]
async fn simple_event() {
let mut output = vec![];
let mut stream = encode_stream(&mut output);
stream
.send(Event::Message {
id: None,
event: "add".to_string(),
data: "test\ntest2".to_string(),
})
.await
.unwrap();
assert_eq!(output, b"event: add\ndata: test\ndata: test2\n\n".to_vec());
}
// An id, when present, is written before the other fields.
#[async_std::test]
async fn with_id() {
let mut output = vec![];
let mut stream = encode_stream(&mut output);
stream
.send(Event::Message {
id: Some("whatever".to_string()),
event: "add".to_string(),
data: "test".to_string(),
})
.await
.unwrap();
assert_eq!(output, b"id: whatever\nevent: add\ndata: test\n\n".to_vec());
}
// The default event type "message" is not written out.
#[async_std::test]
async fn default_event() {
let mut output = vec![];
let mut stream = encode_stream(&mut output);
stream
.send(Event::Message {
id: None,
event: "message".to_string(),
data: "test".to_string(),
})
.await
.unwrap();
assert_eq!(output, b"data: test\n\n".to_vec());
}
// Several events sent on one stream are concatenated, each ending in a blank line.
#[async_std::test]
async fn multiple_events() {
let mut output = vec![];
let mut stream = encode_stream(&mut output);
stream
.send(Event::Message {
id: None,
event: "add".to_string(),
data: "test\ntest2".to_string(),
})
.await
.unwrap();
stream
.send(Event::Message {
id: Some("whatever".to_string()),
event: "add".to_string(),
data: "test".to_string(),
})
.await
.unwrap();
stream
.send(Event::Message {
id: None,
event: "message".to_string(),
data: "test".to_string(),
})
.await
.unwrap();
let mut expected = vec![];
expected.extend(b"event: add\ndata: test\ndata: test2\n\n".iter());
expected.extend(b"id: whatever\nevent: add\ndata: test\n\n".iter());
expected.extend(b"data: test\n\n".iter());
assert_eq!(output, expected);
}
}
// Tests for the decoding side, both through `parse_line` directly and through
// `decode_stream` fed one line per chunk.
#[cfg(test)]
mod decode_tests {
use super::*;
use futures::stream::{self, StreamExt, TryStreamExt};
// Drives the line parser by hand and stops at the first dispatched message.
#[test]
fn simple_event() {
let mut codec = SSECodec::default();
let mut event = None;
let s = "event: add\ndata: test\ndata: test2\n\n";
for line in s.lines() {
if let Some(message @ Event::Message { .. }) = codec.parse_line(line) {
event = Some(message);
break;
}
}
assert_eq!(
event,
Some(Event::Message {
id: None,
event: "add".to_string(),
data: "test\ntest2".to_string(),
})
);
}
// Each input line arrives as its own chunk; the leading comment and blank
// line produce no event, so exactly one message is expected.
#[test]
fn decode_stream_when_fed_by_line() {
let input: Vec<&str> = vec![":ok", "", "event:message", "id:id1", "data:data1", ""];
let body_stream = stream::iter(input).map(|i| Ok(i.to_owned() + "\n"));
let messages = decode_stream(body_stream.into_async_read());
let mut result = None;
async_std::task::block_on(async {
result = Some(messages.map(|i| i.unwrap()).collect::<Vec<_>>().await);
});
let results = result.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(
results.get(0).unwrap(),
&Event::message("message", "data1", "id1")
);
}
// The id set by the first message is still reported on the second one.
#[test]
fn maintain_id_state() {
let input: Vec<&str> = vec!["id:1", "data:messageone", "", "data:messagetwo", ""];
let body_stream = stream::iter(input).map(|i| Ok(i.to_owned() + "\n"));
let messages = decode_stream(body_stream.into_async_read());
let mut result = None;
async_std::task::block_on(async {
result = Some(messages.map(|i| i.unwrap()).collect::<Vec<_>>().await);
});
let mut results = result.unwrap();
assert_eq!(results.len(), 2);
assert_eq!(
results.remove(0),
Event::message("message", "messageone", "1")
);
assert_eq!(
results.remove(0),
Event::message("message", "messagetwo", "1")
);
}
// An `id` line in the middle of a message applies to that message and to
// the one after it.
#[test]
fn id_is_part_of_message() {
let input: Vec<&str> = vec![
"data:messageone",
"id:1",
"data:moremessageone",
"",
"data:messagetwo",
"",
];
let body_stream = stream::iter(input).map(|i| Ok(i.to_owned() + "\n"));
let messages = decode_stream(body_stream.into_async_read());
let mut result = None;
async_std::task::block_on(async {
result = Some(messages.map(|i| i.unwrap()).collect::<Vec<_>>().await);
});
let mut results = result.unwrap();
assert_eq!(results.len(), 2);
assert_eq!(
results.remove(0),
Event::message("message", "messageone\nmoremessageone", "1")
);
assert_eq!(
results.remove(0),
Event::message("message", "messagetwo", "1")
);
}
}
// Format conformance tests for the decoder, run against in-memory byte input.
// NOTE(review): the module name suggests these are ports of the
// web-platform-tests EventSource format tests — confirm.
#[cfg(test)]
mod wpt {
    use super::*;
    use futures::stream::StreamExt;

    /// Blocking iterator over the events decoded from a byte slice.
    struct DecodeIter<'a> {
        inner: FramedRead<&'a [u8], SSECodec>,
    }

    impl Iterator for DecodeIter<'_> {
        type Item = Result<Event, Error>;

        /// Blocks until the stream yields its next item or ends.
        fn next(&mut self) -> Option<Self::Item> {
            async_std::task::block_on(self.inner.next())
        }
    }

    /// Decodes `input` as an event stream.
    fn decode(input: &[u8]) -> DecodeIter<'_> {
        DecodeIter {
            inner: decode_stream(input),
        }
    }

    #[test]
    fn data() {
        let input = concat!(
            "data:msg\n",
            "data:msg\n",
            "\n",
            ":\n",
            "falsefield:msg\n",
            "\n",
            "falsefield:msg\n",
            "Data:data\n",
            "\n",
            "data\n",
            "\n",
            "data:end\n",
            "\n",
        );
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "msg\nmsg".into()
            })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "".into()
            })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "end".into()
            })
        );
        assert!(messages.next().is_none());
    }

    // Only the BOM at the start of the stream is stripped; the line carrying
    // the second BOM is not recognised as a `data` field.
    #[test]
    fn bom() {
        let mut input = vec![];
        input.extend(b"\xEF\xBB\xBF");
        input.extend(b"data:1\n");
        input.extend(b"\n");
        input.extend(b"\xEF\xBB\xBF");
        input.extend(b"data:2\n");
        input.extend(b"\n");
        input.extend(b"data:3\n");
        input.extend(b"\n");
        let mut messages = decode(&input);
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "1".into()
            })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "3".into()
            })
        );
        assert!(messages.next().is_none());
    }

    // Of two consecutive BOMs only the first is stripped, so the first line
    // is not recognised as a `data` field.
    #[test]
    fn bom2() {
        let mut input = vec![];
        input.extend(b"\xEF\xBB\xBF");
        input.extend(b"\xEF\xBB\xBF");
        input.extend(b"data:1\n");
        input.extend(b"\n");
        input.extend(b"data:2\n");
        input.extend(b"\n");
        input.extend(b"data:3\n");
        input.extend(b"\n");
        let mut messages = decode(&input);
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "2".into()
            })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "3".into()
            })
        );
        assert!(messages.next().is_none());
    }

    #[test]
    fn comments() {
        let longstring = "x".repeat(2049);
        let mut input = concat!("data:1\r", ":\0\n", ":\r\n", "data:2\n", ":").to_string();
        input.push_str(&longstring);
        input.push_str("\r");
        input.push_str("data:3\n");
        input.push_str(":data:fail\r");
        input.push_str(":");
        input.push_str(&longstring);
        input.push_str("\n");
        input.push_str("data:4\n\n");
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "1\n2\n3\n4".into()
            })
        );
        assert!(messages.next().is_none());
    }

    // The trailing `id`/`data` lines are never followed by an empty line, so
    // they must not produce a second message.
    #[test]
    fn data_before_final_empty_line() {
        let input = "retry:1000\ndata:test1\n\nid:test\ndata:test2";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Retry { retry: 1000 })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "test1".into()
            })
        );
        assert!(messages.next().is_none());
    }

    #[test]
    fn field_data() {
        let input = "data:\n\ndata\ndata\n\ndata:test\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "".into()
            })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "\n".into()
            })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "test".into()
            })
        );
        assert!(messages.next().is_none());
    }

    #[test]
    fn field_event_empty() {
        let input = "event: \ndata:data\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "".into(),
                data: "data".into()
            })
        );
        assert!(messages.next().is_none());
    }

    #[test]
    fn field_event() {
        let input = "event:test\ndata:x\n\ndata:x\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "test".into(),
                data: "x".into()
            })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "x".into()
            })
        );
        assert!(messages.next().is_none());
    }

    #[test]
    #[ignore]
    fn field_id() {
        unimplemented!()
    }

    #[test]
    #[ignore]
    fn field_id_2() {
        unimplemented!()
    }

    // The second line has two spaces after the colon: exactly one is
    // stripped by the decoder, leaving the expected value " 2".
    #[test]
    fn field_parsing() {
        let input = "data:\0\ndata:  2\rData:1\ndata\0:2\ndata:1\r\0data:4\nda-ta:3\rdata_5\ndata:3\rdata:\r\n data:32\ndata:4\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "\0\n 2\n1\n3\n\n4".into()
            })
        );
        assert!(messages.next().is_none());
    }

    #[test]
    fn field_retry_bogus() {
        let input = "retry:3000\nretry:1000x\ndata:x\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Retry { retry: 3000 })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "x".into()
            })
        );
        assert!(messages.next().is_none());
    }

    #[test]
    fn field_retry_empty() {
        let input = "retry\ndata:test\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "test".into()
            })
        );
        assert!(messages.next().is_none());
    }

    #[test]
    fn field_retry() {
        let input = "retry:03000\ndata:x\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Retry { retry: 3000 })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "x".into()
            })
        );
        assert!(messages.next().is_none());
    }

    #[test]
    fn field_unknown() {
        let input =
            "data:test\n data\ndata\nfoobar:xxx\njustsometext\n:thisisacommentyay\ndata:test\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "test\n\ntest".into()
            })
        );
        assert!(messages.next().is_none());
    }

    #[test]
    fn leading_space() {
        let input = "data:\ttest\rdata: \ndata:test\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "\ttest\n\ntest".into()
            })
        );
        assert!(messages.next().is_none());
    }

    #[test]
    fn newlines() {
        let input = "data:test\r\ndata\ndata:test\r\n\r";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "test\n\ntest".into()
            })
        );
        assert!(messages.next().is_none());
    }

    #[test]
    fn null_character() {
        let input = "data:\0\n\n\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "\0".into()
            })
        );
        assert!(messages.next().is_none());
    }

    #[test]
    fn utf_8() {
        let input = b"data:ok\xE2\x80\xA6\n\n";
        let mut messages = decode(input);
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "ok…".into()
            })
        );
        assert!(messages.next().is_none());
    }
}