futures_util/stream/
inspect.rs

1use futures_core::{Stream, Poll, Async};
2use futures_core::task;
3use futures_sink::{Sink};
4
5/// Do something with the items of a stream, passing it on.
6///
7/// This is created by the `Stream::inspect` method.
8#[derive(Debug)]
9#[must_use = "streams do nothing unless polled"]
10pub struct Inspect<S, F> where S: Stream {
11    stream: S,
12    inspect: F,
13}
14
15pub fn new<S, F>(stream: S, f: F) -> Inspect<S, F>
16    where S: Stream,
17          F: FnMut(&S::Item) -> (),
18{
19    Inspect {
20        stream: stream,
21        inspect: f,
22    }
23}
24
25impl<S: Stream, F> Inspect<S, F> {
26    /// Acquires a reference to the underlying stream that this combinator is
27    /// pulling from.
28    pub fn get_ref(&self) -> &S {
29        &self.stream
30    }
31
32    /// Acquires a mutable reference to the underlying stream that this
33    /// combinator is pulling from.
34    ///
35    /// Note that care must be taken to avoid tampering with the state of the
36    /// stream which may otherwise confuse this combinator.
37    pub fn get_mut(&mut self) -> &mut S {
38        &mut self.stream
39    }
40
41    /// Consumes this combinator, returning the underlying stream.
42    ///
43    /// Note that this may discard intermediate state of this combinator, so
44    /// care should be taken to avoid losing resources when this is called.
45    pub fn into_inner(self) -> S {
46        self.stream
47    }
48}
49
50// Forwarding impl of Sink from the underlying stream
51impl<S, F> Sink for Inspect<S, F>
52    where S: Sink + Stream
53{
54    type SinkItem = S::SinkItem;
55    type SinkError = S::SinkError;
56
57    delegate_sink!(stream);
58}
59
60impl<S, F> Stream for Inspect<S, F>
61    where S: Stream,
62          F: FnMut(&S::Item),
63{
64    type Item = S::Item;
65    type Error = S::Error;
66
67    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, S::Error> {
68        match try_ready!(self.stream.poll_next(cx)) {
69            Some(e) => {
70                (self.inspect)(&e);
71                Ok(Async::Ready(Some(e)))
72            }
73            None => Ok(Async::Ready(None)),
74        }
75    }
76}