futures_util/stream/
select.rs

1use futures_core::{Async, Poll, Stream};
2use futures_core::task;
3
4use stream::{StreamExt, Fuse};
5
6/// An adapter for merging the output of two streams.
7///
8/// The merged stream produces items from either of the underlying streams as
9/// they become available, and the streams are polled in a round-robin fashion.
10/// Errors, however, are not merged: you get at most one error at a time.
11#[derive(Debug)]
12#[must_use = "streams do nothing unless polled"]
13pub struct Select<S1, S2> {
14    stream1: Fuse<S1>,
15    stream2: Fuse<S2>,
16    flag: bool,
17}
18
19pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Select<S1, S2>
20    where S1: Stream,
21          S2: Stream<Item = S1::Item, Error = S1::Error>
22{
23    Select {
24        stream1: stream1.fuse(),
25        stream2: stream2.fuse(),
26        flag: false,
27    }
28}
29
30impl<S1, S2> Stream for Select<S1, S2>
31    where S1: Stream,
32          S2: Stream<Item = S1::Item, Error = S1::Error>
33{
34    type Item = S1::Item;
35    type Error = S1::Error;
36
37    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S1::Item>, S1::Error> {
38        let (a, b) = if self.flag {
39            (&mut self.stream2 as &mut Stream<Item=_, Error=_>,
40             &mut self.stream1 as &mut Stream<Item=_, Error=_>)
41        } else {
42            (&mut self.stream1 as &mut Stream<Item=_, Error=_>,
43             &mut self.stream2 as &mut Stream<Item=_, Error=_>)
44        };
45        self.flag = !self.flag;
46
47        let a_done = match a.poll_next(cx)? {
48            Async::Ready(Some(item)) => return Ok(Some(item).into()),
49            Async::Ready(None) => true,
50            Async::Pending => false,
51        };
52
53        match b.poll_next(cx)? {
54            Async::Ready(Some(item)) => {
55                // If the other stream isn't finished yet, give them a chance to
56                // go first next time as we pulled something off `b`.
57                if !a_done {
58                    self.flag = !self.flag;
59                }
60                Ok(Some(item).into())
61            }
62            Async::Ready(None) if a_done => Ok(None.into()),
63            Async::Ready(None) | Async::Pending => Ok(Async::Pending),
64        }
65    }
66}