futures_util/sink/
with.rs1use core::mem;
2use core::marker::PhantomData;
3
4use futures_core::{IntoFuture, Future, Poll, Async, Stream};
5use futures_core::task;
6use futures_sink::{Sink};
7
8#[derive(Clone, Debug)]
11#[must_use = "sinks do nothing unless polled"]
12pub struct With<S, U, Fut, F>
13 where S: Sink,
14 F: FnMut(U) -> Fut,
15 Fut: IntoFuture,
16{
17 sink: S,
18 f: F,
19 state: State<Fut::Future, S::SinkItem>,
20 _phantom: PhantomData<fn(U)>,
21}
22
/// Progress of a single in-flight item.
#[derive(Clone, Debug)]
enum State<Fut, T> {
    /// Nothing is in flight.
    Empty,
    /// A future that has not yet produced its output.
    Process(Fut),
    /// A produced value that has not yet been passed on.
    Buffered(T),
}

impl<Fut, T> State<Fut, T> {
    /// Returns `true` only for [`State::Empty`].
    ///
    /// The match is deliberately exhaustive (no `_` arm) so that adding a
    /// variant forces a decision here at compile time.
    fn is_empty(&self) -> bool {
        match *self {
            State::Empty => true,
            State::Process(_) | State::Buffered(_) => false,
        }
    }
}
39
40pub fn new<S, U, Fut, F>(sink: S, f: F) -> With<S, U, Fut, F>
41 where S: Sink,
42 F: FnMut(U) -> Fut,
43 Fut: IntoFuture<Item = S::SinkItem>,
44 Fut::Error: From<S::SinkError>,
45{
46 With {
47 state: State::Empty,
48 sink: sink,
49 f: f,
50 _phantom: PhantomData,
51 }
52}
53
54impl<S, U, Fut, F> Stream for With<S, U, Fut, F>
56 where S: Stream + Sink,
57 F: FnMut(U) -> Fut,
58 Fut: IntoFuture
59{
60 type Item = S::Item;
61 type Error = S::Error;
62
63 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, S::Error> {
64 self.sink.poll_next(cx)
65 }
66}
67
68impl<S, U, Fut, F> With<S, U, Fut, F>
69 where S: Sink,
70 F: FnMut(U) -> Fut,
71 Fut: IntoFuture<Item = S::SinkItem>,
72 Fut::Error: From<S::SinkError>,
73{
74 pub fn get_ref(&self) -> &S {
76 &self.sink
77 }
78
79 pub fn get_mut(&mut self) -> &mut S {
81 &mut self.sink
82 }
83
84 pub fn into_inner(self) -> S {
89 self.sink
90 }
91
92 fn poll(&mut self, cx: &mut task::Context) -> Poll<(), Fut::Error> {
93 loop {
94 match mem::replace(&mut self.state, State::Empty) {
95 State::Empty => break,
96 State::Process(mut fut) => {
97 match fut.poll(cx)? {
98 Async::Ready(item) => {
99 self.state = State::Buffered(item);
100 }
101 Async::Pending => {
102 self.state = State::Process(fut);
103 break
104 }
105 }
106 }
107 State::Buffered(item) => {
108 match self.sink.poll_ready(cx)? {
109 Async::Ready(()) => self.sink.start_send(item)?,
110 Async::Pending => {
111 self.state = State::Buffered(item);
112 break
113 }
114 }
115 }
116 }
117 }
118
119 if self.state.is_empty() {
120 Ok(Async::Ready(()))
121 } else {
122 Ok(Async::Pending)
123 }
124 }
125}
126
127impl<S, U, Fut, F> Sink for With<S, U, Fut, F>
128 where S: Sink,
129 F: FnMut(U) -> Fut,
130 Fut: IntoFuture<Item = S::SinkItem>,
131 Fut::Error: From<S::SinkError>,
132{
133 type SinkItem = U;
134 type SinkError = Fut::Error;
135
136 fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
137 self.poll(cx)
138 }
139
140 fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> {
141 self.state = State::Process((self.f)(item).into_future());
142 Ok(())
143 }
144
145 fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
146 try_ready!(self.poll(cx));
147 self.sink.poll_flush(cx).map_err(Into::into)
148 }
149
150 fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
151 try_ready!(self.poll(cx));
152 self.sink.poll_close(cx).map_err(Into::into)
153 }
154}