tokio_util/codec/
lines_codec.rs

1use crate::codec::decoder::Decoder;
2use crate::codec::encoder::Encoder;
3
4use bytes::{Buf, BufMut, BytesMut};
5use std::{cmp, fmt, io, str};
6
/// A line-oriented [`Decoder`] and [`Encoder`] that frames a byte stream
/// into lines of text.
///
/// Lines are delimited by the `\n` character, regardless of platform.
///
/// [`Decoder`]: crate::codec::Decoder
/// [`Encoder`]: crate::codec::Encoder
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct LinesCodec {
    // Offset into the buffer where the next search for `\n` should begin.
    // Bytes before this offset were already scanned by an earlier `decode`
    // call and are known not to contain a newline, so they are skipped.
    // E.g. after `decode` has seen `abc` this is `3`; when the buffer later
    // grows to `abcde\n`, only `de\n` has to be inspected.
    next_index: usize,

    /// Upper bound on the length of a single line. A value of `usize::MAX`
    /// means a line is read until a `\n` shows up, however long that takes.
    max_length: usize,

    /// `true` while the remainder of a line that went over the length limit
    /// is still being thrown away.
    is_discarding: bool,
}

impl LinesCodec {
    /// Creates a `LinesCodec` that splits data into lines without any limit
    /// on line length.
    ///
    /// # Note
    ///
    /// Because no upper bound is placed on a buffered line, this constructor
    /// can be a security risk when the input is untrusted; the documentation
    /// of [`new_with_max_length`] explains why.
    ///
    /// [`new_with_max_length`]: crate::codec::LinesCodec::new_with_max_length()
    pub fn new() -> LinesCodec {
        Self {
            max_length: usize::MAX,
            is_discarding: false,
            next_index: 0,
        }
    }

    /// Creates a `LinesCodec` that rejects lines longer than `max_length`.
    ///
    /// When a line goes over the limit, `LinesCodec::decode` returns a
    /// [`LinesCodecError`]. Later calls then throw away the rest of that
    /// line, up to and including its terminating newline, and yield `None`
    /// for as long as the end of the oversized line has not been seen. Once
    /// it has been discarded, decoding continues as usual.
    ///
    /// # Note
    ///
    /// A length limit is strongly recommended whenever a `LinesCodec` reads
    /// untrusted input. Without one, the buffer holding the line in progress
    /// can grow without bound: a peer that keeps sending data containing no
    /// `\n` forces unbounded memory consumption.
    ///
    /// [`LinesCodecError`]: crate::codec::LinesCodecError
    pub fn new_with_max_length(max_length: usize) -> Self {
        let mut codec = Self::new();
        codec.max_length = max_length;
        codec
    }

    /// Returns the maximum line length accepted when decoding.
    ///
    /// ```
    /// use tokio_util::codec::LinesCodec;
    ///
    /// let codec = LinesCodec::new();
    /// assert_eq!(codec.max_length(), usize::MAX);
    /// ```
    /// ```
    /// use tokio_util::codec::LinesCodec;
    ///
    /// let codec = LinesCodec::new_with_max_length(256);
    /// assert_eq!(codec.max_length(), 256);
    /// ```
    pub fn max_length(&self) -> usize {
        self.max_length
    }
}
94
/// Interprets `buf` as UTF-8 text.
///
/// Returns the borrowed string slice on success. Invalid UTF-8 is reported
/// as an [`io::Error`] of kind `InvalidData`; the details of the underlying
/// UTF-8 error are not carried over.
fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
    match str::from_utf8(buf) {
        Ok(text) => Ok(text),
        Err(_) => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "Unable to decode input as UTF8",
        )),
    }
}
99
/// Returns `s` with a single trailing `\r` byte removed, if there is one.
///
/// At most one byte is stripped, and only from the end of the slice; any
/// other `\r` bytes are left in place.
fn without_carriage_return(s: &[u8]) -> &[u8] {
    match s.split_last() {
        Some((&b'\r', head)) => head,
        _ => s,
    }
}
107
108impl Decoder for LinesCodec {
109    type Item = String;
110    type Error = LinesCodecError;
111
112    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, LinesCodecError> {
113        loop {
114            // Determine how far into the buffer we'll search for a newline. If
115            // there's no max_length set, we'll read to the end of the buffer.
116            let read_to = cmp::min(self.max_length.saturating_add(1), buf.len());
117
118            let newline_offset = buf[self.next_index..read_to]
119                .iter()
120                .position(|b| *b == b'\n');
121
122            match (self.is_discarding, newline_offset) {
123                (true, Some(offset)) => {
124                    // If we found a newline, discard up to that offset and
125                    // then stop discarding. On the next iteration, we'll try
126                    // to read a line normally.
127                    buf.advance(offset + self.next_index + 1);
128                    self.is_discarding = false;
129                    self.next_index = 0;
130                }
131                (true, None) => {
132                    // Otherwise, we didn't find a newline, so we'll discard
133                    // everything we read. On the next iteration, we'll continue
134                    // discarding up to max_len bytes unless we find a newline.
135                    buf.advance(read_to);
136                    self.next_index = 0;
137                    if buf.is_empty() {
138                        return Ok(None);
139                    }
140                }
141                (false, Some(offset)) => {
142                    // Found a line!
143                    let newline_index = offset + self.next_index;
144                    self.next_index = 0;
145                    let line = buf.split_to(newline_index + 1);
146                    let line = &line[..line.len() - 1];
147                    let line = without_carriage_return(line);
148                    let line = utf8(line)?;
149                    return Ok(Some(line.to_string()));
150                }
151                (false, None) if buf.len() > self.max_length => {
152                    // Reached the maximum length without finding a
153                    // newline, return an error and start discarding on the
154                    // next call.
155                    self.is_discarding = true;
156                    return Err(LinesCodecError::MaxLineLengthExceeded);
157                }
158                (false, None) => {
159                    // We didn't find a line or reach the length limit, so the next
160                    // call will resume searching at the current offset.
161                    self.next_index = read_to;
162                    return Ok(None);
163                }
164            }
165        }
166    }
167
168    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, LinesCodecError> {
169        Ok(match self.decode(buf)? {
170            Some(frame) => Some(frame),
171            None => {
172                self.next_index = 0;
173                // No terminating newline - return remaining data, if any
174                if buf.is_empty() || buf == &b"\r"[..] {
175                    None
176                } else {
177                    let line = buf.split_to(buf.len());
178                    let line = without_carriage_return(&line);
179                    let line = utf8(line)?;
180                    Some(line.to_string())
181                }
182            }
183        })
184    }
185}
186
187impl<T> Encoder<T> for LinesCodec
188where
189    T: AsRef<str>,
190{
191    type Error = LinesCodecError;
192
193    fn encode(&mut self, line: T, buf: &mut BytesMut) -> Result<(), LinesCodecError> {
194        let line = line.as_ref();
195        buf.reserve(line.len() + 1);
196        buf.put(line.as_bytes());
197        buf.put_u8(b'\n');
198        Ok(())
199    }
200}
201
202impl Default for LinesCodec {
203    fn default() -> Self {
204        Self::new()
205    }
206}
207
/// The error type produced when encoding or decoding a line fails.
#[derive(Debug)]
pub enum LinesCodecError {
    /// A line was longer than the configured maximum line length.
    MaxLineLengthExceeded,
    /// An underlying IO error.
    Io(io::Error),
}

impl fmt::Display for LinesCodecError {
    /// Writes a fixed message for `MaxLineLengthExceeded`, and the wrapped
    /// error's own message for `Io`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Io(source) => write!(f, "{source}"),
            Self::MaxLineLengthExceeded => f.write_str("max line length exceeded"),
        }
    }
}

impl From<io::Error> for LinesCodecError {
    /// Wraps an IO error in the `Io` variant, which lets `?` convert it.
    fn from(error: io::Error) -> Self {
        Self::Io(error)
    }
}

impl std::error::Error for LinesCodecError {}