futures_util/stream/
map.rs

use core::fmt;

use futures_core::{Async, Poll, Stream};
use futures_core::task;
use futures_sink::{Sink};
4
/// A stream combinator which transforms every item yielded by the underlying
/// stream by passing it through a function, producing a stream of the
/// function's results.
///
/// This is produced by the `Stream::map` method.
#[must_use = "streams do nothing unless polled"]
pub struct Map<S, F> {
    /// The stream whose items are being transformed.
    stream: S,
    /// The function applied to each item produced by `stream`.
    f: F,
}

// `Debug` is written by hand instead of derived: the derive would require
// `F: Debug`, and closures (the usual choice for `F`) never implement `Debug`,
// which would make most `Map` values impossible to debug-print. Only the
// wrapped stream is shown; the function is omitted from the output.
impl<S, F> fmt::Debug for Map<S, F>
    where S: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Map")
            .field("stream", &self.stream)
            .finish()
    }
}
15
16pub fn new<S, U, F>(s: S, f: F) -> Map<S, F>
17    where S: Stream,
18          F: FnMut(S::Item) -> U,
19{
20    Map {
21        stream: s,
22        f: f,
23    }
24}
25
26impl<S, F> Map<S, F> {
27    /// Acquires a reference to the underlying stream that this combinator is
28    /// pulling from.
29    pub fn get_ref(&self) -> &S {
30        &self.stream
31    }
32
33    /// Acquires a mutable reference to the underlying stream that this
34    /// combinator is pulling from.
35    ///
36    /// Note that care must be taken to avoid tampering with the state of the
37    /// stream which may otherwise confuse this combinator.
38    pub fn get_mut(&mut self) -> &mut S {
39        &mut self.stream
40    }
41
42    /// Consumes this combinator, returning the underlying stream.
43    ///
44    /// Note that this may discard intermediate state of this combinator, so
45    /// care should be taken to avoid losing resources when this is called.
46    pub fn into_inner(self) -> S {
47        self.stream
48    }
49}
50
// Forwarding impl of `Sink` from the underlying stream: when the wrapped value
// `S` is itself a `Sink`, `Map` is a `Sink` with the same item and error
// types. No bound is placed on `F` here.
impl<S, F> Sink for Map<S, F>
    where S: Sink
{
    // Sink-side associated types are taken unchanged from the wrapped value.
    type SinkItem = S::SinkItem;
    type SinkError = S::SinkError;

    // NOTE(review): `delegate_sink!` is defined outside this file; presumably
    // it expands to the `Sink` methods, each forwarding to the `stream` field
    // named here — confirm against the macro definition.
    delegate_sink!(stream);
}
60
61impl<S, F, U> Stream for Map<S, F>
62    where S: Stream,
63          F: FnMut(S::Item) -> U,
64{
65    type Item = U;
66    type Error = S::Error;
67
68    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<U>, S::Error> {
69        let option = try_ready!(self.stream.poll_next(cx));
70        Ok(Async::Ready(option.map(&mut self.f)))
71    }
72}