futures_util/stream/
or_else.rs1use futures_core::{IntoFuture, Future, Poll, Async, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
5#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct OrElse<S, U, F>
12 where U: IntoFuture,
13{
14 stream: S,
15 future: Option<U::Future>,
16 f: F,
17}
18
19pub fn new<S, U, F>(s: S, f: F) -> OrElse<S, U, F>
20 where S: Stream,
21 F: FnMut(S::Error) -> U,
22 U: IntoFuture<Item=S::Item>,
23{
24 OrElse {
25 stream: s,
26 future: None,
27 f: f,
28 }
29}
30
31impl<S, U, F> Sink for OrElse<S, U, F>
33 where S: Sink, U: IntoFuture
34{
35 type SinkItem = S::SinkItem;
36 type SinkError = S::SinkError;
37
38 delegate_sink!(stream);
39}
40
41impl<S, U, F> Stream for OrElse<S, U, F>
42 where S: Stream,
43 F: FnMut(S::Error) -> U,
44 U: IntoFuture<Item=S::Item>,
45{
46 type Item = S::Item;
47 type Error = U::Error;
48
49 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, U::Error> {
50 if self.future.is_none() {
51 let item = match self.stream.poll_next(cx) {
52 Ok(Async::Ready(e)) => return Ok(Async::Ready(e)),
53 Ok(Async::Pending) => return Ok(Async::Pending),
54 Err(e) => e,
55 };
56 self.future = Some((self.f)(item).into_future());
57 }
58 assert!(self.future.is_some());
59 match self.future.as_mut().unwrap().poll(cx) {
60 Ok(Async::Ready(e)) => {
61 self.future = None;
62 Ok(Async::Ready(Some(e)))
63 }
64 Err(e) => {
65 self.future = None;
66 Err(e)
67 }
68 Ok(Async::Pending) => Ok(Async::Pending)
69 }
70 }
71}