async_std/io/buf_reader.rs
1use std::io::{IoSliceMut, Read as _};
2use std::pin::Pin;
3use std::{cmp, fmt};
4
5use pin_project_lite::pin_project;
6
7use crate::io::{self, BufRead, Read, Seek, SeekFrom, DEFAULT_BUF_SIZE};
8use crate::task::{Context, Poll};
9
10pin_project! {
11 /// Adds buffering to any reader.
12 ///
13 /// It can be excessively inefficient to work directly with a [`Read`] instance. A `BufReader`
14 /// performs large, infrequent reads on the underlying [`Read`] and maintains an in-memory buffer
15 /// of the incoming byte stream.
16 ///
17 /// `BufReader` can improve the speed of programs that make *small* and *repeated* read calls to
18 /// the same file or network socket. It does not help when reading very large amounts at once, or
19 /// reading just one or a few times. It also provides no advantage when reading from a source that
20 /// is already in memory, like a `Vec<u8>`.
21 ///
22 /// When the `BufReader` is dropped, the contents of its buffer will be discarded. Creating
23 /// multiple instances of a `BufReader` on the same stream can cause data loss.
24 ///
25 /// This type is an async version of [`std::io::BufReader`].
26 ///
27 /// [`Read`]: trait.Read.html
28 /// [`std::io::BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html
29 ///
30 /// # Examples
31 ///
32 /// ```no_run
33 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
34 /// #
35 /// use async_std::fs::File;
36 /// use async_std::io::BufReader;
37 /// use async_std::prelude::*;
38 ///
39 /// let mut file = BufReader::new(File::open("a.txt").await?);
40 ///
41 /// let mut line = String::new();
42 /// file.read_line(&mut line).await?;
43 /// #
44 /// # Ok(()) }) }
45 /// ```
46 pub struct BufReader<R> {
47 #[pin]
48 inner: R,
49 buf: Box<[u8]>,
50 pos: usize,
51 cap: usize,
52 }
53}
54
55impl<R: io::Read> BufReader<R> {
56 /// Creates a buffered reader with default buffer capacity.
57 ///
58 /// The default capacity is currently 8 KB, but may change in the future.
59 ///
60 /// # Examples
61 ///
62 /// ```no_run
63 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
64 /// #
65 /// use async_std::fs::File;
66 /// use async_std::io::BufReader;
67 ///
68 /// let f = BufReader::new(File::open("a.txt").await?);
69 /// #
70 /// # Ok(()) }) }
71 /// ```
72 pub fn new(inner: R) -> BufReader<R> {
73 BufReader::with_capacity(DEFAULT_BUF_SIZE, inner)
74 }
75
76 /// Creates a new buffered reader with the specified capacity.
77 ///
78 /// # Examples
79 ///
80 /// ```no_run
81 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
82 /// #
83 /// use async_std::fs::File;
84 /// use async_std::io::BufReader;
85 ///
86 /// let f = BufReader::with_capacity(1024, File::open("a.txt").await?);
87 /// #
88 /// # Ok(()) }) }
89 /// ```
90 pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> {
91 BufReader {
92 inner,
93 buf: vec![0; capacity].into_boxed_slice(),
94 pos: 0,
95 cap: 0,
96 }
97 }
98}
99
100impl<R> BufReader<R> {
101 /// Gets a reference to the underlying reader.
102 ///
103 /// It is inadvisable to directly read from the underlying reader.
104 ///
105 /// # Examples
106 ///
107 /// ```no_run
108 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
109 /// #
110 /// use async_std::fs::File;
111 /// use async_std::io::BufReader;
112 ///
113 /// let f = BufReader::new(File::open("a.txt").await?);
114 /// let inner = f.get_ref();
115 /// #
116 /// # Ok(()) }) }
117 /// ```
118 pub fn get_ref(&self) -> &R {
119 &self.inner
120 }
121
122 /// Gets a mutable reference to the underlying reader.
123 ///
124 /// It is inadvisable to directly read from the underlying reader.
125 ///
126 /// # Examples
127 ///
128 /// ```no_run
129 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
130 /// #
131 /// use async_std::fs::File;
132 /// use async_std::io::BufReader;
133 ///
134 /// let mut file = BufReader::new(File::open("a.txt").await?);
135 /// let inner = file.get_mut();
136 /// #
137 /// # Ok(()) }) }
138 /// ```
139 pub fn get_mut(&mut self) -> &mut R {
140 &mut self.inner
141 }
142
143 /// Gets a pinned mutable reference to the underlying reader.
144 ///
145 /// It is inadvisable to directly read from the underlying reader.
146 fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
147 self.project().inner
148 }
149
150 /// Returns a reference to the internal buffer.
151 ///
152 /// This function will not attempt to fill the buffer if it is empty.
153 ///
154 /// # Examples
155 ///
156 /// ```no_run
157 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
158 /// #
159 /// use async_std::fs::File;
160 /// use async_std::io::BufReader;
161 ///
162 /// let f = BufReader::new(File::open("a.txt").await?);
163 /// let buffer = f.buffer();
164 /// #
165 /// # Ok(()) }) }
166 /// ```
167 pub fn buffer(&self) -> &[u8] {
168 &self.buf[self.pos..self.cap]
169 }
170
171 /// Unwraps the buffered reader, returning the underlying reader.
172 ///
173 /// Note that any leftover data in the internal buffer is lost.
174 ///
175 /// # Examples
176 ///
177 /// ```no_run
178 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
179 /// #
180 /// use async_std::fs::File;
181 /// use async_std::io::BufReader;
182 ///
183 /// let f = BufReader::new(File::open("a.txt").await?);
184 /// let inner = f.into_inner();
185 /// #
186 /// # Ok(()) }) }
187 /// ```
188 pub fn into_inner(self) -> R {
189 self.inner
190 }
191
192 /// Invalidates all data in the internal buffer.
193 #[inline]
194 fn discard_buffer(self: Pin<&mut Self>) {
195 let this = self.project();
196 *this.pos = 0;
197 *this.cap = 0;
198 }
199}
200
201impl<R: Read> Read for BufReader<R> {
202 fn poll_read(
203 mut self: Pin<&mut Self>,
204 cx: &mut Context<'_>,
205 buf: &mut [u8],
206 ) -> Poll<io::Result<usize>> {
207 // If we don't have any buffered data and we're doing a massive read
208 // (larger than our internal buffer), bypass our internal buffer
209 // entirely.
210 if self.pos == self.cap && buf.len() >= self.buf.len() {
211 let res = futures_core::ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
212 self.discard_buffer();
213 return Poll::Ready(res);
214 }
215 let mut rem = futures_core::ready!(self.as_mut().poll_fill_buf(cx))?;
216 let nread = rem.read(buf)?;
217 self.consume(nread);
218 Poll::Ready(Ok(nread))
219 }
220
221 fn poll_read_vectored(
222 mut self: Pin<&mut Self>,
223 cx: &mut Context<'_>,
224 bufs: &mut [IoSliceMut<'_>],
225 ) -> Poll<io::Result<usize>> {
226 let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
227 if self.pos == self.cap && total_len >= self.buf.len() {
228 let res =
229 futures_core::ready!(self.as_mut().get_pin_mut().poll_read_vectored(cx, bufs));
230 self.discard_buffer();
231 return Poll::Ready(res);
232 }
233 let mut rem = futures_core::ready!(self.as_mut().poll_fill_buf(cx))?;
234 let nread = rem.read_vectored(bufs)?;
235 self.consume(nread);
236 Poll::Ready(Ok(nread))
237 }
238}
239
240impl<R: Read> BufRead for BufReader<R> {
241 fn poll_fill_buf<'a>(
242 self: Pin<&'a mut Self>,
243 cx: &mut Context<'_>,
244 ) -> Poll<io::Result<&'a [u8]>> {
245 let mut this = self.project();
246
247 // If we've reached the end of our internal buffer then we need to fetch
248 // some more data from the underlying reader.
249 // Branch using `>=` instead of the more correct `==`
250 // to tell the compiler that the pos..cap slice is always valid.
251 if *this.pos >= *this.cap {
252 debug_assert!(*this.pos == *this.cap);
253 *this.cap = futures_core::ready!(this.inner.as_mut().poll_read(cx, this.buf))?;
254 *this.pos = 0;
255 }
256 Poll::Ready(Ok(&this.buf[*this.pos..*this.cap]))
257 }
258
259 fn consume(self: Pin<&mut Self>, amt: usize) {
260 let this = self.project();
261 *this.pos = cmp::min(*this.pos + amt, *this.cap);
262 }
263}
264
265impl<R: io::Read + fmt::Debug> fmt::Debug for BufReader<R> {
266 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267 f.debug_struct("BufReader")
268 .field("reader", &self.inner)
269 .field(
270 "buffer",
271 &format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
272 )
273 .finish()
274 }
275}
276
277impl<R: Seek> Seek for BufReader<R> {
278 /// Seeks to an offset, in bytes, in the underlying reader.
279 ///
280 /// The position used for seeking with `SeekFrom::Current(_)` is the position the underlying
281 /// reader would be at if the `BufReader` had no internal buffer.
282 ///
283 /// Seeking always discards the internal buffer, even if the seek position would otherwise fall
284 /// within it. This guarantees that calling `.into_inner()` immediately after a seek yields the
285 /// underlying reader at the same position.
286 ///
287 /// See [`Seek`] for more details.
288 ///
289 /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` where `n` minus the
290 /// internal buffer length overflows an `i64`, two seeks will be performed instead of one. If
291 /// the second seek returns `Err`, the underlying reader will be left at the same position it
292 /// would have if you called `seek` with `SeekFrom::Current(0)`.
293 ///
294 /// [`Seek`]: trait.Seek.html
295 fn poll_seek(
296 mut self: Pin<&mut Self>,
297 cx: &mut Context<'_>,
298 pos: SeekFrom,
299 ) -> Poll<io::Result<u64>> {
300 let result: u64;
301 if let SeekFrom::Current(n) = pos {
302 let remainder = (self.cap - self.pos) as i64;
303 // it should be safe to assume that remainder fits within an i64 as the alternative
304 // means we managed to allocate 8 exbibytes and that's absurd.
305 // But it's not out of the realm of possibility for some weird underlying reader to
306 // support seeking by i64::min_value() so we need to handle underflow when subtracting
307 // remainder.
308 if let Some(offset) = n.checked_sub(remainder) {
309 result = futures_core::ready!(
310 self.as_mut()
311 .get_pin_mut()
312 .poll_seek(cx, SeekFrom::Current(offset))
313 )?;
314 } else {
315 // seek backwards by our remainder, and then by the offset
316 futures_core::ready!(
317 self.as_mut()
318 .get_pin_mut()
319 .poll_seek(cx, SeekFrom::Current(-remainder))
320 )?;
321 self.as_mut().discard_buffer();
322 result = futures_core::ready!(
323 self.as_mut()
324 .get_pin_mut()
325 .poll_seek(cx, SeekFrom::Current(n))
326 )?;
327 }
328 } else {
329 // Seeking with Start/End doesn't care about our buffer length.
330 result = futures_core::ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos))?;
331 }
332 self.discard_buffer();
333 Poll::Ready(Ok(result))
334 }
335}