futures_util/sink/
mod.rs

1//! Sinks
2//!
3//! This module contains a number of functions for working with `Sink`s,
4//! including the `SinkExt` trait which adds methods to `Sink` types.
5
6use futures_core::{Stream, IntoFuture};
7use futures_sink::Sink;
8use super::future::Either;
9
10mod close;
11mod fanout;
12mod flush;
13mod err_into;
14mod map_err;
15mod send;
16mod send_all;
17mod with;
18mod with_flat_map;
19
20if_std! {
21    mod buffer;
22    pub use self::buffer::Buffer;
23}
24
25pub use self::close::Close;
26pub use self::fanout::Fanout;
27pub use self::flush::Flush;
28pub use self::err_into::SinkErrInto;
29pub use self::map_err::SinkMapErr;
30pub use self::send::Send;
31pub use self::send_all::SendAll;
32pub use self::with::With;
33pub use self::with_flat_map::WithFlatMap;
34
35impl<T: ?Sized> SinkExt for T where T: Sink {}
36
37/// An extension trait for `Sink`s that provides a variety of convenient
38/// combinator functions.
39pub trait SinkExt: Sink {
40    /// Composes a function *in front of* the sink.
41    ///
42    /// This adapter produces a new sink that passes each value through the
43    /// given function `f` before sending it to `self`.
44    ///
45    /// To process each value, `f` produces a *future*, which is then polled to
46    /// completion before passing its result down to the underlying sink. If the
47    /// future produces an error, that error is returned by the new sink.
48    ///
49    /// Note that this function consumes the given sink, returning a wrapped
50    /// version, much like `Iterator::map`.
51    fn with<U, Fut, F>(self, f: F) -> With<Self, U, Fut, F>
52        where F: FnMut(U) -> Fut,
53              Fut: IntoFuture<Item = Self::SinkItem>,
54              Fut::Error: From<Self::SinkError>,
55              Self: Sized
56    {
57        with::new(self, f)
58    }
59
60    /// Composes a function *in front of* the sink.
61    ///
62    /// This adapter produces a new sink that passes each value through the
63    /// given function `f` before sending it to `self`.
64    ///
65    /// To process each value, `f` produces a *stream*, of which each value
66    /// is passed to the underlying sink. A new value will not be accepted until
67    /// the stream has been drained
68    ///
69    /// Note that this function consumes the given sink, returning a wrapped
70    /// version, much like `Iterator::flat_map`.
71    ///
72    /// # Examples
73    /// ---
74    /// Using this function with an iterator through use of the `stream::iter_ok()`
75    /// function
76    ///
77    /// ```
78    /// # extern crate futures;
79    /// # extern crate futures_channel;
80    /// # extern crate futures_executor;
81    /// use futures::prelude::*;
82    /// use futures::stream;
83    /// use futures_channel::mpsc;
84    /// use futures_executor::block_on;
85    ///
86    /// # fn main() {
87    /// let (tx, rx) = mpsc::channel::<i32>(5);
88    ///
89    /// let tx = tx.with_flat_map(|x| {
90    ///     stream::iter_ok(vec![42; x].into_iter().map(|y| y))
91    /// });
92    ///
93    /// block_on(tx.send(5)).unwrap();
94    /// assert_eq!(block_on(rx.collect()), Ok(vec![42, 42, 42, 42, 42]));
95    /// # }
96    /// ```
97    fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, U, St, F>
98        where F: FnMut(U) -> St,
99              St: Stream<Item = Self::SinkItem, Error=Self::SinkError>,
100              Self: Sized
101        {
102            with_flat_map::new(self, f)
103        }
104
105    /*
106    fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F>
107        where F: FnMut(U) -> Self::SinkItem,
108              Self: Sized;
109
110    fn with_filter<F>(self, f: F) -> WithFilter<Self, F>
111        where F: FnMut(Self::SinkItem) -> bool,
112              Self: Sized;
113
114    fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F>
115        where F: FnMut(U) -> Option<Self::SinkItem>,
116              Self: Sized;
117     */
118
119    /// Transforms the error returned by the sink.
120    fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
121        where F: FnOnce(Self::SinkError) -> E,
122              Self: Sized,
123    {
124        map_err::new(self, f)
125    }
126
127    /// Map this sink's error to a different error type using the `Into` trait.
128    ///
129    /// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
130    fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, E>
131        where Self: Sized,
132              Self::SinkError: Into<E>,
133    {
134        err_into::new(self)
135    }
136
137
138    /// Adds a fixed-size buffer to the current sink.
139    ///
140    /// The resulting sink will buffer up to `amt` items when the underlying
141    /// sink is unwilling to accept additional items. Calling `flush` on
142    /// the buffered sink will attempt to both empty the buffer and complete
143    /// processing on the underlying sink.
144    ///
145    /// Note that this function consumes the given sink, returning a wrapped
146    /// version, much like `Iterator::map`.
147    ///
148    /// This method is only available when the `std` feature of this
149    /// library is activated, and it is activated by default.
150    #[cfg(feature = "std")]
151    fn buffer(self, amt: usize) -> Buffer<Self>
152        where Self: Sized
153    {
154        buffer::new(self, amt)
155    }
156
157    /// Close the sink.
158    ///
159    /// The sink itself is returned after closeing is complete.
160    fn close(self) -> Close<Self>
161        where Self: Sized
162    {
163        close::new(self)
164    }
165
166    /// Fanout items to multiple sinks.
167    ///
168    /// This adapter clones each incoming item and forwards it to both this as well as
169    /// the other sink at the same time.
170    fn fanout<S>(self, other: S) -> Fanout<Self, S>
171        where Self: Sized,
172              Self::SinkItem: Clone,
173              S: Sink<SinkItem=Self::SinkItem, SinkError=Self::SinkError>
174    {
175        fanout::new(self, other)
176    }
177
178    /// Flush the sync, processing all pending items.
179    ///
180    /// The sink itself is returned after flushing is complete; this adapter is
181    /// intended to be used when you want to stop sending to the sink until
182    /// all current requests are processed.
183    fn flush(self) -> Flush<Self>
184        where Self: Sized
185    {
186        flush::new(self)
187    }
188
189    /// A future that completes after the given item has been fully processed
190    /// into the sink, including flushing.
191    ///
192    /// Note that, **because of the flushing requirement, it is usually better
193    /// to batch together items to send via `send_all`, rather than flushing
194    /// between each item.**
195    ///
196    /// On completion, the sink is returned.
197    fn send(self, item: Self::SinkItem) -> Send<Self>
198        where Self: Sized
199    {
200        send::new(self, item)
201    }
202
203    /// A future that completes after the given stream has been fully processed
204    /// into the sink, including flushing.
205    ///
206    /// This future will drive the stream to keep producing items until it is
207    /// exhausted, sending each item to the sink. It will complete once both the
208    /// stream is exhausted, the sink has received all items, and the sink has
209    /// been flushed. Note that the sink is **not** closed.
210    ///
211    /// Doing `sink.send_all(stream)` is roughly equivalent to
212    /// `stream.forward(sink)`. The returned future will exhaust all items from
213    /// `stream` and send them to `self`.
214    ///
215    /// On completion, the pair `(sink, source)` is returned.
216    fn send_all<S>(self, stream: S) -> SendAll<Self, S>
217        where S: Stream<Item = Self::SinkItem>,
218              Self::SinkError: From<S::Error>,
219              Self: Sized
220    {
221        send_all::new(self, stream)
222    }
223
224    /// Wrap this sink in an `Either` sink, making it the left-hand variant
225    /// of that `Either`.
226    ///
227    /// This can be used in combination with the `right_sink` method to write `if`
228    /// statements that evaluate to different streams in different branches.
229    fn left_sink<B>(self) -> Either<Self, B>
230        where B: Sink<SinkItem = Self::SinkItem, SinkError = Self::SinkError>,
231              Self: Sized
232    {
233        Either::Left(self)
234    }
235
236    /// Wrap this stream in an `Either` stream, making it the right-hand variant
237    /// of that `Either`.
238    ///
239    /// This can be used in combination with the `left_sink` method to write `if`
240    /// statements that evaluate to different streams in different branches.
241    fn right_sink<B>(self) -> Either<B, Self>
242        where B: Sink<SinkItem = Self::SinkItem, SinkError = Self::SinkError>,
243              Self: Sized
244    {
245        Either::Right(self)
246    }
247}