futures_util/sink/mod.rs
//! Sinks
//!
//! This module contains a number of functions for working with `Sink`s,
//! including the `SinkExt` trait which adds methods to `Sink` types.
5
6use futures_core::{Stream, IntoFuture};
7use futures_sink::Sink;
8use super::future::Either;
9
10mod close;
11mod fanout;
12mod flush;
13mod err_into;
14mod map_err;
15mod send;
16mod send_all;
17mod with;
18mod with_flat_map;
19
20if_std! {
21 mod buffer;
22 pub use self::buffer::Buffer;
23}
24
25pub use self::close::Close;
26pub use self::fanout::Fanout;
27pub use self::flush::Flush;
28pub use self::err_into::SinkErrInto;
29pub use self::map_err::SinkMapErr;
30pub use self::send::Send;
31pub use self::send_all::SendAll;
32pub use self::with::With;
33pub use self::with_flat_map::WithFlatMap;
34
35impl<T: ?Sized> SinkExt for T where T: Sink {}
36
37/// An extension trait for `Sink`s that provides a variety of convenient
38/// combinator functions.
39pub trait SinkExt: Sink {
40 /// Composes a function *in front of* the sink.
41 ///
42 /// This adapter produces a new sink that passes each value through the
43 /// given function `f` before sending it to `self`.
44 ///
45 /// To process each value, `f` produces a *future*, which is then polled to
46 /// completion before passing its result down to the underlying sink. If the
47 /// future produces an error, that error is returned by the new sink.
48 ///
49 /// Note that this function consumes the given sink, returning a wrapped
50 /// version, much like `Iterator::map`.
51 fn with<U, Fut, F>(self, f: F) -> With<Self, U, Fut, F>
52 where F: FnMut(U) -> Fut,
53 Fut: IntoFuture<Item = Self::SinkItem>,
54 Fut::Error: From<Self::SinkError>,
55 Self: Sized
56 {
57 with::new(self, f)
58 }
59
60 /// Composes a function *in front of* the sink.
61 ///
62 /// This adapter produces a new sink that passes each value through the
63 /// given function `f` before sending it to `self`.
64 ///
65 /// To process each value, `f` produces a *stream*, of which each value
66 /// is passed to the underlying sink. A new value will not be accepted until
67 /// the stream has been drained
68 ///
69 /// Note that this function consumes the given sink, returning a wrapped
70 /// version, much like `Iterator::flat_map`.
71 ///
72 /// # Examples
73 /// ---
74 /// Using this function with an iterator through use of the `stream::iter_ok()`
75 /// function
76 ///
77 /// ```
78 /// # extern crate futures;
79 /// # extern crate futures_channel;
80 /// # extern crate futures_executor;
81 /// use futures::prelude::*;
82 /// use futures::stream;
83 /// use futures_channel::mpsc;
84 /// use futures_executor::block_on;
85 ///
86 /// # fn main() {
87 /// let (tx, rx) = mpsc::channel::<i32>(5);
88 ///
89 /// let tx = tx.with_flat_map(|x| {
90 /// stream::iter_ok(vec![42; x].into_iter().map(|y| y))
91 /// });
92 ///
93 /// block_on(tx.send(5)).unwrap();
94 /// assert_eq!(block_on(rx.collect()), Ok(vec![42, 42, 42, 42, 42]));
95 /// # }
96 /// ```
97 fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, U, St, F>
98 where F: FnMut(U) -> St,
99 St: Stream<Item = Self::SinkItem, Error=Self::SinkError>,
100 Self: Sized
101 {
102 with_flat_map::new(self, f)
103 }
104
105 /*
106 fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F>
107 where F: FnMut(U) -> Self::SinkItem,
108 Self: Sized;
109
110 fn with_filter<F>(self, f: F) -> WithFilter<Self, F>
111 where F: FnMut(Self::SinkItem) -> bool,
112 Self: Sized;
113
114 fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F>
115 where F: FnMut(U) -> Option<Self::SinkItem>,
116 Self: Sized;
117 */
118
119 /// Transforms the error returned by the sink.
120 fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
121 where F: FnOnce(Self::SinkError) -> E,
122 Self: Sized,
123 {
124 map_err::new(self, f)
125 }
126
127 /// Map this sink's error to a different error type using the `Into` trait.
128 ///
129 /// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
130 fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, E>
131 where Self: Sized,
132 Self::SinkError: Into<E>,
133 {
134 err_into::new(self)
135 }
136
137
138 /// Adds a fixed-size buffer to the current sink.
139 ///
140 /// The resulting sink will buffer up to `amt` items when the underlying
141 /// sink is unwilling to accept additional items. Calling `flush` on
142 /// the buffered sink will attempt to both empty the buffer and complete
143 /// processing on the underlying sink.
144 ///
145 /// Note that this function consumes the given sink, returning a wrapped
146 /// version, much like `Iterator::map`.
147 ///
148 /// This method is only available when the `std` feature of this
149 /// library is activated, and it is activated by default.
150 #[cfg(feature = "std")]
151 fn buffer(self, amt: usize) -> Buffer<Self>
152 where Self: Sized
153 {
154 buffer::new(self, amt)
155 }
156
157 /// Close the sink.
158 ///
159 /// The sink itself is returned after closeing is complete.
160 fn close(self) -> Close<Self>
161 where Self: Sized
162 {
163 close::new(self)
164 }
165
166 /// Fanout items to multiple sinks.
167 ///
168 /// This adapter clones each incoming item and forwards it to both this as well as
169 /// the other sink at the same time.
170 fn fanout<S>(self, other: S) -> Fanout<Self, S>
171 where Self: Sized,
172 Self::SinkItem: Clone,
173 S: Sink<SinkItem=Self::SinkItem, SinkError=Self::SinkError>
174 {
175 fanout::new(self, other)
176 }
177
178 /// Flush the sync, processing all pending items.
179 ///
180 /// The sink itself is returned after flushing is complete; this adapter is
181 /// intended to be used when you want to stop sending to the sink until
182 /// all current requests are processed.
183 fn flush(self) -> Flush<Self>
184 where Self: Sized
185 {
186 flush::new(self)
187 }
188
189 /// A future that completes after the given item has been fully processed
190 /// into the sink, including flushing.
191 ///
192 /// Note that, **because of the flushing requirement, it is usually better
193 /// to batch together items to send via `send_all`, rather than flushing
194 /// between each item.**
195 ///
196 /// On completion, the sink is returned.
197 fn send(self, item: Self::SinkItem) -> Send<Self>
198 where Self: Sized
199 {
200 send::new(self, item)
201 }
202
203 /// A future that completes after the given stream has been fully processed
204 /// into the sink, including flushing.
205 ///
206 /// This future will drive the stream to keep producing items until it is
207 /// exhausted, sending each item to the sink. It will complete once both the
208 /// stream is exhausted, the sink has received all items, and the sink has
209 /// been flushed. Note that the sink is **not** closed.
210 ///
211 /// Doing `sink.send_all(stream)` is roughly equivalent to
212 /// `stream.forward(sink)`. The returned future will exhaust all items from
213 /// `stream` and send them to `self`.
214 ///
215 /// On completion, the pair `(sink, source)` is returned.
216 fn send_all<S>(self, stream: S) -> SendAll<Self, S>
217 where S: Stream<Item = Self::SinkItem>,
218 Self::SinkError: From<S::Error>,
219 Self: Sized
220 {
221 send_all::new(self, stream)
222 }
223
224 /// Wrap this sink in an `Either` sink, making it the left-hand variant
225 /// of that `Either`.
226 ///
227 /// This can be used in combination with the `right_sink` method to write `if`
228 /// statements that evaluate to different streams in different branches.
229 fn left_sink<B>(self) -> Either<Self, B>
230 where B: Sink<SinkItem = Self::SinkItem, SinkError = Self::SinkError>,
231 Self: Sized
232 {
233 Either::Left(self)
234 }
235
236 /// Wrap this stream in an `Either` stream, making it the right-hand variant
237 /// of that `Either`.
238 ///
239 /// This can be used in combination with the `left_sink` method to write `if`
240 /// statements that evaluate to different streams in different branches.
241 fn right_sink<B>(self) -> Either<B, Self>
242 where B: Sink<SinkItem = Self::SinkItem, SinkError = Self::SinkError>,
243 Self: Sized
244 {
245 Either::Right(self)
246 }
247}