async_std/stream/stream/
scan.rs1use core::pin::Pin;
2
3use pin_project_lite::pin_project;
4
5use crate::stream::Stream;
6use crate::task::{Context, Poll};
7
8pin_project! {
9 #[derive(Debug)]
17 pub struct Scan<S, St, F> {
18 #[pin]
19 stream: S,
20 state_f: (St, F),
21 }
22}
23
24impl<S, St, F> Scan<S, St, F> {
25 pub(crate) fn new(stream: S, initial_state: St, f: F) -> Self {
26 Self {
27 stream,
28 state_f: (initial_state, f),
29 }
30 }
31}
32
33impl<S, St, F, B> Stream for Scan<S, St, F>
34where
35 S: Stream,
36 F: FnMut(&mut St, S::Item) -> Option<B>,
37{
38 type Item = B;
39
40 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
41 let mut this = self.project();
42 let poll_result = this.stream.as_mut().poll_next(cx);
43 poll_result.map(|item| {
44 item.and_then(|item| {
45 let (state, f) = this.state_f;
46 f(state, item)
47 })
48 })
49 }
50}