futures_util/stream/
err_into.rs

1use core::marker::PhantomData;
2
3use futures_core::{Async, Poll, Stream};
4use futures_core::task;
5use futures_sink::{Sink};
6
/// A stream combinator to change the error type of a stream.
///
/// Items produced by the wrapped stream are passed through untouched; only
/// the error type is changed, to `E`.
///
/// This is created by the `Stream::err_into` method.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct ErrInto<S, E> {
    /// The underlying stream being wrapped.
    stream: S,
    /// Zero-sized marker recording the target error type `E`; no value of
    /// type `E` is stored in the combinator itself.
    f: PhantomData<E>,
}
16
17pub fn new<S, E>(stream: S) -> ErrInto<S, E>
18    where S: Stream
19{
20    ErrInto {
21        stream: stream,
22        f: PhantomData
23    }
24}
25
26impl<S, E> ErrInto<S, E> {
27    /// Acquires a reference to the underlying stream that this combinator is
28    /// pulling from.
29    pub fn get_ref(&self) -> &S {
30        &self.stream
31    }
32
33    /// Acquires a mutable reference to the underlying stream that this
34    /// combinator is pulling from.
35    ///
36    /// Note that care must be taken to avoid tampering with the state of the
37    /// stream which may otherwise confuse this combinator.
38    pub fn get_mut(&mut self) -> &mut S {
39        &mut self.stream
40    }
41
42    /// Consumes this combinator, returning the underlying stream.
43    ///
44    /// Note that this may discard intermediate state of this combinator, so
45    /// care should be taken to avoid losing resources when this is called.
46    pub fn into_inner(self) -> S {
47        self.stream
48    }
49}
50
51
52impl<S: Stream, E> Stream for ErrInto<S, E>
53    where S::Error: Into<E>,
54{
55    type Item = S::Item;
56    type Error = E;
57
58    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, E> {
59        let e = match self.stream.poll_next(cx) {
60            Ok(Async::Pending) => return Ok(Async::Pending),
61            other => other,
62        };
63        e.map_err(Into::into)
64    }
65}
66
67// Forwarding impl of Sink from the underlying stream
68impl<S: Stream + Sink, E> Sink for ErrInto<S, E> {
69    type SinkItem = S::SinkItem;
70    type SinkError = S::SinkError;
71    
72    delegate_sink!(stream);
73}