futures_core/stream/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
//! Asynchronous streams.

use Poll;
use task;

/// A stream of values produced asynchronously.
///
/// If `Future` is an asynchronous version of `Result`, then `Stream` is an
/// asynchronous version of `Iterator`. A stream represents a sequence of
/// value-producing events that occur asynchronously to the caller.
///
/// The trait is modeled after `Future`, but allows `poll_next` to be called
/// even after a value has been produced, yielding `None` once the stream has
/// been fully exhausted.
///
/// # Errors
///
/// Streams, like futures, also bake in errors through an associated `Error`
/// type. An error on a stream **does not terminate the stream**. That is,
/// after one error is received, another value may be received from the same
/// stream (it's valid to keep polling). Thus a stream is somewhat like an
/// `Iterator<Item = Result<T, E>>`, and is always terminated by returning
/// `None`.
pub trait Stream {
    /// Values yielded by the stream.
    type Item;

    /// Errors yielded by the stream.
    type Error;

    /// Attempt to pull out the next value of this stream, registering the
    /// current task for wakeup if the value is not yet available, and returning
    /// `None` if the stream is exhausted.
    ///
    /// The `cx` argument is the task context handed to the stream for this
    /// poll.
    ///
    /// # Return value
    ///
    /// There are several possible return values, each indicating a distinct
    /// stream state:
    ///
    /// - [`Ok(Pending)`](::Async) means that this stream's next value is not
    ///   ready yet. Implementations will ensure that the current task will be
    ///   notified when the next value may be ready.
    ///
    /// - [`Ok(Ready(Some(val)))`](::Async) means that the stream has
    ///   successfully produced a value, `val`, and may produce further values
    ///   on subsequent `poll_next` calls.
    ///
    /// - [`Ok(Ready(None))`](::Async) means that the stream has terminated, and
    ///   `poll_next` should not be invoked again.
    ///
    /// - `Err(err)` means that the stream encountered an error while trying to
    ///   `poll_next`. Subsequent calls to `poll_next` *are* allowed, and may
    ///   return further values or errors.
    ///
    /// # Panics
    ///
    /// Once a stream is finished, i.e. `Ready(None)` has been returned, further
    /// calls to `poll_next` may result in a panic or other "bad behavior". If
    /// this is difficult to guard against then the `fuse` adapter can be used
    /// to ensure that `poll_next` always returns `Ready(None)` in subsequent
    /// calls.
    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error>;
}

/// A mutable reference to a stream is itself a stream: polling the reference
/// polls the stream it points to, with the same `Item` and `Error` types.
impl<'a, S> Stream for &'a mut S
where
    S: ?Sized + Stream,
{
    type Item = S::Item;
    type Error = S::Error;

    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
        // `self` is `&mut &mut S`; reborrow the underlying stream and hand the
        // poll straight to it.
        let inner: &mut S = &mut **self;
        S::poll_next(inner, cx)
    }
}

if_std! {
    use Async;
    use never::Never;

    /// A boxed stream is itself a stream: polling the box polls the stream it
    /// owns, with the same `Item` and `Error` types.
    impl<S> Stream for ::std::boxed::Box<S>
    where
        S: ?Sized + Stream,
    {
        type Item = S::Item;
        type Error = S::Error;

        fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
            // Reborrow the heap-allocated stream out of the box and delegate.
            let inner: &mut S = &mut **self;
            S::poll_next(inner, cx)
        }
    }

    // Only compiled when the crate's "nightly" feature is enabled
    // (presumably because `PinBox`/`PinMut` are nightly-only APIs — confirm).
    /// A pinned box of a stream is itself a stream: polling it polls the
    /// stream it owns, with the same `Item` and `Error` types.
    #[cfg(feature = "nightly")]
    impl<S: ?Sized + Stream> Stream for ::std::boxed::PinBox<S> {
        type Item = S::Item;
        type Error = S::Error;

        fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
            // NOTE(review): `get_mut_unchecked` turns the pinned reference into
            // a plain `&mut S`. This looks like it relies on `S::poll_next` not
            // moving the value out from behind that reference — confirm the
            // invariant and record it here as a `SAFETY:` justification.
            unsafe { ::core::mem::PinMut::get_mut_unchecked(self.as_pin_mut()).poll_next(cx) }
        }
    }

    /// `AssertUnwindSafe` is transparent with respect to streaming: polling the
    /// wrapper polls the wrapped stream, with the same `Item` and `Error`
    /// types.
    impl<S> Stream for ::std::panic::AssertUnwindSafe<S>
    where
        S: Stream,
    {
        type Item = S::Item;
        type Error = S::Error;

        fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, S::Error> {
            // Field `0` is the wrapped stream; delegate the poll to it.
            S::poll_next(&mut self.0, cx)
        }
    }

    /// A `VecDeque` acts as a stream that is always ready: each poll removes
    /// and yields the element at the front of the queue, or yields `None` when
    /// the queue is empty. It never returns an error (`Error` is `Never`).
    impl<T> Stream for ::std::collections::VecDeque<T> {
        type Item = T;
        type Error = Never;

        fn poll_next(&mut self, _cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
            // The task context is unused: the result is available immediately,
            // so no wakeup is ever registered.
            let front = self.pop_front();
            Ok(Async::Ready(front))
        }
    }
}