futures_util/io/
write_all.rs

1use std::io;
2use std::mem;
3
4use {Poll, Future, task};
5
6use futures_io::AsyncWrite;
7
8/// A future used to write the entire contents of some data to a stream.
9///
10/// This is created by the [`write_all`] top-level method.
11///
12/// [`write_all`]: fn.write_all.html
13#[derive(Debug)]
14pub struct WriteAll<A, T> {
15    state: State<A, T>,
16}
17
18#[derive(Debug)]
19enum State<A, T> {
20    Writing {
21        a: A,
22        buf: T,
23        pos: usize,
24    },
25    Empty,
26}
27
28pub fn write_all<A, T>(a: A, buf: T) -> WriteAll<A, T>
29    where A: AsyncWrite,
30          T: AsRef<[u8]>,
31{
32    WriteAll {
33        state: State::Writing {
34            a: a,
35            buf: buf,
36            pos: 0,
37        },
38    }
39}
40
41fn zero_write() -> io::Error {
42    io::Error::new(io::ErrorKind::WriteZero, "zero-length write")
43}
44
45impl<A, T> Future for WriteAll<A, T>
46    where A: AsyncWrite,
47          T: AsRef<[u8]>,
48{
49    type Item = (A, T);
50    type Error = io::Error;
51
52    fn poll(&mut self, cx: &mut task::Context) -> Poll<(A, T), io::Error> {
53        match self.state {
54            State::Writing { ref mut a, ref buf, ref mut pos } => {
55                let buf = buf.as_ref();
56                while *pos < buf.len() {
57                    let n = try_ready!(a.poll_write(cx, &buf[*pos..]));
58                    *pos += n;
59                    if n == 0 {
60                        return Err(zero_write())
61                    }
62                }
63            }
64            State::Empty => panic!("poll a WriteAll after it's done"),
65        }
66
67        match mem::replace(&mut self.state, State::Empty) {
68            State::Writing { a, buf, .. } => Ok((a, buf).into()),
69            State::Empty => panic!(),
70        }
71    }
72}