tokio_io/io/
copy.rs

1use std::io;
2
3use futures::{Future, Poll};
4
5use {AsyncRead, AsyncWrite};
6
7/// A future which will copy all data from a reader into a writer.
8///
9/// Created by the [`copy`] function, this future will resolve to the number of
10/// bytes copied or an error if one happens.
11///
12/// [`copy`]: fn.copy.html
13#[derive(Debug)]
14pub struct Copy<R, W> {
15    reader: Option<R>,
16    read_done: bool,
17    writer: Option<W>,
18    pos: usize,
19    cap: usize,
20    amt: u64,
21    buf: Box<[u8]>,
22}
23
24/// Creates a future which represents copying all the bytes from one object to
25/// another.
26///
27/// The returned future will copy all the bytes read from `reader` into the
28/// `writer` specified. This future will only complete once the `reader` has hit
29/// EOF and all bytes have been written to and flushed from the `writer`
30/// provided.
31///
32/// On success the number of bytes is returned and the `reader` and `writer` are
33/// consumed. On error the error is returned and the I/O objects are consumed as
34/// well.
35pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
36where
37    R: AsyncRead,
38    W: AsyncWrite,
39{
40    Copy {
41        reader: Some(reader),
42        read_done: false,
43        writer: Some(writer),
44        amt: 0,
45        pos: 0,
46        cap: 0,
47        buf: Box::new([0; 2048]),
48    }
49}
50
51impl<R, W> Future for Copy<R, W>
52where
53    R: AsyncRead,
54    W: AsyncWrite,
55{
56    type Item = (u64, R, W);
57    type Error = io::Error;
58
59    fn poll(&mut self) -> Poll<(u64, R, W), io::Error> {
60        loop {
61            // If our buffer is empty, then we need to read some data to
62            // continue.
63            if self.pos == self.cap && !self.read_done {
64                let reader = self.reader.as_mut().unwrap();
65                let n = try_ready!(reader.poll_read(&mut self.buf));
66                if n == 0 {
67                    self.read_done = true;
68                } else {
69                    self.pos = 0;
70                    self.cap = n;
71                }
72            }
73
74            // If our buffer has some data, let's write it out!
75            while self.pos < self.cap {
76                let writer = self.writer.as_mut().unwrap();
77                let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap]));
78                if i == 0 {
79                    return Err(io::Error::new(
80                        io::ErrorKind::WriteZero,
81                        "write zero byte into writer",
82                    ));
83                } else {
84                    self.pos += i;
85                    self.amt += i as u64;
86                }
87            }
88
89            // If we've written al the data and we've seen EOF, flush out the
90            // data and finish the transfer.
91            // done with the entire transfer.
92            if self.pos == self.cap && self.read_done {
93                try_ready!(self.writer.as_mut().unwrap().poll_flush());
94                let reader = self.reader.take().unwrap();
95                let writer = self.writer.take().unwrap();
96                return Ok((self.amt, reader, writer).into());
97            }
98        }
99    }
100}