async_std/stream/stream/
chain.rs1use core::pin::Pin;
2
3use pin_project_lite::pin_project;
4
5use super::fuse::Fuse;
6use crate::stream::stream::StreamExt;
7use crate::stream::Stream;
8use crate::task::{Context, Poll};
9
pin_project! {
    /// A stream adapter that yields the items of one stream followed by the
    /// items of another.
    ///
    /// Its `Stream` implementation polls `first` until it reports completion and
    /// only then moves on to `second`. Both inner streams are stored wrapped in
    /// [`Fuse`], whose `done` flag is what `poll_next` consults to decide which
    /// stream to poll next.
    ///
    /// Values are built through the module-private `Chain::new` constructor.
    #[derive(Debug)]
    pub struct Chain<S, U> {
        // The stream that is drained first. `#[pin]` makes the generated
        // projection hand out a `Pin<&mut Fuse<S>>` so it can be polled in place.
        #[pin]
        first: Fuse<S>,
        // The stream that is drained once `first` is done; pinned for the same reason.
        #[pin]
        second: Fuse<U>,
    }
}
26
27impl<S: Stream, U: Stream> Chain<S, U> {
28 pub(super) fn new(first: S, second: U) -> Self {
29 Self {
30 first: first.fuse(),
31 second: second.fuse(),
32 }
33 }
34}
35
36impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
37 type Item = S::Item;
38
39 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
40 let mut this = self.project();
41 if !this.first.done {
42 let next = futures_core::ready!(this.first.as_mut().poll_next(cx));
43 if let Some(next) = next {
44 return Poll::Ready(Some(next));
45 }
46 }
47
48 if !this.second.done {
49 let next = futures_core::ready!(this.second.as_mut().poll_next(cx));
50 if let Some(next) = next {
51 return Poll::Ready(Some(next));
52 }
53 }
54
55 if this.first.done && this.second.done {
56 return Poll::Ready(None);
57 }
58
59 Poll::Pending
60 }
61}