async_std/io/read/
chain.rs1use std::fmt;
2use std::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::io::{self, BufRead, IoSliceMut, Read};
7use crate::task::{Context, Poll};
8
9pin_project! {
10 pub struct Chain<T, U> {
17 #[pin]
18 pub(crate) first: T,
19 #[pin]
20 pub(crate) second: U,
21 pub(crate) done_first: bool,
22 }
23}
24
25impl<T, U> Chain<T, U> {
26 pub fn into_inner(self) -> (T, U) {
45 (self.first, self.second)
46 }
47
48 pub fn get_ref(&self) -> (&T, &U) {
67 (&self.first, &self.second)
68 }
69
70 pub fn get_mut(&mut self) -> (&mut T, &mut U) {
93 (&mut self.first, &mut self.second)
94 }
95}
96
97impl<T: fmt::Debug, U: fmt::Debug> fmt::Debug for Chain<T, U> {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 f.debug_struct("Chain")
100 .field("t", &self.first)
101 .field("u", &self.second)
102 .finish()
103 }
104}
105
106impl<T: Read, U: Read> Read for Chain<T, U> {
107 fn poll_read(
108 self: Pin<&mut Self>,
109 cx: &mut Context<'_>,
110 buf: &mut [u8],
111 ) -> Poll<io::Result<usize>> {
112 let this = self.project();
113 if !*this.done_first {
114 match futures_core::ready!(this.first.poll_read(cx, buf)) {
115 Ok(0) if !buf.is_empty() => *this.done_first = true,
116 Ok(n) => return Poll::Ready(Ok(n)),
117 Err(err) => return Poll::Ready(Err(err)),
118 }
119 }
120
121 this.second.poll_read(cx, buf)
122 }
123
124 fn poll_read_vectored(
125 self: Pin<&mut Self>,
126 cx: &mut Context<'_>,
127 bufs: &mut [IoSliceMut<'_>],
128 ) -> Poll<io::Result<usize>> {
129 let this = self.project();
130 if !*this.done_first {
131 match futures_core::ready!(this.first.poll_read_vectored(cx, bufs)) {
132 Ok(0) if !bufs.is_empty() => *this.done_first = true,
133 Ok(n) => return Poll::Ready(Ok(n)),
134 Err(err) => return Poll::Ready(Err(err)),
135 }
136 }
137
138 this.second.poll_read_vectored(cx, bufs)
139 }
140}
141
142impl<T: BufRead, U: BufRead> BufRead for Chain<T, U> {
143 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
144 let this = self.project();
145 if !*this.done_first {
146 match futures_core::ready!(this.first.poll_fill_buf(cx)) {
147 Ok(buf) if buf.is_empty() => {
148 *this.done_first = true;
149 }
150 Ok(buf) => return Poll::Ready(Ok(buf)),
151 Err(err) => return Poll::Ready(Err(err)),
152 }
153 }
154
155 this.second.poll_fill_buf(cx)
156 }
157
158 fn consume(self: Pin<&mut Self>, amt: usize) {
159 let this = self.project();
160 if !*this.done_first {
161 this.first.consume(amt)
162 } else {
163 this.second.consume(amt)
164 }
165 }
166}
167
168#[cfg(all(test, feature = "default", not(target_arch = "wasm32")))]
169mod tests {
170 use crate::io;
171 use crate::prelude::*;
172 use crate::task;
173
174 #[test]
175 fn test_chain_basics() -> std::io::Result<()> {
176 let source1: io::Cursor<Vec<u8>> = io::Cursor::new(vec![0, 1, 2]);
177 let source2: io::Cursor<Vec<u8>> = io::Cursor::new(vec![3, 4, 5]);
178
179 task::block_on(async move {
180 let mut buffer = Vec::new();
181
182 let mut source = source1.chain(source2);
183
184 assert_eq!(6, source.read_to_end(&mut buffer).await?);
185 assert_eq!(buffer, vec![0, 1, 2, 3, 4, 5]);
186
187 Ok(())
188 })
189 }
190}