async_std/stream/successors.rs
1use core::mem;
2use core::pin::Pin;
3
4use crate::stream::Stream;
5use crate::task::{Context, Poll};
6
7use pin_project_lite::pin_project;
8
9/// Creates a new stream where to produce each new element a closure is called with the previous
10/// value.
11///
12/// # Examples
13///
14/// ```
15/// # fn main() { async_std::task::block_on(async {
16/// #
17/// use async_std::prelude::*;
18/// use async_std::stream;
19///
20/// let mut s = stream::successors(Some(22), |&val| Some(val + 1));
21///
22/// assert_eq!(s.next().await, Some(22));
23/// assert_eq!(s.next().await, Some(23));
24/// assert_eq!(s.next().await, Some(24));
25/// assert_eq!(s.next().await, Some(25));
26///
27/// #
28/// # }) }
29/// ```
30#[cfg(feature = "unstable")]
31#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
32pub fn successors<F, T>(first: Option<T>, succ: F) -> Successors<F, T>
33where
34 F: FnMut(&T) -> Option<T>,
35{
36 Successors { succ, slot: first }
37}
38
39pin_project! {
40 /// A stream that yields elements by calling an async closure with the previous value as an
41 /// argument
42 ///
43 /// This stream is constructed by [`successors`] function
44 ///
45 /// [`successors`]: fn.successors.html
46 #[cfg(feature = "unstable")]
47 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
48 #[derive(Debug)]
49 pub struct Successors<F, T>
50 where
51 F: FnMut(&T) -> Option<T>
52 {
53 succ: F,
54 slot: Option<T>,
55 }
56}
57
58impl<F, T> Stream for Successors<F, T>
59where
60 F: FnMut(&T) -> Option<T>,
61{
62 type Item = T;
63
64 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65 let this = self.project();
66
67 if this.slot.is_none() {
68 return Poll::Ready(None);
69 }
70
71 let mut next = (this.succ)(&this.slot.as_ref().unwrap());
72
73 // 'swapping' here means 'slot' will hold the next value and next will be th one from the previous iteration
74 mem::swap(this.slot, &mut next);
75 Poll::Ready(next)
76 }
77}