futures_util/stream/fuse.rs
1use futures_core::{Poll, Async, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
/// Stream adapter that latches into a finished state once the wrapped stream
/// has reported completion.
///
/// Polling an arbitrary stream after it has ended may behave unpredictably.
/// `Fuse` remembers the first `None` produced by the inner stream and from
/// then on answers every `poll_next` call with `None`.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Fuse<S> {
    /// The wrapped stream that items are pulled from.
    stream: S,
    /// `true` once `stream` has yielded `None`.
    done: bool,
}
16
17// Forwarding impl of Sink from the underlying stream
18impl<S> Sink for Fuse<S>
19 where S: Sink
20{
21 type SinkItem = S::SinkItem;
22 type SinkError = S::SinkError;
23
24 delegate_sink!(stream);
25}
26
27pub fn new<S: Stream>(s: S) -> Fuse<S> {
28 Fuse { stream: s, done: false }
29}
30
31impl<S: Stream> Stream for Fuse<S> {
32 type Item = S::Item;
33 type Error = S::Error;
34
35 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, S::Error> {
36 if self.done {
37 Ok(Async::Ready(None))
38 } else {
39 let r = self.stream.poll_next(cx);
40 if let Ok(Async::Ready(None)) = r {
41 self.done = true;
42 }
43 r
44 }
45 }
46}
47
48impl<S> Fuse<S> {
49 /// Returns whether the underlying stream has finished or not.
50 ///
51 /// If this method returns `true`, then all future calls to poll are
52 /// guaranteed to return `None`. If this returns `false`, then the
53 /// underlying stream is still in use.
54 pub fn is_done(&self) -> bool {
55 self.done
56 }
57
58 /// Acquires a reference to the underlying stream that this combinator is
59 /// pulling from.
60 pub fn get_ref(&self) -> &S {
61 &self.stream
62 }
63
64 /// Acquires a mutable reference to the underlying stream that this
65 /// combinator is pulling from.
66 ///
67 /// Note that care must be taken to avoid tampering with the state of the
68 /// stream which may otherwise confuse this combinator.
69 pub fn get_mut(&mut self) -> &mut S {
70 &mut self.stream
71 }
72
73 /// Consumes this combinator, returning the underlying stream.
74 ///
75 /// Note that this may discard intermediate state of this combinator, so
76 /// care should be taken to avoid losing resources when this is called.
77 pub fn into_inner(self) -> S {
78 self.stream
79 }
80}