async_sse/
decoder.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
use crate::Lines;
use futures_lite::prelude::*;
use futures_lite::ready;
use std::task::{Context, Poll};

use std::pin::Pin;

use crate::Event;

/// Decode a new incoming SSE connection.
pub fn decode<R>(reader: R) -> Decoder<R>
where
    R: AsyncBufRead + Unpin,
{
    Decoder {
        lines: Lines::new(reader),
        processed_bom: false,
        buffer: vec![],
        last_event_id: None,
        event_type: None,
        data: vec![],
    }
}

/// An SSE protocol decoder.
#[derive(Debug)]
pub struct Decoder<R: AsyncBufRead + Unpin> {
    /// The lines decoder.
    lines: Lines<R>,
    /// Have we processed the optional Byte Order Marker on the first line?
    processed_bom: bool,
    /// Was the last character of the previous line a \r?
    /// Bytes that were fed to the decoder but do not yet form a message.
    buffer: Vec<u8>,
    /// The _last event ID_ buffer.
    last_event_id: Option<String>,
    /// The _event type_ buffer.
    event_type: Option<String>,
    /// The _data_ buffer.
    data: Vec<u8>,
}

impl<R: AsyncBufRead + Unpin> Decoder<R> {
    fn take_message(&mut self) -> Option<Event> {
        if self.data.is_empty() {
            // If the data buffer is an empty string, set the data buffer and
            // the event type buffer to the empty string [and return.]
            self.event_type.take();
            None
        } else {
            // Removing tailing newlines
            if self.data.ends_with(&[b'\n']) {
                self.data.pop();
            }
            let name = self.event_type.take().unwrap_or("message".to_string());
            let data = std::mem::replace(&mut self.data, vec![]);
            // The _last event ID_ buffer persists between messages.
            let id = self.last_event_id.clone();
            Some(Event::new_msg(name, data, id))
        }
    }
}

impl<R: AsyncBufRead + Unpin> Stream for Decoder<R> {
    type Item = http_types::Result<Event>;

    // This function uses two loops: one to get lines from the reader.
    // And one to parse each line delimited by `:`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Get the next line, if available.
            let line = match ready!(Pin::new(&mut self.lines).poll_next(cx)) {
                None => return Poll::Ready(None),
                Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
                Some(Ok(line)) => line,
            };

            // Get rid of the BOM at the start
            let line = if !self.processed_bom && line.starts_with("\u{feff}") {
                self.processed_bom = true;
                &line[3..]
            } else {
                &line
            };

            log::trace!("> new line: {:?}", line);
            let mut parts = line.splitn(2, ':');
            loop {
                match (parts.next(), parts.next()) {
                    // If the field name is "retry":
                    (Some("retry"), Some(value)) if value.chars().all(|c| c.is_ascii_digit()) => {
                        log::trace!("> retry");
                        // If the field value consists of only ASCII digits, then interpret the field value
                        // as an integer in base ten, and set the event stream's reconnection time to that
                        // integer. Otherwise, ignore the field.
                        if let Ok(time) = value.parse::<u64>() {
                            return Poll::Ready(Some(Ok(Event::new_retry(time))));
                        }
                    }
                    // If the field name is "event":
                    (Some("event"), Some(value)) => {
                        log::trace!("> event");
                        // Set the event type buffer to field value.
                        self.event_type = Some(strip_leading_space(value).to_string());
                    }
                    // If the field name is "data":
                    (Some("data"), value) => {
                        log::trace!("> data: {:?}", &value);
                        // Append the field value to the data buffer,
                        if let Some(value) = value {
                            self.data.extend(strip_leading_space_b(value.as_bytes()));
                            // then append a single U+000A LINE FEED (LF) character to the data buffer.
                        }
                        self.data.push(b'\n');
                    }
                    // If the field name is "id":
                    (Some("id"), Some(id_str)) if !id_str.contains(char::from(0)) => {
                        log::trace!("> id");
                        // If the field value does not contain U+0000 NULL, then set the last event ID buffer to the field value.
                        // Otherwise, ignore the field.
                        self.last_event_id = Some(strip_leading_space(id_str).to_string());
                        // return Poll::Ready(Ok(self.take_message()).transpose());
                    }
                    // Comment
                    (Some(""), Some(_)) => (log::trace!("> comment")),
                    // End of frame
                    (Some(""), None) => {
                        log::trace!("> end of frame");
                        match self.take_message() {
                            Some(event) => {
                                log::trace!("> end of frame [event]: {:?}", event);
                                return Poll::Ready(Some(Ok(event)));
                            }
                            None => {
                                log::trace!("> end of frame, break");
                                break;
                            }
                        };
                    }
                    (_, _) => {
                        break;
                    }
                };
            }
        }
    }
}

/// Remove a leading space (code point 0x20) from a string slice.
fn strip_leading_space(input: &str) -> &str {
    if input.starts_with(' ') {
        &input[1..]
    } else {
        input
    }
}

fn strip_leading_space_b(input: &[u8]) -> &[u8] {
    if input.starts_with(&[b' ']) {
        &input[1..]
    } else {
        input
    }
}