futures_util/stream/
take_while.rs1use futures_core::{Async, Poll, IntoFuture, Future, Stream};
2use futures_core::task;
3use futures_sink::{ Sink};
4
5#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct TakeWhile<S, R, P> where S: Stream, R: IntoFuture {
12 stream: S,
13 pred: P,
14 pending: Option<(R::Future, S::Item)>,
15 done_taking: bool,
16}
17
18pub fn new<S, R, P>(s: S, p: P) -> TakeWhile<S, R, P>
19 where S: Stream,
20 P: FnMut(&S::Item) -> R,
21 R: IntoFuture<Item=bool, Error=S::Error>,
22{
23 TakeWhile {
24 stream: s,
25 pred: p,
26 pending: None,
27 done_taking: false,
28 }
29}
30
31impl<S, R, P> TakeWhile<S, R, P> where S: Stream, R: IntoFuture {
32 pub fn get_ref(&self) -> &S {
35 &self.stream
36 }
37
38 pub fn get_mut(&mut self) -> &mut S {
44 &mut self.stream
45 }
46
47 pub fn into_inner(self) -> S {
52 self.stream
53 }
54}
55
56impl<S, R, P> Sink for TakeWhile<S, R, P>
58 where S: Sink + Stream, R: IntoFuture
59{
60 type SinkItem = S::SinkItem;
61 type SinkError = S::SinkError;
62
63 delegate_sink!(stream);
64}
65
66impl<S, R, P> Stream for TakeWhile<S, R, P>
67 where S: Stream,
68 P: FnMut(&S::Item) -> R,
69 R: IntoFuture<Item=bool, Error=S::Error>,
70{
71 type Item = S::Item;
72 type Error = S::Error;
73
74 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, S::Error> {
75 if self.done_taking {
76 return Ok(Async::Ready(None));
77 }
78
79 if self.pending.is_none() {
80 let item = match try_ready!(self.stream.poll_next(cx)) {
81 Some(e) => e,
82 None => return Ok(Async::Ready(None)),
83 };
84 self.pending = Some(((self.pred)(&item).into_future(), item));
85 }
86
87 assert!(self.pending.is_some());
88 match self.pending.as_mut().unwrap().0.poll(cx) {
89 Ok(Async::Ready(true)) => {
90 let (_, item) = self.pending.take().unwrap();
91 Ok(Async::Ready(Some(item)))
92 },
93 Ok(Async::Ready(false)) => {
94 self.done_taking = true;
95 self.pending = None;
96 Ok(Async::Ready(None))
97 }
98 Ok(Async::Pending) => Ok(Async::Pending),
99 Err(e) => {
100 self.pending = None;
101 Err(e)
102 }
103 }
104 }
105}