futures_util/future/
flatten_sink.rs

1use core::fmt;
2
3use futures_core::{task, Async, Future};
4use futures_sink::Sink;
5
6#[derive(Debug)]
7enum State<F> where F: Future, <F as Future>::Item: Sink {
8    Waiting(F),
9    Ready(F::Item),
10    Closed,
11}
12
13/// Future for the `flatten_sink` combinator, flattening a
14/// future-of-a-sink to get just the result of the final sink as a sink.
15///
16/// This is created by the `Future::flatten_sink` method.
17pub struct FlattenSink<F> where F: Future, <F as Future>::Item: Sink {
18    st: State<F>
19}
20
21impl<F> fmt::Debug for FlattenSink<F>
22    where F: Future + fmt::Debug,
23          <F as Future>::Item: Sink<SinkError=F::Error> + fmt::Debug,
24{
25    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
26        fmt.debug_struct("FlattenStream")
27            .field("state", &self.st)
28            .finish()
29    }
30}
31
32impl<F> Sink for FlattenSink<F> where F: Future, <F as Future>::Item: Sink<SinkError=F::Error> {
33    type SinkItem = <<F as Future>::Item as Sink>::SinkItem;
34    type SinkError = <<F as Future>::Item as Sink>::SinkError;
35
36    fn poll_ready(&mut self, cx: &mut task::Context) -> Result<Async<()>, Self::SinkError> {
37        let mut resolved_stream = match self.st {
38            State::Ready(ref mut s) => return s.poll_ready(cx),
39            State::Waiting(ref mut f) => match f.poll(cx)? {
40                Async::Pending => return Ok(Async::Pending),
41                Async::Ready(s) => s,
42            },
43            State::Closed => panic!("poll_ready called after eof"),
44        };
45        let result = resolved_stream.poll_ready(cx);
46        self.st = State::Ready(resolved_stream);
47        result
48    }
49
50    fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> {
51        match self.st {
52            State::Ready(ref mut s) => s.start_send(item),
53            State::Waiting(_) => panic!("poll_ready not called first"),
54            State::Closed => panic!("start_send called after eof"),
55        }
56    }
57
58    fn poll_flush(&mut self, cx: &mut task::Context) -> Result<Async<()>, Self::SinkError> {
59        match self.st {
60            State::Ready(ref mut s) => s.poll_flush(cx),
61            // if sink not yet resolved, nothing written ==> everything flushed
62            State::Waiting(_) => Ok(Async::Ready(())),
63            State::Closed => panic!("poll_flush called after eof"),
64        }
65    }
66
67    fn poll_close(&mut self, cx: &mut task::Context) -> Result<Async<()>, Self::SinkError> {
68        if let State::Ready(ref mut s) = self.st {
69            try_ready!(s.poll_close(cx));
70        }
71        self.st = State::Closed;
72        return Ok(Async::Ready(()));
73    }
74}
75
76pub fn new<F>(fut: F) -> FlattenSink<F> where F: Future, <F as Future>::Item: Sink {
77    FlattenSink {
78        st: State::Waiting(fut)
79    }
80}