futures_util/stream/
skip_while.rs1use futures_core::{Async, Poll, IntoFuture, Future, Stream};
2use futures_core::task;
3use futures_sink::{ Sink};
4
5#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct SkipWhile<S, R, P> where S: Stream, R: IntoFuture {
12 stream: S,
13 pred: P,
14 pending: Option<(R::Future, S::Item)>,
15 done_skipping: bool,
16}
17
18pub fn new<S, R, P>(s: S, p: P) -> SkipWhile<S, R, P>
19 where S: Stream,
20 P: FnMut(&S::Item) -> R,
21 R: IntoFuture<Item=bool, Error=S::Error>,
22{
23 SkipWhile {
24 stream: s,
25 pred: p,
26 pending: None,
27 done_skipping: false,
28 }
29}
30
31impl<S, R, P> SkipWhile<S, R, P> where S: Stream, R: IntoFuture {
32 pub fn get_ref(&self) -> &S {
35 &self.stream
36 }
37
38 pub fn get_mut(&mut self) -> &mut S {
44 &mut self.stream
45 }
46
47 pub fn into_inner(self) -> S {
52 self.stream
53 }
54}
55
56impl<S, R, P> Sink for SkipWhile<S, R, P>
58 where S: Sink + Stream, R: IntoFuture
59{
60 type SinkItem = S::SinkItem;
61 type SinkError = S::SinkError;
62
63 delegate_sink!(stream);
64}
65
66impl<S, R, P> Stream for SkipWhile<S, R, P>
67 where S: Stream,
68 P: FnMut(&S::Item) -> R,
69 R: IntoFuture<Item=bool, Error=S::Error>,
70{
71 type Item = S::Item;
72 type Error = S::Error;
73
74 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, S::Error> {
75 if self.done_skipping {
76 return self.stream.poll_next(cx);
77 }
78
79 loop {
80 if self.pending.is_none() {
81 let item = match try_ready!(self.stream.poll_next(cx)) {
82 Some(e) => e,
83 None => return Ok(Async::Ready(None)),
84 };
85 self.pending = Some(((self.pred)(&item).into_future(), item));
86 }
87
88 match self.pending.as_mut().unwrap().0.poll(cx) {
89 Ok(Async::Ready(true)) => self.pending = None,
90 Ok(Async::Ready(false)) => {
91 let (_, item) = self.pending.take().unwrap();
92 self.done_skipping = true;
93 return Ok(Async::Ready(Some(item)))
94 }
95 Ok(Async::Pending) => return Ok(Async::Pending),
96 Err(e) => {
97 self.pending = None;
98 return Err(e)
99 }
100 }
101 }
102 }
103}