async_std/io/
buf_writer.rs

1use std::fmt;
2use std::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::io::write::WriteExt;
7use crate::io::{self, Seek, SeekFrom, Write, DEFAULT_BUF_SIZE};
8use crate::task::{Context, Poll, ready};
9
10pin_project! {
11    /// Wraps a writer and buffers its output.
12    ///
13    /// It can be excessively inefficient to work directly with something that
14    /// implements [`Write`]. For example, every call to
15    /// [`write`][`TcpStream::write`] on [`TcpStream`] results in a system call. A
16    /// `BufWriter` keeps an in-memory buffer of data and writes it to an underlying
17    /// writer in large, infrequent batches.
18    ///
19    /// `BufWriter` can improve the speed of programs that make *small* and
20    /// *repeated* write calls to the same file or network socket. It does not
21    /// help when writing very large amounts at once, or writing just one or a few
22    /// times. It also provides no advantage when writing to a destination that is
23    /// in memory, like a `Vec<u8>`.
24    ///
25    /// Unlike the `BufWriter` type in `std`, this type does not write out the
26    /// contents of its buffer when it is dropped. Therefore, it is absolutely
27    /// critical that users explicitly flush the buffer before dropping a
28    /// `BufWriter`.
29    ///
30    /// This type is an async version of [`std::io::BufWriter`].
31    ///
32    /// [`std::io::BufWriter`]: https://doc.rust-lang.org/std/io/struct.BufWriter.html
33    ///
34    /// # Examples
35    ///
36    /// Let's write the numbers one through ten to a [`TcpStream`]:
37    ///
38    /// ```no_run
39    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
40    /// use async_std::net::TcpStream;
41    /// use async_std::prelude::*;
42    ///
43    /// let mut stream = TcpStream::connect("127.0.0.1:34254").await?;
44    ///
45    /// for i in 0..10 {
46    ///     let arr = [i+1];
47    ///     stream.write(&arr).await?;
48    /// }
49    /// #
50    /// # Ok(()) }) }
51    /// ```
52    ///
53    /// Because we're not buffering, we write each one in turn, incurring the
54    /// overhead of a system call per byte written. We can fix this with a
55    /// `BufWriter`:
56    ///
57    /// ```no_run
58    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
59    /// use async_std::io::BufWriter;
60    /// use async_std::net::TcpStream;
61    /// use async_std::prelude::*;
62    ///
63    /// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?);
64    ///
65    /// for i in 0..10 {
66    ///     let arr = [i+1];
67    ///     stream.write(&arr).await?;
68    /// };
69    ///
70    /// stream.flush().await?;
71    /// #
72    /// # Ok(()) }) }
73    /// ```
74    ///
75    /// By wrapping the stream with a `BufWriter`, these ten writes are all grouped
76    /// together by the buffer, and will all be written out in one system call when
77    /// `stream.flush()` completes. (As mentioned above, dropping a `BufWriter`
78    /// does not flush its buffers, so a `flush` call is essential.)
79    ///
80    /// [`Write`]: trait.Write.html
81    /// [`TcpStream::write`]: ../net/struct.TcpStream.html#method.write
82    /// [`TcpStream`]: ../net/struct.TcpStream.html
83    /// [`flush`]: trait.Write.html#tymethod.flush
84    pub struct BufWriter<W> {
85        #[pin]
86        inner: W,
87        buf: Vec<u8>,
88        written: usize,
89    }
90}
91
92/// An error returned by `into_inner` which combines an error that
93/// happened while writing out the buffer, and the buffered writer object
94/// which may be used to recover from the condition.
95///
96/// # Examples
97///
98/// ```no_run
99/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
100/// use async_std::io::BufWriter;
101/// use async_std::net::TcpStream;
102///
103/// let buf_writer = BufWriter::new(TcpStream::connect("127.0.0.1:34251").await?);
104///
105/// // unwrap the TcpStream and flush the buffer
106/// let stream = match buf_writer.into_inner().await {
107///     Ok(s) => s,
108///     Err(e) => {
109///         // Here, e is an IntoInnerError
110///         panic!("An error occurred");
111///     }
112/// };
113/// #
114/// # Ok(()) }) }
115///```
116#[derive(Debug)]
117pub struct IntoInnerError<W>(W, #[allow(dead_code)] crate::io::Error);
118
119impl<W: Write> BufWriter<W> {
120    /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB,
121    /// but may change in the future.
122    ///
123    /// # Examples
124    ///
125    /// ```no_run
126    /// # #![allow(unused_mut)]
127    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
128    /// use async_std::io::BufWriter;
129    /// use async_std::net::TcpStream;
130    ///
131    /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?);
132    /// #
133    /// # Ok(()) }) }
134    /// ```
135    pub fn new(inner: W) -> BufWriter<W> {
136        BufWriter::with_capacity(DEFAULT_BUF_SIZE, inner)
137    }
138
139    /// Creates a new `BufWriter` with the specified buffer capacity.
140    ///
141    /// # Examples
142    ///
143    /// Creating a buffer with a buffer of a hundred bytes.
144    ///
145    /// ```no_run
146    /// # #![allow(unused_mut)]
147    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
148    /// use async_std::io::BufWriter;
149    /// use async_std::net::TcpStream;
150    ///
151    /// let stream = TcpStream::connect("127.0.0.1:34254").await?;
152    /// let mut buffer = BufWriter::with_capacity(100, stream);
153    /// #
154    /// # Ok(()) }) }
155    /// ```
156    pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
157        BufWriter {
158            inner,
159            buf: Vec::with_capacity(capacity),
160            written: 0,
161        }
162    }
163
164    /// Gets a reference to the underlying writer.
165    ///
166    /// # Examples
167    ///
168    /// ```no_run
169    /// # #![allow(unused_mut)]
170    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
171    /// use async_std::io::BufWriter;
172    /// use async_std::net::TcpStream;
173    ///
174    /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?);
175    ///
176    /// // We can use reference just like buffer
177    /// let reference = buffer.get_ref();
178    /// #
179    /// # Ok(()) }) }
180    /// ```
181    pub fn get_ref(&self) -> &W {
182        &self.inner
183    }
184
185    /// Gets a mutable reference to the underlying writer.
186    ///
187    /// It is inadvisable to directly write to the underlying writer.
188    ///
189    /// # Examples
190    ///
191    /// ```no_run
192    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
193    /// use async_std::io::BufWriter;
194    /// use async_std::net::TcpStream;
195    ///
196    /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?);
197    ///
198    /// // We can use reference just like buffer
199    /// let reference = buffer.get_mut();
200    /// #
201    /// # Ok(()) }) }
202    /// ```
203    pub fn get_mut(&mut self) -> &mut W {
204        &mut self.inner
205    }
206
207    /// Gets a pinned mutable reference to the underlying writer.
208    ///
209    /// It is inadvisable to directly write to the underlying writer.
210    fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
211        self.project().inner
212    }
213
214    /// Consumes BufWriter, returning the underlying writer
215    ///
216    /// This method will not write leftover data, it will be lost.
217    /// For method that will attempt to write before returning the writer see [`poll_into_inner`]
218    ///
219    /// [`poll_into_inner`]: #method.poll_into_inner
220    /// # Examples
221    ///
222    /// ```no_run
223    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
224    /// use async_std::io::BufWriter;
225    /// use async_std::net::TcpStream;
226    ///
227    /// let buf_writer = BufWriter::new(TcpStream::connect("127.0.0.1:34251").await?);
228    ///
229    /// // unwrap the TcpStream and flush the buffer
230    /// let stream = buf_writer.into_inner().await.unwrap();
231    /// #
232    /// # Ok(()) }) }
233    /// ```
234    pub async fn into_inner(mut self) -> Result<W, IntoInnerError<BufWriter<W>>>
235    where
236        Self: Unpin,
237    {
238        match self.flush().await {
239            Err(e) => Err(IntoInnerError(self, e)),
240            Ok(()) => Ok(self.inner),
241        }
242    }
243
244    /// Returns a reference to the internally buffered data.
245    ///
246    /// # Examples
247    ///
248    /// ```no_run
249    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
250    /// use async_std::io::BufWriter;
251    /// use async_std::net::TcpStream;
252    ///
253    /// let buf_writer = BufWriter::new(TcpStream::connect("127.0.0.1:34251").await?);
254    ///
255    /// // See how many bytes are currently buffered
256    /// let bytes_buffered = buf_writer.buffer().len();
257    /// #
258    /// # Ok(()) }) }
259    /// ```
260    pub fn buffer(&self) -> &[u8] {
261        &self.buf
262    }
263
264    /// Poll buffer flushing until completion
265    ///
266    /// This is used in types that wrap around BufWrite, one such example: [`LineWriter`]
267    ///
268    /// [`LineWriter`]: struct.LineWriter.html
269    fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
270        let mut this = self.project();
271        let len = this.buf.len();
272        let mut ret = Ok(());
273        while *this.written < len {
274            match this
275                .inner
276                .as_mut()
277                .poll_write(cx, &this.buf[*this.written..])
278            {
279                Poll::Ready(Ok(0)) => {
280                    ret = Err(io::Error::new(
281                        io::ErrorKind::WriteZero,
282                        "Failed to write buffered data",
283                    ));
284                    break;
285                }
286                Poll::Ready(Ok(n)) => *this.written += n,
287                Poll::Ready(Err(ref e)) if e.kind() == io::ErrorKind::Interrupted => {}
288                Poll::Ready(Err(e)) => {
289                    ret = Err(e);
290                    break;
291                }
292                Poll::Pending => return Poll::Pending,
293            }
294        }
295        if *this.written > 0 {
296            this.buf.drain(..*this.written);
297        }
298        *this.written = 0;
299        Poll::Ready(ret)
300    }
301}
302
303impl<W: Write> Write for BufWriter<W> {
304    fn poll_write(
305        mut self: Pin<&mut Self>,
306        cx: &mut Context<'_>,
307        buf: &[u8],
308    ) -> Poll<io::Result<usize>> {
309        if self.buf.len() + buf.len() > self.buf.capacity() {
310            ready!(self.as_mut().poll_flush_buf(cx))?;
311        }
312        if buf.len() >= self.buf.capacity() {
313            self.get_pin_mut().poll_write(cx, buf)
314        } else {
315            Pin::new(&mut *self.project().buf).poll_write(cx, buf)
316        }
317    }
318
319    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
320        ready!(self.as_mut().poll_flush_buf(cx))?;
321        self.get_pin_mut().poll_flush(cx)
322    }
323
324    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
325        ready!(self.as_mut().poll_flush_buf(cx))?;
326        self.get_pin_mut().poll_close(cx)
327    }
328}
329
330impl<W: Write + fmt::Debug> fmt::Debug for BufWriter<W> {
331    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332        f.debug_struct("BufWriter")
333            .field("writer", &self.inner)
334            .field("buf", &self.buf)
335            .finish()
336    }
337}
338
339impl<W: Write + Seek> Seek for BufWriter<W> {
340    /// Seek to the offset, in bytes, in the underlying writer.
341    ///
342    /// Seeking always writes out the internal buffer before seeking.
343    fn poll_seek(
344        mut self: Pin<&mut Self>,
345        cx: &mut Context<'_>,
346        pos: SeekFrom,
347    ) -> Poll<io::Result<u64>> {
348        ready!(self.as_mut().poll_flush_buf(cx))?;
349        self.get_pin_mut().poll_seek(cx, pos)
350    }
351}