async_std/io/
copy.rs

1use std::future::Future;
2use std::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::io::{self, BufRead, BufReader, Read, Write};
7use crate::task::{Context, Poll};
8use crate::utils::Context as _;
9
10// Note: There are two otherwise-identical implementations of this
11// function because unstable has removed the `?Sized` bound for the
12// reader and writer and accepts `R` and `W` instead of `&mut R` and
13// `&mut W`. If making a change to either of the implementations,
14// ensure that you copy it into the other.
15
16/// Copies the entire contents of a reader into a writer.
17///
18/// This function will continuously read data from `reader` and then
19/// write it into `writer` in a streaming fashion until `reader`
20/// returns EOF.
21///
22/// On success, the total number of bytes that were copied from
23/// `reader` to `writer` is returned.
24///
25/// If you’re wanting to copy the contents of one file to another and you’re
26/// working with filesystem paths, see the [`fs::copy`] function.
27///
28/// This function is an async version of [`std::io::copy`].
29///
30/// [`std::io::copy`]: https://doc.rust-lang.org/std/io/fn.copy.html
31/// [`fs::copy`]: ../fs/fn.copy.html
32///
33/// # Errors
34///
35/// This function will return an error immediately if any call to `read` or
36/// `write` returns an error. All instances of `ErrorKind::Interrupted` are
37/// handled by this function and the underlying operation is retried.
38///
39/// # Examples
40///
41/// ```
42/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
43/// #
44/// use async_std::io;
45///
46/// let mut reader: &[u8] = b"hello";
47/// let mut writer = io::stdout();
48///
49/// io::copy(&mut reader, &mut writer).await?;
50/// #
51/// # Ok(()) }) }
52/// ```
53#[cfg(any(feature = "docs", not(feature = "unstable")))]
54pub async fn copy<R, W>(reader: &mut R, writer: &mut W) -> io::Result<u64>
55where
56    R: Read + Unpin + ?Sized,
57    W: Write + Unpin + ?Sized,
58{
59    pin_project! {
60        struct CopyFuture<R, W> {
61            #[pin]
62            reader: R,
63            #[pin]
64            writer: W,
65            amt: u64,
66            reader_eof: bool
67        }
68    }
69
70    impl<R, W> Future for CopyFuture<R, W>
71    where
72        R: BufRead,
73        W: Write + Unpin,
74    {
75        type Output = io::Result<u64>;
76
77        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
78            let mut this = self.project();
79
80            loop {
81                if *this.reader_eof {
82                    futures_core::ready!(this.writer.as_mut().poll_flush(cx))?;
83                    return Poll::Ready(Ok(*this.amt));
84                }
85
86                let buffer = futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx))?;
87
88                if buffer.is_empty() {
89                    *this.reader_eof = true;
90                    continue;
91                }
92
93                let i = futures_core::ready!(this.writer.as_mut().poll_write(cx, buffer))?;
94                if i == 0 {
95                    return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
96                }
97                *this.amt += i as u64;
98                this.reader.as_mut().consume(i);
99            }
100        }
101    }
102
103    let future = CopyFuture {
104        reader: BufReader::new(reader),
105        writer,
106        reader_eof: false,
107        amt: 0,
108    };
109    future.await.context(|| String::from("io::copy failed"))
110}
111
/// Copies the entire contents of a reader into a writer.
///
/// This function will continuously read data from `reader` and then
/// write it into `writer` in a streaming fashion until `reader`
/// returns EOF. Once EOF has been observed, `writer` is flushed before
/// the returned future resolves.
///
/// On success, the total number of bytes that were copied from
/// `reader` to `writer` is returned.
///
/// If you're wanting to copy the contents of one file to another and you're
/// working with filesystem paths, see the [`fs::copy`] function.
///
/// This function is an async version of [`std::io::copy`].
///
/// [`std::io::copy`]: https://doc.rust-lang.org/std/io/fn.copy.html
/// [`fs::copy`]: ../fs/fn.copy.html
///
/// # Errors
///
/// This function will return an error immediately if any call to `read` or
/// `write` returns an error. All instances of `ErrorKind::Interrupted` are
/// handled by this function and the underlying operation is retried.
///
/// If the writer accepts zero bytes while there is still data to write, an
/// error of kind `ErrorKind::WriteZero` is returned. An error reported by the
/// final flush is returned as well.
///
/// # Examples
///
/// ```
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::io;
///
/// let mut reader: &[u8] = b"hello";
/// let mut writer = io::stdout();
///
/// io::copy(&mut reader, &mut writer).await?;
/// #
/// # Ok(()) }) }
/// ```
#[cfg(all(feature = "unstable", not(feature = "docs")))]
pub async fn copy<R, W>(reader: R, writer: W) -> io::Result<u64>
where
    R: Read + Unpin,
    W: Write + Unpin,
{
    pin_project! {
        // State machine driving the copy: a buffered reader, the destination
        // writer, the running byte count and a flag set once EOF is seen.
        struct CopyFuture<R, W> {
            #[pin]
            reader: R,
            #[pin]
            writer: W,
            amt: u64,
            reader_eof: bool
        }
    }

    impl<R, W> Future for CopyFuture<R, W>
    where
        R: BufRead,
        W: Write + Unpin,
    {
        type Output = io::Result<u64>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let mut this = self.project();

            loop {
                // After EOF the only remaining work is flushing the writer.
                if *this.reader_eof {
                    futures_core::ready!(this.writer.as_mut().poll_flush(cx))?;
                    return Poll::Ready(Ok(*this.amt));
                }

                let buffer = match futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx)) {
                    Ok(buffer) => buffer,
                    // Documented contract: interrupted operations are retried.
                    Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
                    Err(e) => return Poll::Ready(Err(e)),
                };

                // An empty buffer from `poll_fill_buf` signals EOF.
                if buffer.is_empty() {
                    *this.reader_eof = true;
                    continue;
                }

                let outcome = futures_core::ready!(this.writer.as_mut().poll_write(cx, buffer));
                let written = match outcome {
                    // The writer made no progress although data was pending.
                    Ok(0) => return Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
                    Ok(n) => n,
                    // Nothing has been consumed yet, so the next iteration
                    // re-fetches the same buffered bytes and retries the write.
                    Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
                    Err(e) => return Poll::Ready(Err(e)),
                };

                // Only mark as consumed what the writer actually accepted.
                *this.amt += written as u64;
                this.reader.as_mut().consume(written);
            }
        }
    }

    let future = CopyFuture {
        reader: BufReader::new(reader),
        writer,
        reader_eof: false,
        amt: 0,
    };
    // Attach a fixed message to any error via the crate's `Context` helper.
    future.await.context(|| String::from("io::copy failed"))
}