futures_util/stream/
or_else.rs

1use futures_core::{IntoFuture, Future, Poll, Async, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
5/// A stream combinator which chains a computation onto errors produced by a
6/// stream.
7///
8/// This structure is produced by the `Stream::or_else` method.
9#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct OrElse<S, U, F>
12    where U: IntoFuture,
13{
14    stream: S,
15    future: Option<U::Future>,
16    f: F,
17}
18
19pub fn new<S, U, F>(s: S, f: F) -> OrElse<S, U, F>
20    where S: Stream,
21          F: FnMut(S::Error) -> U,
22          U: IntoFuture<Item=S::Item>,
23{
24    OrElse {
25        stream: s,
26        future: None,
27        f: f,
28    }
29}
30
31// Forwarding impl of Sink from the underlying stream
32impl<S, U, F> Sink for OrElse<S, U, F>
33    where S: Sink, U: IntoFuture
34{
35    type SinkItem = S::SinkItem;
36    type SinkError = S::SinkError;
37
38    delegate_sink!(stream);
39}
40
41impl<S, U, F> Stream for OrElse<S, U, F>
42    where S: Stream,
43          F: FnMut(S::Error) -> U,
44          U: IntoFuture<Item=S::Item>,
45{
46    type Item = S::Item;
47    type Error = U::Error;
48
49    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, U::Error> {
50        if self.future.is_none() {
51            let item = match self.stream.poll_next(cx) {
52                Ok(Async::Ready(e)) => return Ok(Async::Ready(e)),
53                Ok(Async::Pending) => return Ok(Async::Pending),
54                Err(e) => e,
55            };
56            self.future = Some((self.f)(item).into_future());
57        }
58        assert!(self.future.is_some());
59        match self.future.as_mut().unwrap().poll(cx) {
60            Ok(Async::Ready(e)) => {
61                self.future = None;
62                Ok(Async::Ready(Some(e)))
63            }
64            Err(e) => {
65                self.future = None;
66                Err(e)
67            }
68            Ok(Async::Pending) => Ok(Async::Pending)
69        }
70    }
71}