broker_tokio/stream/
mod.rs

1//! Stream utilities for Tokio.
2//!
3//! A `Stream` is an asynchronous sequence of values. It can be thought of as an asynchronous version of the standard library's `Iterator` trait.
4//!
5//! This module provides helpers to work with them.
6
7mod all;
8use all::AllFuture;
9
10mod any;
11use any::AnyFuture;
12
13mod chain;
14use chain::Chain;
15
16mod collect;
17use collect::Collect;
18pub use collect::FromStream;
19
20mod empty;
21pub use empty::{empty, Empty};
22
23mod filter;
24use filter::Filter;
25
26mod filter_map;
27use filter_map::FilterMap;
28
29mod fuse;
30use fuse::Fuse;
31
32mod iter;
33pub use iter::{iter, Iter};
34
35mod map;
36use map::Map;
37
38mod merge;
39use merge::Merge;
40
41mod next;
42use next::Next;
43
44mod once;
45pub use once::{once, Once};
46
47mod pending;
48pub use pending::{pending, Pending};
49
50mod try_next;
51use try_next::TryNext;
52
53mod take;
54use take::Take;
55
56mod take_while;
57use take_while::TakeWhile;
58
59pub use futures_core::Stream;
60
61/// An extension trait for `Stream`s that provides a variety of convenient
62/// combinator functions.
63pub trait StreamExt: Stream {
64    /// Consumes and returns the next value in the stream or `None` if the
65    /// stream is finished.
66    ///
67    /// Equivalent to:
68    ///
69    /// ```ignore
70    /// async fn next(&mut self) -> Option<Self::Item>;
71    /// ```
72    ///
73    /// Note that because `next` doesn't take ownership over the stream,
74    /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
75    /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
76    /// be done by boxing the stream using [`Box::pin`] or
77    /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
78    /// crate.
79    ///
80    /// # Examples
81    ///
82    /// ```
83    /// # #[tokio::main]
84    /// # async fn main() {
85    /// use tokio::stream::{self, StreamExt};
86    ///
87    /// let mut stream = stream::iter(1..=3);
88    ///
89    /// assert_eq!(stream.next().await, Some(1));
90    /// assert_eq!(stream.next().await, Some(2));
91    /// assert_eq!(stream.next().await, Some(3));
92    /// assert_eq!(stream.next().await, None);
93    /// # }
94    /// ```
95    fn next(&mut self) -> Next<'_, Self>
96    where
97        Self: Unpin,
98    {
99        Next::new(self)
100    }
101
102    /// Consumes and returns the next item in the stream. If an error is
103    /// encountered before the next item, the error is returned instead.
104    ///
105    /// Equivalent to:
106    ///
107    /// ```ignore
108    /// async fn try_next(&mut self) -> Result<Option<T>, E>;
109    /// ```
110    ///
111    /// This is similar to the [`next`](StreamExt::next) combinator,
112    /// but returns a [`Result<Option<T>, E>`](Result) rather than
113    /// an [`Option<Result<T, E>>`](Option), making for easy use
114    /// with the [`?`](std::ops::Try) operator.
115    ///
116    /// # Examples
117    ///
118    /// ```
119    /// # #[tokio::main]
120    /// # async fn main() {
121    /// use tokio::stream::{self, StreamExt};
122    ///
123    /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
124    ///
125    /// assert_eq!(stream.try_next().await, Ok(Some(1)));
126    /// assert_eq!(stream.try_next().await, Ok(Some(2)));
127    /// assert_eq!(stream.try_next().await, Err("nope"));
128    /// # }
129    /// ```
130    fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
131    where
132        Self: Stream<Item = Result<T, E>> + Unpin,
133    {
134        TryNext::new(self)
135    }
136
137    /// Maps this stream's items to a different type, returning a new stream of
138    /// the resulting type.
139    ///
140    /// The provided closure is executed over all elements of this stream as
141    /// they are made available. It is executed inline with calls to
142    /// [`poll_next`](Stream::poll_next).
143    ///
144    /// Note that this function consumes the stream passed into it and returns a
145    /// wrapped version of it, similar to the existing `map` methods in the
146    /// standard library.
147    ///
148    /// # Examples
149    ///
150    /// ```
151    /// # #[tokio::main]
152    /// # async fn main() {
153    /// use tokio::stream::{self, StreamExt};
154    ///
155    /// let stream = stream::iter(1..=3);
156    /// let mut stream = stream.map(|x| x + 3);
157    ///
158    /// assert_eq!(stream.next().await, Some(4));
159    /// assert_eq!(stream.next().await, Some(5));
160    /// assert_eq!(stream.next().await, Some(6));
161    /// # }
162    /// ```
163    fn map<T, F>(self, f: F) -> Map<Self, F>
164    where
165        F: FnMut(Self::Item) -> T,
166        Self: Sized,
167    {
168        Map::new(self, f)
169    }
170
171    /// Combine two streams into one by interleaving the output of both as it
172    /// is produced.
173    ///
174    /// Values are produced from the merged stream in the order they arrive from
175    /// the two source streams. If both source streams provide values
176    /// simultaneously, the merge stream alternates between them. This provides
177    /// some level of fairness.
178    ///
179    /// The merged stream completes once **both** source streams complete. When
180    /// one source stream completes before the other, the merge stream
181    /// exclusively polls the remaining stream.
182    ///
183    /// # Examples
184    ///
185    /// ```
186    /// use tokio::stream::StreamExt;
187    /// use tokio::sync::mpsc;
188    /// use tokio::time;
189    ///
190    /// use std::time::Duration;
191    ///
192    /// # /*
193    /// #[tokio::main]
194    /// # */
195    /// # #[tokio::main(basic_scheduler)]
196    /// async fn main() {
197    /// # time::pause();
198    ///     let (mut tx1, rx1) = mpsc::channel(10);
199    ///     let (mut tx2, rx2) = mpsc::channel(10);
200    ///
201    ///     let mut rx = rx1.merge(rx2);
202    ///
203    ///     tokio::spawn(async move {
204    ///         // Send some values immediately
205    ///         tx1.send(1).await.unwrap();
206    ///         tx1.send(2).await.unwrap();
207    ///
208    ///         // Let the other task send values
209    ///         time::delay_for(Duration::from_millis(20)).await;
210    ///
211    ///         tx1.send(4).await.unwrap();
212    ///     });
213    ///
214    ///     tokio::spawn(async move {
215    ///         // Wait for the first task to send values
216    ///         time::delay_for(Duration::from_millis(5)).await;
217    ///
218    ///         tx2.send(3).await.unwrap();
219    ///
220    ///         time::delay_for(Duration::from_millis(25)).await;
221    ///
222    ///         // Send the final value
223    ///         tx2.send(5).await.unwrap();
224    ///     });
225    ///
226    ///    assert_eq!(1, rx.next().await.unwrap());
227    ///    assert_eq!(2, rx.next().await.unwrap());
228    ///    assert_eq!(3, rx.next().await.unwrap());
229    ///    assert_eq!(4, rx.next().await.unwrap());
230    ///    assert_eq!(5, rx.next().await.unwrap());
231    ///
232    ///    // The merged stream is consumed
233    ///    assert!(rx.next().await.is_none());
234    /// }
235    /// ```
236    fn merge<U>(self, other: U) -> Merge<Self, U>
237    where
238        U: Stream<Item = Self::Item>,
239        Self: Sized,
240    {
241        Merge::new(self, other)
242    }
243
244    /// Filters the values produced by this stream according to the provided
245    /// predicate.
246    ///
247    /// As values of this stream are made available, the provided predicate `f`
248    /// will be run against them. If the predicate
249    /// resolves to `true`, then the stream will yield the value, but if the
250    /// predicate resolves to `false`, then the value
251    /// will be discarded and the next value will be produced.
252    ///
253    /// Note that this function consumes the stream passed into it and returns a
254    /// wrapped version of it, similar to [`Iterator::filter`] method in the
255    /// standard library.
256    ///
257    /// # Examples
258    ///
259    /// ```
260    /// # #[tokio::main]
261    /// # async fn main() {
262    /// use tokio::stream::{self, StreamExt};
263    ///
264    /// let stream = stream::iter(1..=8);
265    /// let mut evens = stream.filter(|x| x % 2 == 0);
266    ///
267    /// assert_eq!(Some(2), evens.next().await);
268    /// assert_eq!(Some(4), evens.next().await);
269    /// assert_eq!(Some(6), evens.next().await);
270    /// assert_eq!(Some(8), evens.next().await);
271    /// assert_eq!(None, evens.next().await);
272    /// # }
273    /// ```
274    fn filter<F>(self, f: F) -> Filter<Self, F>
275    where
276        F: FnMut(&Self::Item) -> bool,
277        Self: Sized,
278    {
279        Filter::new(self, f)
280    }
281
282    /// Filters the values produced by this stream while simultaneously mapping
283    /// them to a different type according to the provided closure.
284    ///
285    /// As values of this stream are made available, the provided function will
286    /// be run on them. If the predicate `f` resolves to
287    /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
288    /// it resolves to [`None`] then the next value will be produced.
289    ///
290    /// Note that this function consumes the stream passed into it and returns a
291    /// wrapped version of it, similar to [`Iterator::filter_map`] method in the
292    /// standard library.
293    ///
294    /// # Examples
295    /// ```
296    /// # #[tokio::main]
297    /// # async fn main() {
298    /// use tokio::stream::{self, StreamExt};
299    ///
300    /// let stream = stream::iter(1..=8);
301    /// let mut evens = stream.filter_map(|x| {
302    ///     if x % 2 == 0 { Some(x + 1) } else { None }
303    /// });
304    ///
305    /// assert_eq!(Some(3), evens.next().await);
306    /// assert_eq!(Some(5), evens.next().await);
307    /// assert_eq!(Some(7), evens.next().await);
308    /// assert_eq!(Some(9), evens.next().await);
309    /// assert_eq!(None, evens.next().await);
310    /// # }
311    /// ```
312    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
313    where
314        F: FnMut(Self::Item) -> Option<T>,
315        Self: Sized,
316    {
317        FilterMap::new(self, f)
318    }
319
320    /// Creates a stream which ends after the first `None`.
321    ///
322    /// After a stream returns `None`, behavior is undefined. Future calls to
323    /// `poll_next` may or may not return `Some(T)` again or they may panic.
324    /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
325    /// return `None` forever.
326    ///
327    /// # Examples
328    ///
329    /// ```
330    /// use tokio::stream::{Stream, StreamExt};
331    ///
332    /// use std::pin::Pin;
333    /// use std::task::{Context, Poll};
334    ///
335    /// // a stream which alternates between Some and None
336    /// struct Alternate {
337    ///     state: i32,
338    /// }
339    ///
340    /// impl Stream for Alternate {
341    ///     type Item = i32;
342    ///
343    ///     fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
344    ///         let val = self.state;
345    ///         self.state = self.state + 1;
346    ///
347    ///         // if it's even, Some(i32), else None
348    ///         if val % 2 == 0 {
349    ///             Poll::Ready(Some(val))
350    ///         } else {
351    ///             Poll::Ready(None)
352    ///         }
353    ///     }
354    /// }
355    ///
356    /// #[tokio::main]
357    /// async fn main() {
358    ///     let mut stream = Alternate { state: 0 };
359    ///
360    ///     // the stream goes back and forth
361    ///     assert_eq!(stream.next().await, Some(0));
362    ///     assert_eq!(stream.next().await, None);
363    ///     assert_eq!(stream.next().await, Some(2));
364    ///     assert_eq!(stream.next().await, None);
365    ///
366    ///     // however, once it is fused
367    ///     let mut stream = stream.fuse();
368    ///
369    ///     assert_eq!(stream.next().await, Some(4));
370    ///     assert_eq!(stream.next().await, None);
371    ///
372    ///     // it will always return `None` after the first time.
373    ///     assert_eq!(stream.next().await, None);
374    ///     assert_eq!(stream.next().await, None);
375    ///     assert_eq!(stream.next().await, None);
376    /// }
377    /// ```
378    fn fuse(self) -> Fuse<Self>
379    where
380        Self: Sized,
381    {
382        Fuse::new(self)
383    }
384
385    /// Creates a new stream of at most `n` items of the underlying stream.
386    ///
387    /// Once `n` items have been yielded from this stream then it will always
388    /// return that the stream is done.
389    ///
390    /// # Examples
391    ///
392    /// ```
393    /// # #[tokio::main]
394    /// # async fn main() {
395    /// use tokio::stream::{self, StreamExt};
396    ///
397    /// let mut stream = stream::iter(1..=10).take(3);
398    ///
399    /// assert_eq!(Some(1), stream.next().await);
400    /// assert_eq!(Some(2), stream.next().await);
401    /// assert_eq!(Some(3), stream.next().await);
402    /// assert_eq!(None, stream.next().await);
403    /// # }
404    /// ```
405    fn take(self, n: usize) -> Take<Self>
406    where
407        Self: Sized,
408    {
409        Take::new(self, n)
410    }
411
412    /// Take elements from this stream while the provided predicate
413    /// resolves to `true`.
414    ///
415    /// This function, like `Iterator::take_while`, will take elements from the
416    /// stream until the predicate `f` resolves to `false`. Once one element
417    /// returns false it will always return that the stream is done.
418    ///
419    /// # Examples
420    ///
421    /// ```
422    /// # #[tokio::main]
423    /// # async fn main() {
424    /// use tokio::stream::{self, StreamExt};
425    ///
426    /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
427    ///
428    /// assert_eq!(Some(1), stream.next().await);
429    /// assert_eq!(Some(2), stream.next().await);
430    /// assert_eq!(Some(3), stream.next().await);
431    /// assert_eq!(None, stream.next().await);
432    /// # }
433    /// ```
434    fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
435    where
436        F: FnMut(&Self::Item) -> bool,
437        Self: Sized,
438    {
439        TakeWhile::new(self, f)
440    }
441
442    /// Tests if every element of the stream matches a predicate.
443    ///
444    /// `all()` takes a closure that returns `true` or `false`. It applies
445    /// this closure to each element of the stream, and if they all return
446    /// `true`, then so does `all`. If any of them return `false`, it
447    /// returns `false`. An empty stream returns `true`.
448    ///
449    /// `all()` is short-circuiting; in other words, it will stop processing
450    /// as soon as it finds a `false`, given that no matter what else happens,
451    /// the result will also be `false`.
452    ///
453    /// An empty stream returns `true`.
454    ///
455    /// # Examples
456    ///
457    /// Basic usage:
458    ///
459    /// ```
460    /// # #[tokio::main]
461    /// # async fn main() {
462    /// use tokio::stream::{self, StreamExt};
463    ///
464    /// let a = [1, 2, 3];
465    ///
466    /// assert!(stream::iter(&a).all(|&x| x > 0).await);
467    ///
468    /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
469    /// # }
470    /// ```
471    ///
472    /// Stopping at the first `false`:
473    ///
474    /// ```
475    /// # #[tokio::main]
476    /// # async fn main() {
477    /// use tokio::stream::{self, StreamExt};
478    ///
479    /// let a = [1, 2, 3];
480    ///
481    /// let mut iter = stream::iter(&a);
482    ///
483    /// assert!(!iter.all(|&x| x != 2).await);
484    ///
485    /// // we can still use `iter`, as there are more elements.
486    /// assert_eq!(iter.next().await, Some(&3));
487    /// # }
488    /// ```
489    fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
490    where
491        Self: Unpin,
492        F: FnMut(Self::Item) -> bool,
493    {
494        AllFuture::new(self, f)
495    }
496
497    /// Tests if any element of the stream matches a predicate.
498    ///
499    /// `any()` takes a closure that returns `true` or `false`. It applies
500    /// this closure to each element of the stream, and if any of them return
501    /// `true`, then so does `any()`. If they all return `false`, it
502    /// returns `false`.
503    ///
504    /// `any()` is short-circuiting; in other words, it will stop processing
505    /// as soon as it finds a `true`, given that no matter what else happens,
506    /// the result will also be `true`.
507    ///
508    /// An empty stream returns `false`.
509    ///
510    /// Basic usage:
511    ///
512    /// ```
513    /// # #[tokio::main]
514    /// # async fn main() {
515    /// use tokio::stream::{self, StreamExt};
516    ///
517    /// let a = [1, 2, 3];
518    ///
519    /// assert!(stream::iter(&a).any(|&x| x > 0).await);
520    ///
521    /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
522    /// # }
523    /// ```
524    ///
525    /// Stopping at the first `true`:
526    ///
527    /// ```
528    /// # #[tokio::main]
529    /// # async fn main() {
530    /// use tokio::stream::{self, StreamExt};
531    ///
532    /// let a = [1, 2, 3];
533    ///
534    /// let mut iter = stream::iter(&a);
535    ///
536    /// assert!(iter.any(|&x| x != 2).await);
537    ///
538    /// // we can still use `iter`, as there are more elements.
539    /// assert_eq!(iter.next().await, Some(&2));
540    /// # }
541    /// ```
542    fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
543    where
544        Self: Unpin,
545        F: FnMut(Self::Item) -> bool,
546    {
547        AnyFuture::new(self, f)
548    }
549
550    /// Combine two streams into one by first returning all values from the
551    /// first stream then all values from the second stream.
552    ///
553    /// As long as `self` still has values to emit, no values from `other` are
554    /// emitted, even if some are ready.
555    ///
556    /// # Examples
557    ///
558    /// ```
559    /// use tokio::stream::{self, StreamExt};
560    ///
561    /// #[tokio::main]
562    /// async fn main() {
563    ///     let one = stream::iter(vec![1, 2, 3]);
564    ///     let two = stream::iter(vec![4, 5, 6]);
565    ///
566    ///     let mut stream = one.chain(two);
567    ///
568    ///     assert_eq!(stream.next().await, Some(1));
569    ///     assert_eq!(stream.next().await, Some(2));
570    ///     assert_eq!(stream.next().await, Some(3));
571    ///     assert_eq!(stream.next().await, Some(4));
572    ///     assert_eq!(stream.next().await, Some(5));
573    ///     assert_eq!(stream.next().await, Some(6));
574    ///     assert_eq!(stream.next().await, None);
575    /// }
576    /// ```
577    fn chain<U>(self, other: U) -> Chain<Self, U>
578    where
579        U: Stream<Item = Self::Item>,
580        Self: Sized,
581    {
582        Chain::new(self, other)
583    }
584
585    /// Drain stream pushing all emitted values into a collection.
586    ///
587    /// `collect` streams all values, awaiting as needed. Values are pushed into
588    /// a collection. A number of different target collection types are
589    /// supported, including [`Vec`](std::vec::Vec),
590    /// [`String`](std::string::String), and [`Bytes`](bytes::Bytes).
591    ///
592    /// # `Result`
593    ///
594    /// `collect()` can also be used with streams of type `Result<T, E>` where
595    /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
596    /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
597    /// streaming is terminated and `collect()` returns the `Err`.
598    ///
599    /// # Notes
600    ///
601    /// `FromStream` is currently a sealed trait. Stabilization is pending
602    /// enhancements to the Rust langague.
603    ///
604    /// # Examples
605    ///
606    /// Basic usage:
607    ///
608    /// ```
609    /// use tokio::stream::{self, StreamExt};
610    ///
611    /// #[tokio::main]
612    /// async fn main() {
613    ///     let doubled: Vec<i32> =
614    ///         stream::iter(vec![1, 2, 3])
615    ///             .map(|x| x * 2)
616    ///             .collect()
617    ///             .await;
618    ///
619    ///     assert_eq!(vec![2, 4, 6], doubled);
620    /// }
621    /// ```
622    ///
623    /// Collecting a stream of `Result` values
624    ///
625    /// ```
626    /// use tokio::stream::{self, StreamExt};
627    ///
628    /// #[tokio::main]
629    /// async fn main() {
630    ///     // A stream containing only `Ok` values will be collected
631    ///     let values: Result<Vec<i32>, &str> =
632    ///         stream::iter(vec![Ok(1), Ok(2), Ok(3)])
633    ///             .collect()
634    ///             .await;
635    ///
636    ///     assert_eq!(Ok(vec![1, 2, 3]), values);
637    ///
638    ///     // A stream containing `Err` values will return the first error.
639    ///     let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
640    ///
641    ///     let values: Result<Vec<i32>, &str> =
642    ///         stream::iter(results)
643    ///             .collect()
644    ///             .await;
645    ///
646    ///     assert_eq!(Err("no"), values);
647    /// }
648    /// ```
649    fn collect<T>(self) -> Collect<Self, T>
650    where
651        T: FromStream<Self::Item>,
652        Self: Sized,
653    {
654        Collect::new(self)
655    }
656}
657
658impl<St: ?Sized> StreamExt for St where St: Stream {}