tokio_stream/stream_ext/
merge.rs

1use crate::stream_ext::Fuse;
2use crate::Stream;
3
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use pin_project_lite::pin_project;
7
8pin_project! {
9    /// Stream returned by the [`merge`](super::StreamExt::merge) method.
10    pub struct Merge<T, U> {
11        #[pin]
12        a: Fuse<T>,
13        #[pin]
14        b: Fuse<U>,
15        // When `true`, poll `a` first, otherwise, `poll` b`.
16        a_first: bool,
17    }
18}
19
20impl<T, U> Merge<T, U> {
21    pub(super) fn new(a: T, b: U) -> Merge<T, U>
22    where
23        T: Stream,
24        U: Stream,
25    {
26        Merge {
27            a: Fuse::new(a),
28            b: Fuse::new(b),
29            a_first: true,
30        }
31    }
32}
33
34impl<T, U> Stream for Merge<T, U>
35where
36    T: Stream,
37    U: Stream<Item = T::Item>,
38{
39    type Item = T::Item;
40
41    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
42        let me = self.project();
43        let a_first = *me.a_first;
44
45        // Toggle the flag
46        *me.a_first = !a_first;
47
48        if a_first {
49            poll_next(me.a, me.b, cx)
50        } else {
51            poll_next(me.b, me.a, cx)
52        }
53    }
54
55    fn size_hint(&self) -> (usize, Option<usize>) {
56        super::merge_size_hints(self.a.size_hint(), self.b.size_hint())
57    }
58}
59
60fn poll_next<T, U>(
61    first: Pin<&mut T>,
62    second: Pin<&mut U>,
63    cx: &mut Context<'_>,
64) -> Poll<Option<T::Item>>
65where
66    T: Stream,
67    U: Stream<Item = T::Item>,
68{
69    let mut done = true;
70
71    match first.poll_next(cx) {
72        Poll::Ready(Some(val)) => return Poll::Ready(Some(val)),
73        Poll::Ready(None) => {}
74        Poll::Pending => done = false,
75    }
76
77    match second.poll_next(cx) {
78        Poll::Ready(Some(val)) => return Poll::Ready(Some(val)),
79        Poll::Ready(None) => {}
80        Poll::Pending => done = false,
81    }
82
83    if done {
84        Poll::Ready(None)
85    } else {
86        Poll::Pending
87    }
88}