tokio_io/io/copy.rs
1use std::io;
2
3use futures::{Future, Poll};
4
5use {AsyncRead, AsyncWrite};
6
7/// A future which will copy all data from a reader into a writer.
8///
9/// Created by the [`copy`] function, this future will resolve to the number of
10/// bytes copied or an error if one happens.
11///
12/// [`copy`]: fn.copy.html
13#[derive(Debug)]
14pub struct Copy<R, W> {
15 reader: Option<R>,
16 read_done: bool,
17 writer: Option<W>,
18 pos: usize,
19 cap: usize,
20 amt: u64,
21 buf: Box<[u8]>,
22}
23
24/// Creates a future which represents copying all the bytes from one object to
25/// another.
26///
27/// The returned future will copy all the bytes read from `reader` into the
28/// `writer` specified. This future will only complete once the `reader` has hit
29/// EOF and all bytes have been written to and flushed from the `writer`
30/// provided.
31///
32/// On success the number of bytes is returned and the `reader` and `writer` are
33/// consumed. On error the error is returned and the I/O objects are consumed as
34/// well.
35pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
36where
37 R: AsyncRead,
38 W: AsyncWrite,
39{
40 Copy {
41 reader: Some(reader),
42 read_done: false,
43 writer: Some(writer),
44 amt: 0,
45 pos: 0,
46 cap: 0,
47 buf: Box::new([0; 2048]),
48 }
49}
50
51impl<R, W> Future for Copy<R, W>
52where
53 R: AsyncRead,
54 W: AsyncWrite,
55{
56 type Item = (u64, R, W);
57 type Error = io::Error;
58
59 fn poll(&mut self) -> Poll<(u64, R, W), io::Error> {
60 loop {
61 // If our buffer is empty, then we need to read some data to
62 // continue.
63 if self.pos == self.cap && !self.read_done {
64 let reader = self.reader.as_mut().unwrap();
65 let n = try_ready!(reader.poll_read(&mut self.buf));
66 if n == 0 {
67 self.read_done = true;
68 } else {
69 self.pos = 0;
70 self.cap = n;
71 }
72 }
73
74 // If our buffer has some data, let's write it out!
75 while self.pos < self.cap {
76 let writer = self.writer.as_mut().unwrap();
77 let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap]));
78 if i == 0 {
79 return Err(io::Error::new(
80 io::ErrorKind::WriteZero,
81 "write zero byte into writer",
82 ));
83 } else {
84 self.pos += i;
85 self.amt += i as u64;
86 }
87 }
88
89 // If we've written al the data and we've seen EOF, flush out the
90 // data and finish the transfer.
91 // done with the entire transfer.
92 if self.pos == self.cap && self.read_done {
93 try_ready!(self.writer.as_mut().unwrap().poll_flush());
94 let reader = self.reader.take().unwrap();
95 let writer = self.writer.take().unwrap();
96 return Ok((self.amt, reader, writer).into());
97 }
98 }
99 }
100}