1#![deny(missing_debug_implementations)]
7extern crate bytes;
8
9use std::error;
10use std::fmt;
11use std::io::{Cursor, Read, Result as IoResult};
12
13use bytes::{Buf, BufMut};
14
/// A byte buffer for incoming data, implemented as a `Cursor<Vec<u8>>`.
///
/// The vector holds every byte received so far; the cursor position marks how
/// many of those bytes have already been consumed. Consumed bytes stay in the
/// vector until `remove_garbage` discards them.
#[derive(Debug)]
pub struct InputBuffer(Cursor<Vec<u8>>);
18
/// Default size in bytes: the initial capacity used by `InputBuffer::new` and
/// the amount of space reserved by `InputBuffer::prepare`.
pub const MIN_READ: usize = 4096;
21
22impl InputBuffer {
23 pub fn new() -> Self {
25 Self::with_capacity(MIN_READ)
26 }
27
28 pub fn with_capacity(capacity: usize) -> Self {
30 Self::from_partially_read(Vec::with_capacity(capacity))
31 }
32
33 pub fn from_partially_read(part: Vec<u8>) -> Self {
35 InputBuffer(Cursor::new(part))
36 }
37
38 pub fn as_cursor(&self) -> &Cursor<Vec<u8>> {
40 &self.0
41 }
42
43 pub fn as_cursor_mut(&mut self) -> &mut Cursor<Vec<u8>> {
45 &mut self.0
46 }
47
48 pub fn remove_garbage(&mut self) {
50 let pos = self.0.position() as usize;
51 self.0.get_mut().drain(0..pos).count();
52 self.0.set_position(0);
53 }
54
55 pub fn into_vec(mut self) -> Vec<u8> {
57 self.remove_garbage();
58 self.0.into_inner()
59 }
60
61 pub fn read_from<S: Read>(&mut self, stream: &mut S) -> IoResult<usize> {
63 self.prepare().read_from(stream)
64 }
65
66 pub fn prepare<'t>(&'t mut self) -> DoRead<'t> {
68 self.prepare_reserve(MIN_READ)
69 }
70
71 pub fn prepare_reserve<'t>(&'t mut self, reserve: usize) -> DoRead<'t> {
73 let free_space = self.total_len() - self.filled_len();
75 let total_space = free_space + self.consumed_len();
77 let remove_garbage = free_space < reserve && total_space >= reserve;
79
80 DoRead {
81 buf: self,
82 remove_garbage,
83 reserve,
84 }
85 }
86}
87
88impl InputBuffer {
89 fn total_len(&self) -> usize {
91 self.0.get_ref().capacity()
92 }
93
94 fn filled_len(&self) -> usize {
96 self.0.get_ref().len()
97 }
98
99 fn consumed_len(&self) -> usize {
101 self.0.position() as usize
102 }
103}
104
105impl Buf for InputBuffer {
106 fn remaining(&self) -> usize {
107 Buf::remaining(self.as_cursor())
108 }
109 fn chunk(&self) -> &[u8] {
110 Buf::chunk(self.as_cursor())
111 }
112 fn advance(&mut self, size: usize) {
113 Buf::advance(self.as_cursor_mut(), size)
114 }
115}
116
/// A read into an `InputBuffer` that has been prepared but not yet executed.
///
/// Created by `InputBuffer::prepare` or `InputBuffer::prepare_reserve`,
/// optionally restricted with `with_limit`, and executed with `read_from`.
#[derive(Debug)]
pub struct DoRead<'t> {
    /// Buffer that receives the data.
    buf: &'t mut InputBuffer,
    /// Whether the consumed bytes are discarded from the buffer before reading.
    remove_garbage: bool,
    /// Number of bytes of space made available to the read.
    reserve: usize,
}
124
125impl<'t> DoRead<'t> {
126 pub fn with_limit(mut self, limit: usize) -> Result<Self, SizeLimit> {
128 let total_len = self.buf.filled_len() + self.reserve;
130 let consumed_len = self.buf.consumed_len();
132 if total_len - consumed_len <= limit {
134 if total_len > limit {
136 self.remove_garbage = true;
137 }
138 Ok(self)
139 } else {
140 Err(SizeLimit)
141 }
142 }
143
144 pub fn read_from<S: Read>(self, stream: &mut S) -> IoResult<usize> {
146 if self.remove_garbage {
147 self.buf.remove_garbage();
148 }
149
150 let v: &mut Vec<u8> = self.buf.0.get_mut();
151
152 v.reserve(self.reserve);
153
154 assert!(v.capacity() > v.len());
155 let size = unsafe {
156 let data = &mut v.chunk_mut()[..self.reserve];
159 {
166 let memory_to_zero = data.as_mut_ptr();
167 for i in 0..data.len() {
168 memory_to_zero.add(i).write(0);
169 }
170 }
171 let data = std::slice::from_raw_parts_mut(data.as_mut_ptr() as *mut u8, data.len());
173 let size = stream.read(data)?;
174 v.advance_mut(size);
175 size
176 };
177 Ok(size)
178 }
179}
180
/// Error returned by `DoRead::with_limit` when a read does not fit within the
/// requested size limit.
#[derive(Debug, Clone, Copy)]
pub struct SizeLimit;
184
185impl fmt::Display for SizeLimit {
186 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
187 write!(f, "SizeLimit")
188 }
189}
190
191impl error::Error for SizeLimit {
192 fn description(&self) -> &'static str {
193 "Size limit exceeded"
194 }
195}
196
#[cfg(test)]
mod tests {

    use super::InputBuffer;
    use bytes::Buf;
    use std::io::Cursor;

    /// Stream every test reads from.
    fn greeting() -> Cursor<Vec<u8>> {
        Cursor::new(b"Hello World!".to_vec())
    }

    /// A default-sized buffer takes the whole stream in a single read.
    #[test]
    fn simple_reading() {
        let mut source = greeting();
        let mut buffer = InputBuffer::new();

        let read = buffer.read_from(&mut source).unwrap();

        assert_eq!(read, 12);
        assert_eq!(buffer.chunk(), b"Hello World!");
    }

    /// Reads with explicit reserves interleaved with consuming data.
    #[test]
    fn partial_reading() {
        let mut source = greeting();
        let mut buffer = InputBuffer::with_capacity(4);

        let read = buffer.prepare_reserve(4).read_from(&mut source).unwrap();
        assert_eq!(read, 4);
        assert_eq!(buffer.chunk(), b"Hell");

        buffer.advance(2);
        assert_eq!(buffer.chunk(), b"ll");

        // (reserve, expected bytes read, expected unconsumed content)
        let steps = [
            (1usize, 1usize, &b"llo"[..]),
            (4, 4, &b"llo Wor"[..]),
            (16, 3, &b"llo World!"[..]),
        ];
        for &(reserve, expected_read, expected_chunk) in &steps {
            let read = buffer
                .prepare_reserve(reserve)
                .read_from(&mut source)
                .unwrap();
            assert_eq!(read, expected_read);
            assert_eq!(buffer.chunk(), expected_chunk);
        }
    }

    /// A size limit rejects reads that do not fit and accepts them again once
    /// enough data has been consumed.
    #[test]
    fn limiting() {
        /// Reads with a reserve of 4 under a limit of 5, expecting success.
        fn read_limited(buffer: &mut InputBuffer, source: &mut Cursor<Vec<u8>>) -> usize {
            buffer
                .prepare_reserve(4)
                .with_limit(5)
                .unwrap()
                .read_from(source)
                .unwrap()
        }

        let mut source = greeting();
        let mut buffer = InputBuffer::with_capacity(4);

        assert_eq!(read_limited(&mut buffer, &mut source), 4);
        assert_eq!(buffer.chunk(), b"Hell");

        buffer.advance(2);
        assert_eq!(buffer.chunk(), b"ll");

        // Two unconsumed bytes plus a reserve of four exceed the limit.
        assert!(buffer.prepare_reserve(4).with_limit(5).is_err());

        buffer.advance(1);
        assert_eq!(read_limited(&mut buffer, &mut source), 4);
        assert_eq!(buffer.chunk(), b"lo Wo");
    }
}