futures_util/future/
flatten_sink.rs1use core::fmt;
2
3use futures_core::{task, Async, Future};
4use futures_sink::Sink;
5
6#[derive(Debug)]
7enum State<F> where F: Future, <F as Future>::Item: Sink {
8 Waiting(F),
9 Ready(F::Item),
10 Closed,
11}
12
13pub struct FlattenSink<F> where F: Future, <F as Future>::Item: Sink {
18 st: State<F>
19}
20
21impl<F> fmt::Debug for FlattenSink<F>
22 where F: Future + fmt::Debug,
23 <F as Future>::Item: Sink<SinkError=F::Error> + fmt::Debug,
24{
25 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
26 fmt.debug_struct("FlattenStream")
27 .field("state", &self.st)
28 .finish()
29 }
30}
31
32impl<F> Sink for FlattenSink<F> where F: Future, <F as Future>::Item: Sink<SinkError=F::Error> {
33 type SinkItem = <<F as Future>::Item as Sink>::SinkItem;
34 type SinkError = <<F as Future>::Item as Sink>::SinkError;
35
36 fn poll_ready(&mut self, cx: &mut task::Context) -> Result<Async<()>, Self::SinkError> {
37 let mut resolved_stream = match self.st {
38 State::Ready(ref mut s) => return s.poll_ready(cx),
39 State::Waiting(ref mut f) => match f.poll(cx)? {
40 Async::Pending => return Ok(Async::Pending),
41 Async::Ready(s) => s,
42 },
43 State::Closed => panic!("poll_ready called after eof"),
44 };
45 let result = resolved_stream.poll_ready(cx);
46 self.st = State::Ready(resolved_stream);
47 result
48 }
49
50 fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> {
51 match self.st {
52 State::Ready(ref mut s) => s.start_send(item),
53 State::Waiting(_) => panic!("poll_ready not called first"),
54 State::Closed => panic!("start_send called after eof"),
55 }
56 }
57
58 fn poll_flush(&mut self, cx: &mut task::Context) -> Result<Async<()>, Self::SinkError> {
59 match self.st {
60 State::Ready(ref mut s) => s.poll_flush(cx),
61 State::Waiting(_) => Ok(Async::Ready(())),
63 State::Closed => panic!("poll_flush called after eof"),
64 }
65 }
66
67 fn poll_close(&mut self, cx: &mut task::Context) -> Result<Async<()>, Self::SinkError> {
68 if let State::Ready(ref mut s) = self.st {
69 try_ready!(s.poll_close(cx));
70 }
71 self.st = State::Closed;
72 return Ok(Async::Ready(()));
73 }
74}
75
76pub fn new<F>(fut: F) -> FlattenSink<F> where F: Future, <F as Future>::Item: Sink {
77 FlattenSink {
78 st: State::Waiting(fut)
79 }
80}