1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
use std::future::ready;
use compio_buf::{buf_try, BufResult, IntoInner, IoBuf};
use crate::{
buffer::Buffer,
util::{slice_to_buf, DEFAULT_BUF_SIZE},
AsyncWrite, AsyncWriteExt, IoResult,
};
/// Wraps a writer and buffers its output.
///
/// It can be excessively inefficient to work directly with something that
/// implements [`AsyncWrite`]. A `BufWriter<W>` keeps an in-memory buffer of
/// data and writes it to an underlying writer in large, infrequent batches.
//
/// `BufWriter<W>` can improve the speed of programs that make *small* and
/// *repeated* write calls to the same file or network socket. It does not
/// help when writing very large amounts at once, or writing just one or a few
/// times. It also provides no advantage when writing to a destination that is
/// in memory, like a `Vec<u8>`.
///
/// Dropping `BufWriter<W>` also discards any bytes left in the buffer, so it is
/// critical to call [`flush`] before `BufWriter<W>` is dropped. Calling
/// [`flush`] ensures that the buffer is empty and thus no data is lost.
///
/// [`flush`]: AsyncWrite::flush
#[derive(Debug)]
pub struct BufWriter<W> {
writer: W,
buf: Buffer,
}
impl<W> BufWriter<W> {
/// Creates a new `BufWriter` with a default buffer capacity. The default is
/// currently 8 KB, but may change in the future.
pub fn new(writer: W) -> Self {
Self::with_capacity(DEFAULT_BUF_SIZE, writer)
}
/// Creates a new `BufWriter` with the specified buffer capacity.
pub fn with_capacity(cap: usize, writer: W) -> Self {
Self {
writer,
buf: Buffer::with_capacity(cap),
}
}
}
impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
async fn write<T: IoBuf>(&mut self, mut buf: T) -> compio_buf::BufResult<usize, T> {
let written = self
.buf
.with_sync(|w| {
let len = w.buf_len();
let mut w = w.slice(len..);
let written = slice_to_buf(buf.as_slice(), &mut w);
BufResult(Ok(written), w.into_inner())
})
.expect("Closure always return Ok");
if self.buf.need_flush() {
(_, buf) = buf_try!(self.flush().await, buf);
}
BufResult(Ok(written), buf)
}
async fn write_vectored<T: compio_buf::IoVectoredBuf>(
&mut self,
mut buf: T,
) -> compio_buf::BufResult<usize, T> {
let written = self
.buf
.with(|mut w| {
let mut written = 0;
for buf in buf.as_dyn_bufs() {
let len = w.buf_len();
let mut slice = w.slice(len..);
written += slice_to_buf(buf.as_slice(), &mut slice);
w = slice.into_inner();
if w.buf_len() == w.buf_capacity() {
break;
}
}
ready(BufResult(Ok(written), w))
})
.await
.expect("Closure always return Ok");
if self.buf.need_flush() {
(_, buf) = buf_try!(self.flush().await, buf);
}
BufResult(Ok(written), buf)
}
async fn flush(&mut self) -> IoResult<()> {
let Self { writer, buf } = self;
buf.with(|w| writer.write_all(w)).await?;
buf.reset();
Ok(())
}
async fn shutdown(&mut self) -> IoResult<()> {
self.flush().await?;
self.writer.shutdown().await
}
}
impl<W> IntoInner for BufWriter<W> {
type Inner = W;
fn into_inner(self) -> Self::Inner {
self.writer
}
}