tokio_codec/lines_codec.rs
1use bytes::{BufMut, BytesMut};
2use std::{cmp, io, str, usize};
3use tokio_io::_tokio_codec::{Decoder, Encoder};
4
/// A line-oriented `Codec`: incoming bytes are split into frames on `\n`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LinesCodec {
    /// Position in the read buffer at which the next search for `\n`
    /// should begin.
    ///
    /// Bytes before this position were already scanned by an earlier
    /// `decode` call and are known not to contain a newline, so they are
    /// not scanned again. E.g. after `decode` has seen `abc` this holds `3`;
    /// when the buffer later grows to `abcde\n`, only `de\n` is inspected.
    next_index: usize,

    /// Upper bound on the length of a single line. A value of `usize::MAX`
    /// means "no limit": bytes are buffered until a `\n` shows up.
    max_length: usize,

    /// `true` while the tail of a line that exceeded `max_length` is still
    /// being thrown away.
    is_discarding: bool,
}
24
25impl LinesCodec {
26 /// Returns a `LinesCodec` for splitting up data into lines.
27 ///
28 /// # Note
29 ///
30 /// The returned `LinesCodec` will not have an upper bound on the length
31 /// of a buffered line. See the documentation for [`new_with_max_length`]
32 /// for information on why this could be a potential security risk.
33 ///
34 /// [`new_with_max_length`]: #method.new_with_max_length
35 pub fn new() -> LinesCodec {
36 LinesCodec {
37 next_index: 0,
38 max_length: usize::MAX,
39 is_discarding: false,
40 }
41 }
42
43 /// Returns a `LinesCodec` with a maximum line length limit.
44 ///
45 /// If this is set, calls to `LinesCodec::decode` will return a
46 /// [`LengthError`] when a line exceeds the length limit. Subsequent calls
47 /// will discard up to `limit` bytes from that line until a newline
48 /// character is reached, returning `None` until the line over the limit
49 /// has been fully discarded. After that point, calls to `decode` will
50 /// function as normal.
51 ///
52 /// # Note
53 ///
54 /// Setting a length limit is highly recommended for any `LinesCodec` which
55 /// will be exposed to untrusted input. Otherwise, the size of the buffer
56 /// that holds the line currently being read is unbounded. An attacker could
57 /// exploit this unbounded buffer by sending an unbounded amount of input
58 /// without any `\n` characters, causing unbounded memory consumption.
59 ///
60 /// [`LengthError`]: ../struct.LengthError
61 pub fn new_with_max_length(max_length: usize) -> Self {
62 LinesCodec {
63 max_length,
64 ..LinesCodec::new()
65 }
66 }
67
68 /// Returns the maximum line length when decoding.
69 ///
70 /// ```
71 /// use std::usize;
72 /// use tokio_codec::LinesCodec;
73 ///
74 /// let codec = LinesCodec::new();
75 /// assert_eq!(codec.max_length(), usize::MAX);
76 /// ```
77 /// ```
78 /// use tokio_codec::LinesCodec;
79 ///
80 /// let codec = LinesCodec::new_with_max_length(256);
81 /// assert_eq!(codec.max_length(), 256);
82 /// ```
83 pub fn max_length(&self) -> usize {
84 self.max_length
85 }
86
87 fn discard(&mut self, newline_offset: Option<usize>, read_to: usize, buf: &mut BytesMut) {
88 let discard_to = if let Some(offset) = newline_offset {
89 // If we found a newline, discard up to that offset and
90 // then stop discarding. On the next iteration, we'll try
91 // to read a line normally.
92 self.is_discarding = false;
93 offset + self.next_index + 1
94 } else {
95 // Otherwise, we didn't find a newline, so we'll discard
96 // everything we read. On the next iteration, we'll continue
97 // discarding up to max_len bytes unless we find a newline.
98 read_to
99 };
100 buf.advance(discard_to);
101 self.next_index = 0;
102 }
103}
104
/// Interprets `buf` as UTF-8 text.
///
/// Returns the borrowed `&str` on success. Invalid UTF-8 is reported as an
/// `io::Error` of kind `InvalidData`.
fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
    match str::from_utf8(buf) {
        Ok(text) => Ok(text),
        Err(_) => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "Unable to decode input as UTF8",
        )),
    }
}
109
/// Strips a single trailing `\r` from `s`, if present.
///
/// Only the last byte is examined; any earlier `\r` bytes are left alone.
fn without_carriage_return(s: &[u8]) -> &[u8] {
    match s.split_last() {
        Some((&b'\r', head)) => head,
        _ => s,
    }
}
117
118impl Decoder for LinesCodec {
119 type Item = String;
120 // TODO: in the next breaking change, this should be changed to a custom
121 // error type that indicates the "max length exceeded" condition better.
122 type Error = io::Error;
123
124 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
125 loop {
126 // Determine how far into the buffer we'll search for a newline. If
127 // there's no max_length set, we'll read to the end of the buffer.
128 let read_to = cmp::min(self.max_length.saturating_add(1), buf.len());
129
130 let newline_offset = buf[self.next_index..read_to]
131 .iter()
132 .position(|b| *b == b'\n');
133
134 if self.is_discarding {
135 self.discard(newline_offset, read_to, buf);
136 } else {
137 return if let Some(offset) = newline_offset {
138 // Found a line!
139 let newline_index = offset + self.next_index;
140 self.next_index = 0;
141 let line = buf.split_to(newline_index + 1);
142 let line = &line[..line.len() - 1];
143 let line = without_carriage_return(line);
144 let line = utf8(line)?;
145
146 Ok(Some(line.to_string()))
147 } else if buf.len() > self.max_length {
148 // Reached the maximum length without finding a
149 // newline, return an error and start discarding on the
150 // next call.
151 self.is_discarding = true;
152 Err(io::Error::new(
153 io::ErrorKind::Other,
154 "line length limit exceeded",
155 ))
156 } else {
157 // We didn't find a line or reach the length limit, so the next
158 // call will resume searching at the current offset.
159 self.next_index = read_to;
160 Ok(None)
161 };
162 }
163 }
164 }
165
166 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
167 Ok(match self.decode(buf)? {
168 Some(frame) => Some(frame),
169 None => {
170 // No terminating newline - return remaining data, if any
171 if buf.is_empty() || buf == &b"\r"[..] {
172 None
173 } else {
174 let line = buf.take();
175 let line = without_carriage_return(&line);
176 let line = utf8(line)?;
177 self.next_index = 0;
178 Some(line.to_string())
179 }
180 }
181 })
182 }
183}
184
185impl Encoder for LinesCodec {
186 type Item = String;
187 type Error = io::Error;
188
189 fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> {
190 buf.reserve(line.len() + 1);
191 buf.put(line);
192 buf.put_u8(b'\n');
193 Ok(())
194 }
195}