async_std/stream/double_ended_stream/
mod.rs

1use crate::stream::Stream;
2
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6mod next_back;
7mod nth_back;
8mod rfind;
9mod rfold;
10mod try_rfold;
11
12use next_back::NextBackFuture;
13use nth_back::NthBackFuture;
14use rfind::RFindFuture;
15use rfold::RFoldFuture;
16use try_rfold::TryRFoldFuture;
17
/// A stream able to yield elements from both ends.
///
/// Something that implements `DoubleEndedStream` has one extra capability
/// over something that implements [`Stream`]: the ability to also take
/// `Item`s from the back, as well as the front.
///
/// `poll_next_back` is the only required method; every other method on this
/// trait has a default implementation that returns a future type from one of
/// this module's submodules.
///
/// [`Stream`]: trait.Stream.html
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait DoubleEndedStream: Stream {
    #[doc = r#"
        Attempts to receive the next item from the back of the stream.

        There are several possible return values:

        * `Poll::Pending` means this stream's next_back value is not ready yet.
        * `Poll::Ready(None)` means this stream has been exhausted.
        * `Poll::Ready(Some(item))` means `item` was received out of the stream.

        # Examples

        ```
        # fn main() { async_std::task::block_on(async {
        #
        use std::pin::Pin;

        use async_std::prelude::*;
        use async_std::stream;
        use async_std::task::{Context, Poll};

        fn increment(
            s: impl DoubleEndedStream<Item = i32> + Unpin,
        ) -> impl DoubleEndedStream<Item = i32> + Unpin {
            struct Increment<S>(S);

            impl<S: DoubleEndedStream<Item = i32> + Unpin> Stream for Increment<S> {
                type Item = S::Item;

                fn poll_next(
                    mut self: Pin<&mut Self>,
                    cx: &mut Context<'_>,
                ) -> Poll<Option<Self::Item>> {
                    match Pin::new(&mut self.0).poll_next(cx) {
                        Poll::Pending => Poll::Pending,
                        Poll::Ready(None) => Poll::Ready(None),
                        Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)),
                    }
                }
            }

            impl<S: DoubleEndedStream<Item = i32> + Unpin> DoubleEndedStream for Increment<S> {
                fn poll_next_back(
                    mut self: Pin<&mut Self>,
                    cx: &mut Context<'_>,
                ) -> Poll<Option<Self::Item>> {
                    match Pin::new(&mut self.0).poll_next_back(cx) {
                        Poll::Pending => Poll::Pending,
                        Poll::Ready(None) => Poll::Ready(None),
                        Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)),
                    }
                }
            }

            Increment(s)
        }

        let mut s = increment(stream::once(7));

        assert_eq!(s.next_back().await, Some(8));
        assert_eq!(s.next_back().await, None);
        #
        # }) }
        ```
    "#]
    fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

    #[doc = r#"
        Advances the stream from the back and returns the next value from that end.

        Returns [`None`] when iteration is finished. Individual stream implementations may
        choose to resume iteration, and so calling `next_back()` again may or may not
        eventually start returning more values.

        [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None

        # Examples

        ```
        # fn main() { async_std::task::block_on(async {
        #
        use async_std::prelude::*;
        use async_std::stream;

        let mut s = stream::from_iter(vec![7u8]);

        assert_eq!(s.next_back().await, Some(7));
        assert_eq!(s.next_back().await, None);
        #
        # }) }
        ```
    "#]
    fn next_back(&mut self) -> NextBackFuture<'_, Self>
    where
        Self: Unpin,
    {
        NextBackFuture { stream: self }
    }

    #[doc = r#"
        Returns the nth element from the back of the stream.

        `n` is counted from the back: as the example below shows, `nth_back(1)`
        yields the second-to-last element.

        # Examples

        Basic usage:

        ```
        # fn main() { async_std::task::block_on(async {
        #
        use async_std::prelude::*;
        use async_std::stream;

        let mut s = stream::from_iter(vec![1u8, 2, 3, 4, 5]);

        let second = s.nth_back(1).await;
        assert_eq!(second, Some(4));
        #
        # }) }
        ```
    "#]
    fn nth_back(&mut self, n: usize) -> NthBackFuture<'_, Self>
    where
        Self: Unpin + Sized,
    {
        NthBackFuture::new(self, n)
    }

    #[doc = r#"
        Returns the first element from the right that matches the predicate.

        # Examples

        Basic usage:

        ```
        # fn main() { async_std::task::block_on(async {
        #
        use async_std::prelude::*;
        use async_std::stream;

        let mut s = stream::from_iter(vec![1u8, 2, 3, 4, 5]);

        let last_even = s.rfind(|v| v % 2 == 0).await;
        assert_eq!(last_even, Some(4));
        #
        # }) }
        ```
    "#]
    fn rfind<P>(&mut self, p: P) -> RFindFuture<'_, Self, P>
    where
        Self: Unpin + Sized,
        P: FnMut(&Self::Item) -> bool,
    {
        RFindFuture::new(self, p)
    }

    #[doc = r#"
        Reduces the stream's elements to a single, final value, starting from the back.

        `accum` is the initial accumulator value. `f` receives the current accumulator
        together with an item of the stream and returns the new accumulator. This is the
        stream counterpart of `DoubleEndedIterator::rfold` from the standard library.

        # Examples

        Basic usage:

        ```
        # fn main() { async_std::task::block_on(async {
        #
        use async_std::prelude::*;
        use async_std::stream;

        let s = stream::from_iter(vec![1u8, 2, 3, 4, 5]);

        let sum = s.rfold(0, |acc, v| v + acc).await;

        assert_eq!(sum, 15);
        #
        # }) }
        ```
    "#]
    fn rfold<B, F>(self, accum: B, f: F) -> RFoldFuture<Self, F, B>
    where
        Self: Sized,
        F: FnMut(B, Self::Item) -> B,
    {
        RFoldFuture::new(self, accum, f)
    }

    #[doc = r#"
        A combinator that applies a function to the stream's items, starting from the
        back, as long as it returns successfully, producing a single, final value.
        Immediately returns the error when the function returns unsuccessfully.

        `accum` is the initial accumulator value. `f` receives the current accumulator
        together with an item of the stream and returns either the new accumulator or
        an error.

        # Examples

        Basic usage:

        ```
        # fn main() { async_std::task::block_on(async {
        #
        use async_std::prelude::*;
        use async_std::stream;

        let s = stream::from_iter(vec![1u8, 2, 3, 4, 5]);
        let sum = s.try_rfold(0, |acc, v| {
            if (acc+v) % 2 == 1 {
                Ok(v+3)
            } else {
                Err("fail")
            }
        }).await;

        assert_eq!(sum, Err("fail"));
        #
        # }) }
        ```
    "#]
    fn try_rfold<B, F, E>(self, accum: B, f: F) -> TryRFoldFuture<Self, F, B>
    where
        Self: Sized,
        F: FnMut(B, Self::Item) -> Result<B, E>,
    {
        TryRFoldFuture::new(self, accum, f)
    }
}