futures_util/stream/
err_into.rs1use core::marker::PhantomData;
2
3use futures_core::{Async, Poll, Stream};
4use futures_core::task;
5use futures_sink::{Sink};
6
7#[derive(Debug)]
11#[must_use = "futures do nothing unless polled"]
12pub struct ErrInto<S, E> {
13 stream: S,
14 f: PhantomData<E>
15}
16
17pub fn new<S, E>(stream: S) -> ErrInto<S, E>
18 where S: Stream
19{
20 ErrInto {
21 stream: stream,
22 f: PhantomData
23 }
24}
25
26impl<S, E> ErrInto<S, E> {
27 pub fn get_ref(&self) -> &S {
30 &self.stream
31 }
32
33 pub fn get_mut(&mut self) -> &mut S {
39 &mut self.stream
40 }
41
42 pub fn into_inner(self) -> S {
47 self.stream
48 }
49}
50
51
52impl<S: Stream, E> Stream for ErrInto<S, E>
53 where S::Error: Into<E>,
54{
55 type Item = S::Item;
56 type Error = E;
57
58 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, E> {
59 let e = match self.stream.poll_next(cx) {
60 Ok(Async::Pending) => return Ok(Async::Pending),
61 other => other,
62 };
63 e.map_err(Into::into)
64 }
65}
66
67impl<S: Stream + Sink, E> Sink for ErrInto<S, E> {
69 type SinkItem = S::SinkItem;
70 type SinkError = S::SinkError;
71
72 delegate_sink!(stream);
73}