// async_std/io/read/chain.rs

1use std::fmt;
2use std::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::io::{self, BufRead, IoSliceMut, Read};
7use crate::task::{Context, Poll};
8
9pin_project! {
10    /// Adaptor to chain together two readers.
11    ///
12    /// This struct is generally created by calling [`chain`] on a reader.
13    /// Please see the documentation of [`chain`] for more details.
14    ///
15    /// [`chain`]: trait.Read.html#method.chain
16    pub struct Chain<T, U> {
17        #[pin]
18        pub(crate) first: T,
19        #[pin]
20        pub(crate) second: U,
21        pub(crate) done_first: bool,
22    }
23}
24
25impl<T, U> Chain<T, U> {
26    /// Consumes the `Chain`, returning the wrapped readers.
27    ///
28    /// # Examples
29    ///
30    /// ```no_run
31    /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async {
32    /// #
33    /// use async_std::prelude::*;
34    /// use async_std::fs::File;
35    ///
36    /// let foo_file = File::open("foo.txt").await?;
37    /// let bar_file = File::open("bar.txt").await?;
38    ///
39    /// let chain = foo_file.chain(bar_file);
40    /// let (foo_file, bar_file) = chain.into_inner();
41    /// #
42    /// # Ok(()) }) }
43    /// ```
44    pub fn into_inner(self) -> (T, U) {
45        (self.first, self.second)
46    }
47
48    /// Gets references to the underlying readers in this `Chain`.
49    ///
50    /// # Examples
51    ///
52    /// ```no_run
53    /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async {
54    /// #
55    /// use async_std::prelude::*;
56    /// use async_std::fs::File;
57    ///
58    /// let foo_file = File::open("foo.txt").await?;
59    /// let bar_file = File::open("bar.txt").await?;
60    ///
61    /// let chain = foo_file.chain(bar_file);
62    /// let (foo_file, bar_file) = chain.get_ref();
63    /// #
64    /// # Ok(()) }) }
65    /// ```
66    pub fn get_ref(&self) -> (&T, &U) {
67        (&self.first, &self.second)
68    }
69
70    /// Gets mutable references to the underlying readers in this `Chain`.
71    ///
72    /// Care should be taken to avoid modifying the internal I/O state of the
73    /// underlying readers as doing so may corrupt the internal state of this
74    /// `Chain`.
75    ///
76    /// # Examples
77    ///
78    /// ```no_run
79    /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async {
80    /// #
81    /// use async_std::prelude::*;
82    /// use async_std::fs::File;
83    ///
84    /// let foo_file = File::open("foo.txt").await?;
85    /// let bar_file = File::open("bar.txt").await?;
86    ///
87    /// let mut chain = foo_file.chain(bar_file);
88    /// let (foo_file, bar_file) = chain.get_mut();
89    /// #
90    /// # Ok(()) }) }
91    /// ```
92    pub fn get_mut(&mut self) -> (&mut T, &mut U) {
93        (&mut self.first, &mut self.second)
94    }
95}
96
97impl<T: fmt::Debug, U: fmt::Debug> fmt::Debug for Chain<T, U> {
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        f.debug_struct("Chain")
100            .field("t", &self.first)
101            .field("u", &self.second)
102            .finish()
103    }
104}
105
106impl<T: Read, U: Read> Read for Chain<T, U> {
107    fn poll_read(
108        self: Pin<&mut Self>,
109        cx: &mut Context<'_>,
110        buf: &mut [u8],
111    ) -> Poll<io::Result<usize>> {
112        let this = self.project();
113        if !*this.done_first {
114            match futures_core::ready!(this.first.poll_read(cx, buf)) {
115                Ok(0) if !buf.is_empty() => *this.done_first = true,
116                Ok(n) => return Poll::Ready(Ok(n)),
117                Err(err) => return Poll::Ready(Err(err)),
118            }
119        }
120
121        this.second.poll_read(cx, buf)
122    }
123
124    fn poll_read_vectored(
125        self: Pin<&mut Self>,
126        cx: &mut Context<'_>,
127        bufs: &mut [IoSliceMut<'_>],
128    ) -> Poll<io::Result<usize>> {
129        let this = self.project();
130        if !*this.done_first {
131            match futures_core::ready!(this.first.poll_read_vectored(cx, bufs)) {
132                Ok(0) if !bufs.is_empty() => *this.done_first = true,
133                Ok(n) => return Poll::Ready(Ok(n)),
134                Err(err) => return Poll::Ready(Err(err)),
135            }
136        }
137
138        this.second.poll_read_vectored(cx, bufs)
139    }
140}
141
142impl<T: BufRead, U: BufRead> BufRead for Chain<T, U> {
143    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
144        let this = self.project();
145        if !*this.done_first {
146            match futures_core::ready!(this.first.poll_fill_buf(cx)) {
147                Ok(buf) if buf.is_empty() => {
148                    *this.done_first = true;
149                }
150                Ok(buf) => return Poll::Ready(Ok(buf)),
151                Err(err) => return Poll::Ready(Err(err)),
152            }
153        }
154
155        this.second.poll_fill_buf(cx)
156    }
157
158    fn consume(self: Pin<&mut Self>, amt: usize) {
159        let this = self.project();
160        if !*this.done_first {
161            this.first.consume(amt)
162        } else {
163            this.second.consume(amt)
164        }
165    }
166}
167
#[cfg(all(test, feature = "default", not(target_arch = "wasm32")))]
mod tests {
    use crate::io;
    use crate::prelude::*;
    use crate::task;

    /// Reading a chain to the end yields the first source's bytes followed
    /// by the second source's bytes.
    #[test]
    fn test_chain_basics() -> std::io::Result<()> {
        let head = io::Cursor::new(vec![0u8, 1, 2]);
        let tail = io::Cursor::new(vec![3u8, 4, 5]);

        task::block_on(async move {
            let mut chained = head.chain(tail);
            let mut collected = Vec::new();

            let total = chained.read_to_end(&mut collected).await?;
            assert_eq!(6, total);
            assert_eq!(collected, vec![0, 1, 2, 3, 4, 5]);

            Ok(())
        })
    }
}
190}