futures_util/sink/
with.rs

1use core::mem;
2use core::marker::PhantomData;
3
4use futures_core::{IntoFuture, Future, Poll, Async, Stream};
5use futures_core::task;
6use futures_sink::{Sink};
7
8/// Sink for the `Sink::with` combinator, chaining a computation to run *prior*
9/// to pushing a value into the underlying sink.
10#[derive(Clone, Debug)]
11#[must_use = "sinks do nothing unless polled"]
12pub struct With<S, U, Fut, F>
13    where S: Sink,
14          F: FnMut(U) -> Fut,
15          Fut: IntoFuture,
16{
17    sink: S,
18    f: F,
19    state: State<Fut::Future, S::SinkItem>,
20    _phantom: PhantomData<fn(U)>,
21}
22
/// Progress of the single item currently travelling through a `With` sink.
#[derive(Clone, Debug)]
enum State<Fut, T> {
    /// Nothing is in flight.
    Empty,
    /// The mapping future for the current item has not produced a value yet.
    Process(Fut),
    /// The mapping future finished; its output is waiting to be handed on.
    Buffered(T),
}

impl<Fut, T> State<Fut, T> {
    /// Returns `true` only for the `Empty` variant.
    fn is_empty(&self) -> bool {
        match *self {
            State::Empty => true,
            State::Process(_) | State::Buffered(_) => false,
        }
    }
}
39
40pub fn new<S, U, Fut, F>(sink: S, f: F) -> With<S, U, Fut, F>
41    where S: Sink,
42          F: FnMut(U) -> Fut,
43          Fut: IntoFuture<Item = S::SinkItem>,
44          Fut::Error: From<S::SinkError>,
45{
46    With {
47        state: State::Empty,
48        sink: sink,
49        f: f,
50        _phantom: PhantomData,
51    }
52}
53
54// Forwarding impl of Stream from the underlying sink
55impl<S, U, Fut, F> Stream for With<S, U, Fut, F>
56    where S: Stream + Sink,
57          F: FnMut(U) -> Fut,
58          Fut: IntoFuture
59{
60    type Item = S::Item;
61    type Error = S::Error;
62
63    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, S::Error> {
64        self.sink.poll_next(cx)
65    }
66}
67
68impl<S, U, Fut, F> With<S, U, Fut, F>
69    where S: Sink,
70          F: FnMut(U) -> Fut,
71          Fut: IntoFuture<Item = S::SinkItem>,
72          Fut::Error: From<S::SinkError>,
73{
74    /// Get a shared reference to the inner sink.
75    pub fn get_ref(&self) -> &S {
76        &self.sink
77    }
78
79    /// Get a mutable reference to the inner sink.
80    pub fn get_mut(&mut self) -> &mut S {
81        &mut self.sink
82    }
83
84    /// Consumes this combinator, returning the underlying sink.
85    ///
86    /// Note that this may discard intermediate state of this combinator, so
87    /// care should be taken to avoid losing resources when this is called.
88    pub fn into_inner(self) -> S {
89        self.sink
90    }
91
92    fn poll(&mut self, cx: &mut task::Context) -> Poll<(), Fut::Error> {
93        loop {
94            match mem::replace(&mut self.state, State::Empty) {
95                State::Empty => break,
96                State::Process(mut fut) => {
97                    match fut.poll(cx)? {
98                        Async::Ready(item) => {
99                            self.state = State::Buffered(item);
100                        }
101                        Async::Pending => {
102                            self.state = State::Process(fut);
103                            break
104                        }
105                    }
106                }
107                State::Buffered(item) => {
108                    match self.sink.poll_ready(cx)? {
109                        Async::Ready(()) => self.sink.start_send(item)?,
110                        Async::Pending => {
111                            self.state = State::Buffered(item);
112                            break
113                        }
114                    }
115                }
116            }
117        }
118
119        if self.state.is_empty() {
120            Ok(Async::Ready(()))
121        } else {
122            Ok(Async::Pending)
123        }
124    }
125}
126
127impl<S, U, Fut, F> Sink for With<S, U, Fut, F>
128    where S: Sink,
129          F: FnMut(U) -> Fut,
130          Fut: IntoFuture<Item = S::SinkItem>,
131          Fut::Error: From<S::SinkError>,
132{
133    type SinkItem = U;
134    type SinkError = Fut::Error;
135
136    fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
137        self.poll(cx)
138    }
139
140    fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> {
141        self.state = State::Process((self.f)(item).into_future());
142        Ok(())
143    }
144
145    fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
146        try_ready!(self.poll(cx));
147        self.sink.poll_flush(cx).map_err(Into::into)
148    }
149
150    fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
151        try_ready!(self.poll(cx));
152        self.sink.poll_close(cx).map_err(Into::into)
153    }
154}