futures_util/stream/
map.rs1use futures_core::{Async, Poll, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
5#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct Map<S, F> {
12 stream: S,
13 f: F,
14}
15
16pub fn new<S, U, F>(s: S, f: F) -> Map<S, F>
17 where S: Stream,
18 F: FnMut(S::Item) -> U,
19{
20 Map {
21 stream: s,
22 f: f,
23 }
24}
25
26impl<S, F> Map<S, F> {
27 pub fn get_ref(&self) -> &S {
30 &self.stream
31 }
32
33 pub fn get_mut(&mut self) -> &mut S {
39 &mut self.stream
40 }
41
42 pub fn into_inner(self) -> S {
47 self.stream
48 }
49}
50
51impl<S, F> Sink for Map<S, F>
53 where S: Sink
54{
55 type SinkItem = S::SinkItem;
56 type SinkError = S::SinkError;
57
58 delegate_sink!(stream);
59}
60
61impl<S, F, U> Stream for Map<S, F>
62 where S: Stream,
63 F: FnMut(S::Item) -> U,
64{
65 type Item = U;
66 type Error = S::Error;
67
68 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<U>, S::Error> {
69 let option = try_ready!(self.stream.poll_next(cx));
70 Ok(Async::Ready(option.map(&mut self.f)))
71 }
72}