futures_util/stream/
select.rs1use futures_core::{Async, Poll, Stream};
2use futures_core::task;
3
4use stream::{StreamExt, Fuse};
5
6#[derive(Debug)]
12#[must_use = "streams do nothing unless polled"]
13pub struct Select<S1, S2> {
14 stream1: Fuse<S1>,
15 stream2: Fuse<S2>,
16 flag: bool,
17}
18
19pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Select<S1, S2>
20 where S1: Stream,
21 S2: Stream<Item = S1::Item, Error = S1::Error>
22{
23 Select {
24 stream1: stream1.fuse(),
25 stream2: stream2.fuse(),
26 flag: false,
27 }
28}
29
30impl<S1, S2> Stream for Select<S1, S2>
31 where S1: Stream,
32 S2: Stream<Item = S1::Item, Error = S1::Error>
33{
34 type Item = S1::Item;
35 type Error = S1::Error;
36
37 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S1::Item>, S1::Error> {
38 let (a, b) = if self.flag {
39 (&mut self.stream2 as &mut Stream<Item=_, Error=_>,
40 &mut self.stream1 as &mut Stream<Item=_, Error=_>)
41 } else {
42 (&mut self.stream1 as &mut Stream<Item=_, Error=_>,
43 &mut self.stream2 as &mut Stream<Item=_, Error=_>)
44 };
45 self.flag = !self.flag;
46
47 let a_done = match a.poll_next(cx)? {
48 Async::Ready(Some(item)) => return Ok(Some(item).into()),
49 Async::Ready(None) => true,
50 Async::Pending => false,
51 };
52
53 match b.poll_next(cx)? {
54 Async::Ready(Some(item)) => {
55 if !a_done {
58 self.flag = !self.flag;
59 }
60 Ok(Some(item).into())
61 }
62 Async::Ready(None) if a_done => Ok(None.into()),
63 Async::Ready(None) | Async::Pending => Ok(Async::Pending),
64 }
65 }
66}