input_buffer/
lib.rs

1//! A buffer for reading data from the network.
2//!
3//! The `InputBuffer` is a buffer of bytes similar to a first-in, first-out queue.
4//! It is filled by reading from a stream supporting `Read` and is then
5//! accessible as a cursor for reading bytes.
6#![deny(missing_debug_implementations)]
7extern crate bytes;
8
9use std::error;
10use std::fmt;
11use std::io::{Cursor, Read, Result as IoResult};
12
13use bytes::{Buf, BufMut};
14
15/// A FIFO buffer for reading packets from network.
16#[derive(Debug)]
17pub struct InputBuffer(Cursor<Vec<u8>>);
18
19/// The recommended minimum read size.
20pub const MIN_READ: usize = 4096;
21
22impl InputBuffer {
23    /// Create a new empty input buffer.
24    pub fn new() -> Self {
25        Self::with_capacity(MIN_READ)
26    }
27
28    /// Create a new empty input buffer.
29    pub fn with_capacity(capacity: usize) -> Self {
30        Self::from_partially_read(Vec::with_capacity(capacity))
31    }
32
33    /// Create a input buffer filled with previously read data.
34    pub fn from_partially_read(part: Vec<u8>) -> Self {
35        InputBuffer(Cursor::new(part))
36    }
37
38    /// Get the data as a cursor.
39    pub fn as_cursor(&self) -> &Cursor<Vec<u8>> {
40        &self.0
41    }
42
43    /// Get the data as a mutable cursor.
44    pub fn as_cursor_mut(&mut self) -> &mut Cursor<Vec<u8>> {
45        &mut self.0
46    }
47
48    /// Remove the already consumed portion of the data.
49    pub fn remove_garbage(&mut self) {
50        let pos = self.0.position() as usize;
51        self.0.get_mut().drain(0..pos).count();
52        self.0.set_position(0);
53    }
54
55    /// Get the rest of the buffer and destroy the buffer.
56    pub fn into_vec(mut self) -> Vec<u8> {
57        self.remove_garbage();
58        self.0.into_inner()
59    }
60
61    /// Read next portion of data from the given input stream.
62    pub fn read_from<S: Read>(&mut self, stream: &mut S) -> IoResult<usize> {
63        self.prepare().read_from(stream)
64    }
65
66    /// Prepare reading.
67    pub fn prepare<'t>(&'t mut self) -> DoRead<'t> {
68        self.prepare_reserve(MIN_READ)
69    }
70
71    /// Prepare reading with the given reserve.
72    pub fn prepare_reserve<'t>(&'t mut self, reserve: usize) -> DoRead<'t> {
73        // Space that we have right now.
74        let free_space = self.total_len() - self.filled_len();
75        // Space that we could have after garbage collect.
76        let total_space = free_space + self.consumed_len();
77        // If garbage collect would help, schedule it.
78        let remove_garbage = free_space < reserve && total_space >= reserve;
79
80        DoRead {
81            buf: self,
82            remove_garbage,
83            reserve,
84        }
85    }
86}
87
88impl InputBuffer {
89    /// Get the total buffer length.
90    fn total_len(&self) -> usize {
91        self.0.get_ref().capacity()
92    }
93
94    /// Get the filled buffer length.
95    fn filled_len(&self) -> usize {
96        self.0.get_ref().len()
97    }
98
99    /// Get the consumed data length.
100    fn consumed_len(&self) -> usize {
101        self.0.position() as usize
102    }
103}
104
105impl Buf for InputBuffer {
106    fn remaining(&self) -> usize {
107        Buf::remaining(self.as_cursor())
108    }
109    fn chunk(&self) -> &[u8] {
110        Buf::chunk(self.as_cursor())
111    }
112    fn advance(&mut self, size: usize) {
113        Buf::advance(self.as_cursor_mut(), size)
114    }
115}
116
117/// The reference to the buffer used for reading.
118#[derive(Debug)]
119pub struct DoRead<'t> {
120    buf: &'t mut InputBuffer,
121    remove_garbage: bool,
122    reserve: usize,
123}
124
125impl<'t> DoRead<'t> {
126    /// Enforce the size limit.
127    pub fn with_limit(mut self, limit: usize) -> Result<Self, SizeLimit> {
128        // Total size we shall have after reserve.
129        let total_len = self.buf.filled_len() + self.reserve;
130        // Size we could free if we collect garbage.
131        let consumed_len = self.buf.consumed_len();
132        // Shall we fit if we remove data already consumed?
133        if total_len - consumed_len <= limit {
134            // Shall we not fit if we don't remove data already consumed?
135            if total_len > limit {
136                self.remove_garbage = true;
137            }
138            Ok(self)
139        } else {
140            Err(SizeLimit)
141        }
142    }
143
144    /// Read next portion of data from the given input stream.
145    pub fn read_from<S: Read>(self, stream: &mut S) -> IoResult<usize> {
146        if self.remove_garbage {
147            self.buf.remove_garbage();
148        }
149
150        let v: &mut Vec<u8> = self.buf.0.get_mut();
151
152        v.reserve(self.reserve);
153
154        assert!(v.capacity() > v.len());
155        let size = unsafe {
156            // TODO: This can be replaced by std::mem::MaybeUninit::first_ptr_mut() once
157            // it is stabilized.
158            let data = &mut v.chunk_mut()[..self.reserve];
159            // We first have to initialize the data or otherwise casting to a byte slice
160            // below is UB. See also code of std::io::copy(), tokio::AsyncRead::poll_read_buf()
161            // and others.
162            //
163            // Read::read() might read uninitialized data otherwise, and generally creating
164            // references to uninitialized data is UB.
165            {
166                let memory_to_zero = data.as_mut_ptr();
167                for i in 0..data.len() {
168                    memory_to_zero.add(i).write(0);
169                }
170            }
171            // Now it's safe to cast it to a byte slice
172            let data = std::slice::from_raw_parts_mut(data.as_mut_ptr() as *mut u8, data.len());
173            let size = stream.read(data)?;
174            v.advance_mut(size);
175            size
176        };
177        Ok(size)
178    }
179}
180
/// Size limit error.
///
/// Returned by `DoRead::with_limit` when the requested read cannot fit within
/// the given limit. The type carries no data, so all values compare equal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SizeLimit;

impl fmt::Display for SizeLimit {
    /// Writes the fixed text `SizeLimit`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("SizeLimit")
    }
}

impl error::Error for SizeLimit {
    // `description` is kept so that callers still using it get the same text as
    // before.
    fn description(&self) -> &'static str {
        "Size limit exceeded"
    }
}
196
#[cfg(test)]
mod tests {

    use super::InputBuffer;
    use bytes::Buf;
    use std::io::Cursor;

    /// The in-memory stream every test reads from.
    fn hello_stream() -> Cursor<Vec<u8>> {
        Cursor::new(b"Hello World!".to_vec())
    }

    /// A single default read picks up the whole message.
    #[test]
    fn simple_reading() {
        let mut source = hello_stream();
        let mut buffer = InputBuffer::new();

        let read = buffer.read_from(&mut source).unwrap();

        assert_eq!(read, 12);
        assert_eq!(buffer.chunk(), b"Hello World!");
    }

    /// Reads with small reserves append to the data that is not yet consumed.
    #[test]
    fn partial_reading() {
        let mut source = hello_stream();
        let mut buffer = InputBuffer::with_capacity(4);

        let read = buffer.prepare_reserve(4).read_from(&mut source).unwrap();
        assert_eq!(read, 4);
        assert_eq!(buffer.chunk(), b"Hell");

        buffer.advance(2);
        assert_eq!(buffer.chunk(), b"ll");

        // (reserve, expected bytes read, expected unconsumed data afterwards)
        let steps = [
            (1usize, 1usize, &b"llo"[..]),
            (4, 4, &b"llo Wor"[..]),
            (16, 3, &b"llo World!"[..]),
        ];
        for &(reserve, expected_read, expected_chunk) in &steps {
            let read = buffer
                .prepare_reserve(reserve)
                .read_from(&mut source)
                .unwrap();
            assert_eq!(read, expected_read);
            assert_eq!(buffer.chunk(), expected_chunk);
        }
    }

    /// The limit rejects reads that cannot fit and permits them once enough
    /// data has been consumed.
    #[test]
    fn limiting() {
        let mut source = hello_stream();
        let mut buffer = InputBuffer::with_capacity(4);

        let prepared = buffer.prepare_reserve(4).with_limit(5).unwrap();
        let read = prepared.read_from(&mut source).unwrap();
        assert_eq!(read, 4);
        assert_eq!(buffer.chunk(), b"Hell");

        buffer.advance(2);
        assert_eq!(buffer.chunk(), b"ll");

        // Two unconsumed bytes plus a reserve of four exceed the limit of five.
        assert!(buffer.prepare_reserve(4).with_limit(5).is_err());

        buffer.advance(1);
        let prepared = buffer.prepare_reserve(4).with_limit(5).unwrap();
        let read = prepared.read_from(&mut source).unwrap();
        assert_eq!(read, 4);
        assert_eq!(buffer.chunk(), b"lo Wo");
    }
}