1use std::future::ready;
2
3use compio_buf::{BufResult, IntoInner, IoBuf, IoVectoredBuf, buf_try};
4
5use crate::{
6 AsyncWrite, IoResult,
7 buffer::Buffer,
8 util::{DEFAULT_BUF_SIZE, slice_to_buf},
9};
10
11#[derive(Debug)]
30pub struct BufWriter<W> {
31 writer: W,
32 buf: Buffer,
33}
34
35impl<W> BufWriter<W> {
36 pub fn new(writer: W) -> Self {
39 Self::with_capacity(DEFAULT_BUF_SIZE, writer)
40 }
41
42 pub fn with_capacity(cap: usize, writer: W) -> Self {
44 Self {
45 writer,
46 buf: Buffer::with_capacity(cap),
47 }
48 }
49}
50
51impl<W: AsyncWrite> BufWriter<W> {
52 async fn flush_if_needed(&mut self) -> IoResult<()> {
53 if self.buf.need_flush() {
54 self.flush().await?;
55 }
56 Ok(())
57 }
58}
59
60impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
61 async fn write<T: IoBuf>(&mut self, mut buf: T) -> BufResult<usize, T> {
62 (_, buf) = buf_try!(self.flush_if_needed().await, buf);
65
66 let written = self
67 .buf
68 .with_sync(|w| {
69 let len = w.buf_len();
70 let mut w = w.slice(len..);
71 let written = slice_to_buf(buf.as_slice(), &mut w);
72 BufResult(Ok(written), w.into_inner())
73 })
74 .expect("Closure always return Ok");
75
76 (_, buf) = buf_try!(self.flush_if_needed().await, buf);
77
78 BufResult(Ok(written), buf)
79 }
80
81 async fn write_vectored<T: IoVectoredBuf>(&mut self, mut buf: T) -> BufResult<usize, T> {
82 (_, buf) = buf_try!(self.flush_if_needed().await, buf);
83
84 let written = self
85 .buf
86 .with(|mut w| {
87 let mut written = 0;
88 for buf in buf.iter_buf() {
89 let len = w.buf_len();
90 let mut slice = w.slice(len..);
91 written += slice_to_buf(buf.as_slice(), &mut slice);
92 w = slice.into_inner();
93
94 if w.buf_len() == w.buf_capacity() {
95 break;
96 }
97 }
98 ready(BufResult(Ok(written), w))
99 })
100 .await
101 .expect("Closure always return Ok");
102
103 (_, buf) = buf_try!(self.flush_if_needed().await, buf);
104
105 BufResult(Ok(written), buf)
106 }
107
108 async fn flush(&mut self) -> IoResult<()> {
109 let Self { writer, buf } = self;
110
111 buf.flush_to(writer).await?;
112
113 Ok(())
114 }
115
116 async fn shutdown(&mut self) -> IoResult<()> {
117 self.flush().await?;
118 self.writer.shutdown().await
119 }
120}
121
122impl<W> IntoInner for BufWriter<W> {
123 type Inner = W;
124
125 fn into_inner(self) -> Self::Inner {
126 self.writer
127 }
128}