tokio_util/io/
reader_stream.rs

1use bytes::{Bytes, BytesMut};
2use futures_core::stream::Stream;
3use pin_project_lite::pin_project;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use tokio::io::AsyncRead;
7
// Reservation size, in bytes, recorded by `ReaderStream::new` and applied to
// the working buffer whenever that buffer reports zero capacity.
const DEFAULT_CAPACITY: usize = 4096;
9
10pin_project! {
11    /// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks.
12    ///
13    /// This stream is fused. It performs the inverse operation of
14    /// [`StreamReader`].
15    ///
16    /// # Example
17    ///
18    /// ```
19    /// # #[tokio::main]
20    /// # async fn main() -> std::io::Result<()> {
21    /// use tokio_stream::StreamExt;
22    /// use tokio_util::io::ReaderStream;
23    ///
24    /// // Create a stream of data.
25    /// let data = b"hello, world!";
26    /// let mut stream = ReaderStream::new(&data[..]);
27    ///
28    /// // Read all of the chunks into a vector.
29    /// let mut stream_contents = Vec::new();
30    /// while let Some(chunk) = stream.next().await {
31    ///    stream_contents.extend_from_slice(&chunk?);
32    /// }
33    ///
34    /// // Once the chunks are concatenated, we should have the
35    /// // original data.
36    /// assert_eq!(stream_contents, data);
37    /// # Ok(())
38    /// # }
39    /// ```
40    ///
41    /// [`AsyncRead`]: tokio::io::AsyncRead
42    /// [`StreamReader`]: crate::io::StreamReader
43    /// [`Stream`]: futures_core::Stream
44    #[derive(Debug)]
45    pub struct ReaderStream<R> {
46        // Reader itself.
47        //
48        // This value is `None` if the stream has terminated.
49        #[pin]
50        reader: Option<R>,
51        // Working buffer, used to optimize allocations.
52        buf: BytesMut,
53        capacity: usize,
54    }
55}
56
57impl<R: AsyncRead> ReaderStream<R> {
58    /// Convert an [`AsyncRead`] into a [`Stream`] with item type
59    /// `Result<Bytes, std::io::Error>`.
60    ///
61    /// [`AsyncRead`]: tokio::io::AsyncRead
62    /// [`Stream`]: futures_core::Stream
63    pub fn new(reader: R) -> Self {
64        ReaderStream {
65            reader: Some(reader),
66            buf: BytesMut::new(),
67            capacity: DEFAULT_CAPACITY,
68        }
69    }
70
71    /// Convert an [`AsyncRead`] into a [`Stream`] with item type
72    /// `Result<Bytes, std::io::Error>`,
73    /// with a specific read buffer initial capacity.
74    ///
75    /// [`AsyncRead`]: tokio::io::AsyncRead
76    /// [`Stream`]: futures_core::Stream
77    pub fn with_capacity(reader: R, capacity: usize) -> Self {
78        ReaderStream {
79            reader: Some(reader),
80            buf: BytesMut::with_capacity(capacity),
81            capacity,
82        }
83    }
84}
85
86impl<R: AsyncRead> Stream for ReaderStream<R> {
87    type Item = std::io::Result<Bytes>;
88    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
89        use crate::util::poll_read_buf;
90
91        let mut this = self.as_mut().project();
92
93        let reader = match this.reader.as_pin_mut() {
94            Some(r) => r,
95            None => return Poll::Ready(None),
96        };
97
98        if this.buf.capacity() == 0 {
99            this.buf.reserve(*this.capacity);
100        }
101
102        match poll_read_buf(reader, cx, &mut this.buf) {
103            Poll::Pending => Poll::Pending,
104            Poll::Ready(Err(err)) => {
105                self.project().reader.set(None);
106                Poll::Ready(Some(Err(err)))
107            }
108            Poll::Ready(Ok(0)) => {
109                self.project().reader.set(None);
110                Poll::Ready(None)
111            }
112            Poll::Ready(Ok(_)) => {
113                let chunk = this.buf.split();
114                Poll::Ready(Some(Ok(chunk.freeze())))
115            }
116        }
117    }
118}