// tokio_codec/lines_codec.rs

1use bytes::{BufMut, BytesMut};
2use std::{cmp, io, str, usize};
3use tokio_io::_tokio_codec::{Decoder, Encoder};
4
/// A simple `Codec` implementation that splits up data into lines.
///
/// Field order is significant: the derived `Ord`, `PartialOrd` and `Hash`
/// implementations process the fields in declaration order.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct LinesCodec {
    // Offset into the read buffer at which the next scan for `\n` begins.
    // Bytes before this offset were already examined by an earlier `decode`
    // call and are skipped, so no byte is scanned twice.
    // Example: after `decode` has seen `abc` this holds `3`; when the buffer
    // later contains `abcde\n`, only `de\n` is inspected.
    next_index: usize,

    /// Upper bound on the length of a single line. When this is
    /// `usize::MAX`, a line is read until a `\n` character is reached,
    /// however long that takes.
    max_length: usize,

    /// `true` while the tail of a line that exceeded the length limit is
    /// still being thrown away.
    is_discarding: bool,
}
24
25impl LinesCodec {
26    /// Returns a `LinesCodec` for splitting up data into lines.
27    ///
28    /// # Note
29    ///
30    /// The returned `LinesCodec` will not have an upper bound on the length
31    /// of a buffered line. See the documentation for [`new_with_max_length`]
32    /// for information on why this could be a potential security risk.
33    ///
34    /// [`new_with_max_length`]: #method.new_with_max_length
35    pub fn new() -> LinesCodec {
36        LinesCodec {
37            next_index: 0,
38            max_length: usize::MAX,
39            is_discarding: false,
40        }
41    }
42
43    /// Returns a `LinesCodec` with a maximum line length limit.
44    ///
45    /// If this is set, calls to `LinesCodec::decode` will return a
46    /// [`LengthError`] when a line exceeds the length limit. Subsequent calls
47    /// will discard up to `limit` bytes from that line until a newline
48    /// character is reached, returning `None` until the line over the limit
49    /// has been fully discarded. After that point, calls to `decode` will
50    /// function as normal.
51    ///
52    /// # Note
53    ///
54    /// Setting a length limit is highly recommended for any `LinesCodec` which
55    /// will be exposed to untrusted input. Otherwise, the size of the buffer
56    /// that holds the line currently being read is unbounded. An attacker could
57    /// exploit this unbounded buffer by sending an unbounded amount of input
58    /// without any `\n` characters, causing unbounded memory consumption.
59    ///
60    /// [`LengthError`]: ../struct.LengthError
61    pub fn new_with_max_length(max_length: usize) -> Self {
62        LinesCodec {
63            max_length,
64            ..LinesCodec::new()
65        }
66    }
67
68    /// Returns the maximum line length when decoding.
69    ///
70    /// ```
71    /// use std::usize;
72    /// use tokio_codec::LinesCodec;
73    ///
74    /// let codec = LinesCodec::new();
75    /// assert_eq!(codec.max_length(), usize::MAX);
76    /// ```
77    /// ```
78    /// use tokio_codec::LinesCodec;
79    ///
80    /// let codec = LinesCodec::new_with_max_length(256);
81    /// assert_eq!(codec.max_length(), 256);
82    /// ```
83    pub fn max_length(&self) -> usize {
84        self.max_length
85    }
86
87    fn discard(&mut self, newline_offset: Option<usize>, read_to: usize, buf: &mut BytesMut) {
88        let discard_to = if let Some(offset) = newline_offset {
89            // If we found a newline, discard up to that offset and
90            // then stop discarding. On the next iteration, we'll try
91            // to read a line normally.
92            self.is_discarding = false;
93            offset + self.next_index + 1
94        } else {
95            // Otherwise, we didn't find a newline, so we'll discard
96            // everything we read. On the next iteration, we'll continue
97            // discarding up to max_len bytes unless we find a newline.
98            read_to
99        };
100        buf.advance(discard_to);
101        self.next_index = 0;
102    }
103}
104
/// Interprets `buf` as UTF-8 text.
///
/// Returns the borrowed `&str` on success. On invalid UTF-8 the underlying
/// decode error is dropped and replaced by an `io::Error` of kind
/// `InvalidData`.
fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
    match str::from_utf8(buf) {
        Ok(text) => Ok(text),
        Err(_) => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "Unable to decode input as UTF8",
        )),
    }
}
109
/// Returns `s` with one trailing `\r` byte removed, if present.
///
/// At most one byte is stripped; a slice that does not end in `\r`
/// (including the empty slice) is returned unchanged.
fn without_carriage_return(s: &[u8]) -> &[u8] {
    match s.split_last() {
        Some((&b'\r', head)) => head,
        _ => s,
    }
}
117
118impl Decoder for LinesCodec {
119    type Item = String;
120    // TODO: in the next breaking change, this should be changed to a custom
121    // error type that indicates the "max length exceeded" condition better.
122    type Error = io::Error;
123
124    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
125        loop {
126            // Determine how far into the buffer we'll search for a newline. If
127            // there's no max_length set, we'll read to the end of the buffer.
128            let read_to = cmp::min(self.max_length.saturating_add(1), buf.len());
129
130            let newline_offset = buf[self.next_index..read_to]
131                .iter()
132                .position(|b| *b == b'\n');
133
134            if self.is_discarding {
135                self.discard(newline_offset, read_to, buf);
136            } else {
137                return if let Some(offset) = newline_offset {
138                    // Found a line!
139                    let newline_index = offset + self.next_index;
140                    self.next_index = 0;
141                    let line = buf.split_to(newline_index + 1);
142                    let line = &line[..line.len() - 1];
143                    let line = without_carriage_return(line);
144                    let line = utf8(line)?;
145
146                    Ok(Some(line.to_string()))
147                } else if buf.len() > self.max_length {
148                    // Reached the maximum length without finding a
149                    // newline, return an error and start discarding on the
150                    // next call.
151                    self.is_discarding = true;
152                    Err(io::Error::new(
153                        io::ErrorKind::Other,
154                        "line length limit exceeded",
155                    ))
156                } else {
157                    // We didn't find a line or reach the length limit, so the next
158                    // call will resume searching at the current offset.
159                    self.next_index = read_to;
160                    Ok(None)
161                };
162            }
163        }
164    }
165
166    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
167        Ok(match self.decode(buf)? {
168            Some(frame) => Some(frame),
169            None => {
170                // No terminating newline - return remaining data, if any
171                if buf.is_empty() || buf == &b"\r"[..] {
172                    None
173                } else {
174                    let line = buf.take();
175                    let line = without_carriage_return(&line);
176                    let line = utf8(line)?;
177                    self.next_index = 0;
178                    Some(line.to_string())
179                }
180            }
181        })
182    }
183}
184
185impl Encoder for LinesCodec {
186    type Item = String;
187    type Error = io::Error;
188
189    fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> {
190        buf.reserve(line.len() + 1);
191        buf.put(line);
192        buf.put_u8(b'\n');
193        Ok(())
194    }
195}