tokio_stream/
stream_ext.rs

1use core::future::Future;
2use futures_core::Stream;
3
4mod all;
5use all::AllFuture;
6
7mod any;
8use any::AnyFuture;
9
10mod chain;
11pub use chain::Chain;
12
13pub(crate) mod collect;
14use collect::{Collect, FromStream};
15
16mod filter;
17pub use filter::Filter;
18
19mod filter_map;
20pub use filter_map::FilterMap;
21
22mod fold;
23use fold::FoldFuture;
24
25mod fuse;
26pub use fuse::Fuse;
27
28mod map;
29pub use map::Map;
30
31mod map_while;
32pub use map_while::MapWhile;
33
34mod merge;
35pub use merge::Merge;
36
37mod next;
38use next::Next;
39
40mod skip;
41pub use skip::Skip;
42
43mod skip_while;
44pub use skip_while::SkipWhile;
45
46mod take;
47pub use take::Take;
48
49mod take_while;
50pub use take_while::TakeWhile;
51
52mod then;
53pub use then::Then;
54
55mod try_next;
56use try_next::TryNext;
57
58mod peekable;
59pub use peekable::Peekable;
60
61cfg_time! {
62    pub(crate) mod timeout;
63    pub(crate) mod timeout_repeating;
64    pub use timeout::Timeout;
65    pub use timeout_repeating::TimeoutRepeating;
66    use tokio::time::{Duration, Interval};
67    mod throttle;
68    use throttle::{throttle, Throttle};
69    mod chunks_timeout;
70    pub use chunks_timeout::ChunksTimeout;
71}
72
73/// An extension trait for the [`Stream`] trait that provides a variety of
74/// convenient combinator functions.
75///
76/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
77/// in the [futures] crate, however both Tokio and futures provide separate
78/// `StreamExt` utility traits, and some utilities are only available on one of
79/// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
80/// trait in the futures crate.
81///
82/// If you need utilities from both `StreamExt` traits, you should prefer to
83/// import one of them, and use the other through the fully qualified call
84/// syntax. For example:
85/// ```
86/// // import one of the traits:
87/// use futures::stream::StreamExt;
88/// # #[tokio::main(flavor = "current_thread")]
89/// # async fn main() {
90///
91/// let a = tokio_stream::iter(vec![1, 3, 5]);
92/// let b = tokio_stream::iter(vec![2, 4, 6]);
93///
94/// // use the fully qualified call syntax for the other trait:
95/// let merged = tokio_stream::StreamExt::merge(a, b);
96///
97/// // use normal call notation for futures::stream::StreamExt::collect
98/// let output: Vec<_> = merged.collect().await;
99/// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
100/// # }
101/// ```
102///
103/// [`Stream`]: crate::Stream
104/// [futures]: https://docs.rs/futures
105/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
106pub trait StreamExt: Stream {
107    /// Consumes and returns the next value in the stream or `None` if the
108    /// stream is finished.
109    ///
110    /// Equivalent to:
111    ///
112    /// ```ignore
113    /// async fn next(&mut self) -> Option<Self::Item>;
114    /// ```
115    ///
116    /// Note that because `next` doesn't take ownership over the stream,
117    /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
118    /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
119    /// be done by boxing the stream using [`Box::pin`] or
120    /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
121    /// crate.
122    ///
123    /// # Cancel safety
124    ///
125    /// This method is cancel safe. The returned future only
126    /// holds onto a reference to the underlying stream,
127    /// so dropping it will never lose a value.
128    ///
129    /// # Examples
130    ///
131    /// ```
132    /// # #[tokio::main]
133    /// # async fn main() {
134    /// use tokio_stream::{self as stream, StreamExt};
135    ///
136    /// let mut stream = stream::iter(1..=3);
137    ///
138    /// assert_eq!(stream.next().await, Some(1));
139    /// assert_eq!(stream.next().await, Some(2));
140    /// assert_eq!(stream.next().await, Some(3));
141    /// assert_eq!(stream.next().await, None);
142    /// # }
143    /// ```
144    fn next(&mut self) -> Next<'_, Self>
145    where
        // Only a `&mut` borrow is taken, so a `!Unpin` stream has to be pinned
        // by the caller before `next` can be used on it.
146        Self: Unpin,
147    {
        // The returned future keeps the mutable borrow of the stream for `'_`.
148        Next::new(self)
149    }
150
151    /// Consumes and returns the next item in the stream. If an error is
152    /// encountered before the next item, the error is returned instead.
153    ///
154    /// Equivalent to:
155    ///
156    /// ```ignore
157    /// async fn try_next(&mut self) -> Result<Option<T>, E>;
158    /// ```
159    ///
160    /// This is similar to the [`next`](StreamExt::next) combinator,
161    /// but returns a [`Result<Option<T>, E>`](Result) rather than
162    /// an [`Option<Result<T, E>>`](Option), making for easy use
163    /// with the [`?`](std::ops::Try) operator.
164    ///
165    /// # Cancel safety
166    ///
167    /// This method is cancel safe. The returned future only
168    /// holds onto a reference to the underlying stream,
169    /// so dropping it will never lose a value.
170    ///
171    /// # Examples
172    ///
173    /// ```
174    /// # #[tokio::main]
175    /// # async fn main() {
176    /// use tokio_stream::{self as stream, StreamExt};
177    ///
178    /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
179    ///
180    /// assert_eq!(stream.try_next().await, Ok(Some(1)));
181    /// assert_eq!(stream.try_next().await, Ok(Some(2)));
182    /// assert_eq!(stream.try_next().await, Err("nope"));
183    /// # }
184    /// ```
185    fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
186    where
        // `T` and `E` are taken from the stream's `Result<T, E>` item type.
        // `Unpin` is required because the stream is borrowed rather than owned.
187        Self: Stream<Item = Result<T, E>> + Unpin,
188    {
        // The returned future keeps the mutable borrow of the stream for `'_`.
189        TryNext::new(self)
190    }
191
192    /// Maps this stream's items to a different type, returning a new stream of
193    /// the resulting type.
194    ///
195    /// The provided closure is executed over all elements of this stream as
196    /// they are made available. It is executed inline with calls to
197    /// [`poll_next`](Stream::poll_next).
198    ///
199    /// Note that this function consumes the stream passed into it and returns a
200    /// wrapped version of it, similar to the existing `map` methods in the
201    /// standard library.
202    ///
203    /// # Examples
204    ///
205    /// ```
206    /// # #[tokio::main]
207    /// # async fn main() {
208    /// use tokio_stream::{self as stream, StreamExt};
209    ///
210    /// let stream = stream::iter(1..=3);
211    /// let mut stream = stream.map(|x| x + 3);
212    ///
213    /// assert_eq!(stream.next().await, Some(4));
214    /// assert_eq!(stream.next().await, Some(5));
215    /// assert_eq!(stream.next().await, Some(6));
216    /// # }
217    /// ```
218    fn map<T, F>(self, f: F) -> Map<Self, F>
219    where
        // `f` receives each item by value and returns the mapped value `T`.
220        F: FnMut(Self::Item) -> T,
        // `self` is taken by value, which needs a statically sized type.
221        Self: Sized,
222    {
        // Hand both the stream and the closure to the `Map` adapter.
223        Map::new(self, f)
224    }
225
226    /// Map this stream's items to a different type for as long as determined by
227    /// the provided closure. A stream of the target type will be returned,
228    /// which will yield elements until the closure returns `None`.
229    ///
230    /// The provided closure is executed over all elements of this stream as
231    /// they are made available, until it returns `None`. It is executed inline
232    /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
233    /// the underlying stream will not be polled again.
234    ///
235    /// Note that this function consumes the stream passed into it and returns a
236    /// wrapped version of it, similar to the [`Iterator::map_while`] method in the
237    /// standard library.
238    ///
239    /// # Examples
240    ///
241    /// ```
242    /// # #[tokio::main]
243    /// # async fn main() {
244    /// use tokio_stream::{self as stream, StreamExt};
245    ///
246    /// let stream = stream::iter(1..=10);
247    /// let mut stream = stream.map_while(|x| {
248    ///     if x < 4 {
249    ///         Some(x + 3)
250    ///     } else {
251    ///         None
252    ///     }
253    /// });
254    /// assert_eq!(stream.next().await, Some(4));
255    /// assert_eq!(stream.next().await, Some(5));
256    /// assert_eq!(stream.next().await, Some(6));
257    /// assert_eq!(stream.next().await, None);
258    /// # }
259    /// ```
260    fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
261    where
        // `f` receives each item by value; `Some(T)` carries the mapped value
        // and `None` is the stop signal described in the method docs.
262        F: FnMut(Self::Item) -> Option<T>,
        // `self` is taken by value, which needs a statically sized type.
263        Self: Sized,
264    {
        // Hand both the stream and the closure to the `MapWhile` adapter.
265        MapWhile::new(self, f)
266    }
267
268    /// Maps this stream's items asynchronously to a different type, returning a
269    /// new stream of the resulting type.
270    ///
271    /// The provided closure is executed over all elements of this stream as
272    /// they are made available, and the returned future is executed. Only one
273    /// future is executed at a time.
274    ///
275    /// Note that this function consumes the stream passed into it and returns a
276    /// wrapped version of it, similar to the existing `then` methods in the
277    /// standard library.
278    ///
279    /// Be aware that if the future is not `Unpin`, then neither is the `Stream`
280    /// returned by this method. To handle this, you can use `tokio::pin!` as in
281    /// the example below or put the stream in a `Box` with `Box::pin(stream)`.
282    ///
283    /// # Examples
284    ///
285    /// ```
286    /// # #[tokio::main]
287    /// # async fn main() {
288    /// use tokio_stream::{self as stream, StreamExt};
289    ///
290    /// async fn do_async_work(value: i32) -> i32 {
291    ///     value + 3
292    /// }
293    ///
294    /// let stream = stream::iter(1..=3);
295    /// let stream = stream.then(do_async_work);
296    ///
297    /// tokio::pin!(stream);
298    ///
299    /// assert_eq!(stream.next().await, Some(4));
300    /// assert_eq!(stream.next().await, Some(5));
301    /// assert_eq!(stream.next().await, Some(6));
302    /// # }
303    /// ```
304    fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
305    where
        // `f` turns each item (taken by value) into a future of type `Fut`.
306        F: FnMut(Self::Item) -> Fut,
307        Fut: Future,
        // `self` is taken by value, which needs a statically sized type.
308        Self: Sized,
309    {
        // Hand both the stream and the closure to the `Then` adapter.
310        Then::new(self, f)
311    }
312
313    /// Combine two streams into one by interleaving the output of both as it
314    /// is produced.
315    ///
316    /// Values are produced from the merged stream in the order they arrive from
317    /// the two source streams. If both source streams provide values
318    /// simultaneously, the merge stream alternates between them. This provides
319    /// some level of fairness. You should not chain calls to `merge`, as this
320    /// will break the fairness of the merging.
321    ///
322    /// The merged stream completes once **both** source streams complete. When
323    /// one source stream completes before the other, the merge stream
324    /// exclusively polls the remaining stream.
325    ///
326    /// For merging multiple streams, consider using [`StreamMap`] instead.
327    ///
328    /// [`StreamMap`]: crate::StreamMap
329    ///
330    /// # Examples
331    ///
332    /// ```
333    /// use tokio_stream::{StreamExt, Stream};
334    /// use tokio::sync::mpsc;
335    /// use tokio::time;
336    ///
337    /// use std::time::Duration;
338    /// use std::pin::Pin;
339    ///
340    /// # /*
341    /// #[tokio::main]
342    /// # */
343    /// # #[tokio::main(flavor = "current_thread")]
344    /// async fn main() {
345    /// # time::pause();
346    ///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
347    ///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
348    ///
349    ///     // Convert the channels to a `Stream`.
350    ///     let rx1 = Box::pin(async_stream::stream! {
351    ///           while let Some(item) = rx1.recv().await {
352    ///               yield item;
353    ///           }
354    ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
355    ///
356    ///     let rx2 = Box::pin(async_stream::stream! {
357    ///           while let Some(item) = rx2.recv().await {
358    ///               yield item;
359    ///           }
360    ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
361    ///
362    ///     let mut rx = rx1.merge(rx2);
363    ///
364    ///     tokio::spawn(async move {
365    ///         // Send some values immediately
366    ///         tx1.send(1).await.unwrap();
367    ///         tx1.send(2).await.unwrap();
368    ///
369    ///         // Let the other task send values
370    ///         time::sleep(Duration::from_millis(20)).await;
371    ///
372    ///         tx1.send(4).await.unwrap();
373    ///     });
374    ///
375    ///     tokio::spawn(async move {
376    ///         // Wait for the first task to send values
377    ///         time::sleep(Duration::from_millis(5)).await;
378    ///
379    ///         tx2.send(3).await.unwrap();
380    ///
381    ///         time::sleep(Duration::from_millis(25)).await;
382    ///
383    ///         // Send the final value
384    ///         tx2.send(5).await.unwrap();
385    ///     });
386    ///
387    ///    assert_eq!(1, rx.next().await.unwrap());
388    ///    assert_eq!(2, rx.next().await.unwrap());
389    ///    assert_eq!(3, rx.next().await.unwrap());
390    ///    assert_eq!(4, rx.next().await.unwrap());
391    ///    assert_eq!(5, rx.next().await.unwrap());
392    ///
393    ///    // The merged stream is consumed
394    ///    assert!(rx.next().await.is_none());
395    /// }
396    /// ```
397    fn merge<U>(self, other: U) -> Merge<Self, U>
398    where
        // Both streams must yield the same item type so their output can be
        // interleaved into a single stream.
399        U: Stream<Item = Self::Item>,
        // `self` is taken by value, which needs a statically sized type.
400        Self: Sized,
401    {
        // Both source streams are moved into the `Merge` adapter.
402        Merge::new(self, other)
403    }
404
405    /// Filters the values produced by this stream according to the provided
406    /// predicate.
407    ///
408    /// As values of this stream are made available, the provided predicate `f`
409    /// will be run against them. If the predicate
410    /// resolves to `true`, then the stream will yield the value, but if the
411    /// predicate resolves to `false`, then the value
412    /// will be discarded and the next value will be produced.
413    ///
414    /// Note that this function consumes the stream passed into it and returns a
415    /// wrapped version of it, similar to the [`Iterator::filter`] method in the
416    /// standard library.
417    ///
418    /// # Examples
419    ///
420    /// ```
421    /// # #[tokio::main]
422    /// # async fn main() {
423    /// use tokio_stream::{self as stream, StreamExt};
424    ///
425    /// let stream = stream::iter(1..=8);
426    /// let mut evens = stream.filter(|x| x % 2 == 0);
427    ///
428    /// assert_eq!(Some(2), evens.next().await);
429    /// assert_eq!(Some(4), evens.next().await);
430    /// assert_eq!(Some(6), evens.next().await);
431    /// assert_eq!(Some(8), evens.next().await);
432    /// assert_eq!(None, evens.next().await);
433    /// # }
434    /// ```
435    fn filter<F>(self, f: F) -> Filter<Self, F>
436    where
        // The predicate only gets a reference to each item; it does not take
        // ownership of it.
437        F: FnMut(&Self::Item) -> bool,
        // `self` is taken by value, which needs a statically sized type.
438        Self: Sized,
439    {
        // Hand both the stream and the predicate to the `Filter` adapter.
440        Filter::new(self, f)
441    }
442
443    /// Filters the values produced by this stream while simultaneously mapping
444    /// them to a different type according to the provided closure.
445    ///
446    /// As values of this stream are made available, the provided function will
447    /// be run on them. If the predicate `f` resolves to
448    /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
449    /// it resolves to [`None`], then the value will be skipped.
450    ///
451    /// Note that this function consumes the stream passed into it and returns a
452    /// wrapped version of it, similar to the [`Iterator::filter_map`] method in the
453    /// standard library.
454    ///
455    /// # Examples
456    /// ```
457    /// # #[tokio::main]
458    /// # async fn main() {
459    /// use tokio_stream::{self as stream, StreamExt};
460    ///
461    /// let stream = stream::iter(1..=8);
462    /// let mut evens = stream.filter_map(|x| {
463    ///     if x % 2 == 0 { Some(x + 1) } else { None }
464    /// });
465    ///
466    /// assert_eq!(Some(3), evens.next().await);
467    /// assert_eq!(Some(5), evens.next().await);
468    /// assert_eq!(Some(7), evens.next().await);
469    /// assert_eq!(Some(9), evens.next().await);
470    /// assert_eq!(None, evens.next().await);
471    /// # }
472    /// ```
473    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
474    where
        // Unlike `filter`, the closure takes each item by value and returns
        // the mapped value wrapped in `Some`, or `None` to skip the item.
475        F: FnMut(Self::Item) -> Option<T>,
        // `self` is taken by value, which needs a statically sized type.
476        Self: Sized,
477    {
        // Hand both the stream and the closure to the `FilterMap` adapter.
478        FilterMap::new(self, f)
479    }
480
481    /// Creates a stream which ends after the first `None`.
482    ///
483    /// After a stream returns `None`, behavior is undefined. Future calls to
484    /// `poll_next` may or may not return `Some(T)` again or they may panic.
485    /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
486    /// return `None` forever.
487    ///
488    /// # Examples
489    ///
490    /// ```
491    /// use tokio_stream::{Stream, StreamExt};
492    ///
493    /// use std::pin::Pin;
494    /// use std::task::{Context, Poll};
495    ///
496    /// // a stream which alternates between Some and None
497    /// struct Alternate {
498    ///     state: i32,
499    /// }
500    ///
501    /// impl Stream for Alternate {
502    ///     type Item = i32;
503    ///
504    ///     fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
505    ///         let val = self.state;
506    ///         self.state = self.state + 1;
507    ///
508    ///         // if it's even, Some(i32), else None
509    ///         if val % 2 == 0 {
510    ///             Poll::Ready(Some(val))
511    ///         } else {
512    ///             Poll::Ready(None)
513    ///         }
514    ///     }
515    /// }
516    ///
517    /// #[tokio::main]
518    /// async fn main() {
519    ///     let mut stream = Alternate { state: 0 };
520    ///
521    ///     // the stream goes back and forth
522    ///     assert_eq!(stream.next().await, Some(0));
523    ///     assert_eq!(stream.next().await, None);
524    ///     assert_eq!(stream.next().await, Some(2));
525    ///     assert_eq!(stream.next().await, None);
526    ///
527    ///     // however, once it is fused
528    ///     let mut stream = stream.fuse();
529    ///
530    ///     assert_eq!(stream.next().await, Some(4));
531    ///     assert_eq!(stream.next().await, None);
532    ///
533    ///     // it will always return `None` after the first time.
534    ///     assert_eq!(stream.next().await, None);
535    ///     assert_eq!(stream.next().await, None);
536    ///     assert_eq!(stream.next().await, None);
537    /// }
538    /// ```
539    fn fuse(self) -> Fuse<Self>
540    where
        // `self` is taken by value, which needs a statically sized type.
541        Self: Sized,
542    {
        // The stream is moved into the `Fuse` wrapper.
543        Fuse::new(self)
544    }
545
546    /// Creates a new stream of at most `n` items of the underlying stream.
547    ///
548    /// Once `n` items have been yielded from this stream then it will always
549    /// return that the stream is done.
550    ///
551    /// # Examples
552    ///
553    /// ```
554    /// # #[tokio::main]
555    /// # async fn main() {
556    /// use tokio_stream::{self as stream, StreamExt};
557    ///
558    /// let mut stream = stream::iter(1..=10).take(3);
559    ///
560    /// assert_eq!(Some(1), stream.next().await);
561    /// assert_eq!(Some(2), stream.next().await);
562    /// assert_eq!(Some(3), stream.next().await);
563    /// assert_eq!(None, stream.next().await);
564    /// # }
565    /// ```
566    fn take(self, n: usize) -> Take<Self>
567    where
        // `self` is taken by value, which needs a statically sized type.
568        Self: Sized,
569    {
        // `n` is the maximum number of items the returned stream will yield.
570        Take::new(self, n)
571    }
572
573    /// Take elements from this stream while the provided predicate
574    /// resolves to `true`.
575    ///
576    /// This function, like [`Iterator::take_while`], will take elements from the
577    /// stream until the predicate `f` resolves to `false`. Once one element
578    /// returns false it will always return that the stream is done.
579    ///
580    /// # Examples
581    ///
582    /// ```
583    /// # #[tokio::main]
584    /// # async fn main() {
585    /// use tokio_stream::{self as stream, StreamExt};
586    ///
587    /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
588    ///
589    /// assert_eq!(Some(1), stream.next().await);
590    /// assert_eq!(Some(2), stream.next().await);
591    /// assert_eq!(Some(3), stream.next().await);
592    /// assert_eq!(None, stream.next().await);
593    /// # }
594    /// ```
595    fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
596    where
        // The predicate only gets a reference to each item; it does not take
        // ownership of it.
597        F: FnMut(&Self::Item) -> bool,
        // `self` is taken by value, which needs a statically sized type.
598        Self: Sized,
599    {
        // Hand both the stream and the predicate to the `TakeWhile` adapter.
600        TakeWhile::new(self, f)
601    }
602
603    /// Creates a new stream that will skip the `n` first items of the
604    /// underlying stream.
605    ///
606    /// # Examples
607    ///
608    /// ```
609    /// # #[tokio::main]
610    /// # async fn main() {
611    /// use tokio_stream::{self as stream, StreamExt};
612    ///
613    /// let mut stream = stream::iter(1..=10).skip(7);
614    ///
615    /// assert_eq!(Some(8), stream.next().await);
616    /// assert_eq!(Some(9), stream.next().await);
617    /// assert_eq!(Some(10), stream.next().await);
618    /// assert_eq!(None, stream.next().await);
619    /// # }
620    /// ```
621    fn skip(self, n: usize) -> Skip<Self>
622    where
        // `self` is taken by value, which needs a statically sized type.
623        Self: Sized,
624    {
        // `n` is the number of leading items the returned stream skips.
625        Skip::new(self, n)
626    }
627
628    /// Skip elements from the underlying stream while the provided predicate
629    /// resolves to `true`.
630    ///
631    /// This function, like [`Iterator::skip_while`], will ignore elements from the
632    /// stream until the predicate `f` resolves to `false`. Once one element
633    /// returns false, the rest of the elements will be yielded.
634    ///
635    /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
636    ///
637    /// # Examples
638    ///
639    /// ```
640    /// # #[tokio::main]
641    /// # async fn main() {
642    /// use tokio_stream::{self as stream, StreamExt};
643    /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
644    ///
645    /// assert_eq!(Some(3), stream.next().await);
646    /// assert_eq!(Some(4), stream.next().await);
647    /// assert_eq!(Some(1), stream.next().await);
648    /// assert_eq!(None, stream.next().await);
649    /// # }
650    /// ```
651    fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
652    where
        // The predicate only gets a reference to each item; it does not take
        // ownership of it.
653        F: FnMut(&Self::Item) -> bool,
        // `self` is taken by value, which needs a statically sized type.
654        Self: Sized,
655    {
        // Hand both the stream and the predicate to the `SkipWhile` adapter.
656        SkipWhile::new(self, f)
657    }
658
659    /// Tests if every element of the stream matches a predicate.
660    ///
661    /// Equivalent to:
662    ///
663    /// ```ignore
664    /// async fn all<F>(&mut self, f: F) -> bool;
665    /// ```
666    ///
667    /// `all()` takes a closure that returns `true` or `false`. It applies
668    /// this closure to each element of the stream, and if they all return
669    /// `true`, then so does `all`. If any of them return `false`, it
670    /// returns `false`.
671    ///
672    /// `all()` is short-circuiting; in other words, it will stop processing
673    /// as soon as it finds a `false`, given that no matter what else happens,
674    /// the result will also be `false`.
675    ///
676    /// An empty stream returns `true`.
677    ///
678    /// # Examples
679    ///
680    /// Basic usage:
681    ///
682    /// ```
683    /// # #[tokio::main]
684    /// # async fn main() {
685    /// use tokio_stream::{self as stream, StreamExt};
686    ///
687    /// let a = [1, 2, 3];
688    ///
689    /// assert!(stream::iter(&a).all(|&x| x > 0).await);
690    ///
691    /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
692    /// # }
693    /// ```
694    ///
695    /// Stopping at the first `false`:
696    ///
697    /// ```
698    /// # #[tokio::main]
699    /// # async fn main() {
700    /// use tokio_stream::{self as stream, StreamExt};
701    ///
702    /// let a = [1, 2, 3];
703    ///
704    /// let mut iter = stream::iter(&a);
705    ///
706    /// assert!(!iter.all(|&x| x != 2).await);
707    ///
708    /// // we can still use `iter`, as there are more elements.
709    /// assert_eq!(iter.next().await, Some(&3));
710    /// # }
711    /// ```
712    fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
713    where
        // Only a `&mut` borrow is taken, so the stream stays usable by the
        // caller afterwards; in exchange it must be `Unpin`.
714        Self: Unpin,
        // The predicate takes each item by value.
715        F: FnMut(Self::Item) -> bool,
716    {
        // The returned future keeps the mutable borrow of the stream for `'_`.
717        AllFuture::new(self, f)
718    }
719
720    /// Tests if any element of the stream matches a predicate.
721    ///
722    /// Equivalent to:
723    ///
724    /// ```ignore
725    /// async fn any<F>(&mut self, f: F) -> bool;
726    /// ```
727    ///
728    /// `any()` takes a closure that returns `true` or `false`. It applies
729    /// this closure to each element of the stream, and if any of them return
730    /// `true`, then so does `any()`. If they all return `false`, it
731    /// returns `false`.
732    ///
733    /// `any()` is short-circuiting; in other words, it will stop processing
734    /// as soon as it finds a `true`, given that no matter what else happens,
735    /// the result will also be `true`.
736    ///
737    /// An empty stream returns `false`.
738    ///
    /// # Examples
    ///
739    /// Basic usage:
740    ///
741    /// ```
742    /// # #[tokio::main]
743    /// # async fn main() {
744    /// use tokio_stream::{self as stream, StreamExt};
745    ///
746    /// let a = [1, 2, 3];
747    ///
748    /// assert!(stream::iter(&a).any(|&x| x > 0).await);
749    ///
750    /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
751    /// # }
752    /// ```
753    ///
754    /// Stopping at the first `true`:
755    ///
756    /// ```
757    /// # #[tokio::main]
758    /// # async fn main() {
759    /// use tokio_stream::{self as stream, StreamExt};
760    ///
761    /// let a = [1, 2, 3];
762    ///
763    /// let mut iter = stream::iter(&a);
764    ///
765    /// assert!(iter.any(|&x| x != 2).await);
766    ///
767    /// // we can still use `iter`, as there are more elements.
768    /// assert_eq!(iter.next().await, Some(&2));
769    /// # }
770    /// ```
771    fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
772    where
        // Only a `&mut` borrow is taken, so the stream stays usable by the
        // caller afterwards; in exchange it must be `Unpin`.
773        Self: Unpin,
        // The predicate takes each item by value.
774        F: FnMut(Self::Item) -> bool,
775    {
        // The returned future keeps the mutable borrow of the stream for `'_`.
776        AnyFuture::new(self, f)
777    }
778
779    /// Combine two streams into one by first returning all values from the
780    /// first stream then all values from the second stream.
781    ///
782    /// As long as `self` still has values to emit, no values from `other` are
783    /// emitted, even if some are ready.
784    ///
785    /// # Examples
786    ///
787    /// ```
788    /// use tokio_stream::{self as stream, StreamExt};
789    ///
790    /// #[tokio::main]
791    /// async fn main() {
792    ///     let one = stream::iter(vec![1, 2, 3]);
793    ///     let two = stream::iter(vec![4, 5, 6]);
794    ///
795    ///     let mut stream = one.chain(two);
796    ///
797    ///     assert_eq!(stream.next().await, Some(1));
798    ///     assert_eq!(stream.next().await, Some(2));
799    ///     assert_eq!(stream.next().await, Some(3));
800    ///     assert_eq!(stream.next().await, Some(4));
801    ///     assert_eq!(stream.next().await, Some(5));
802    ///     assert_eq!(stream.next().await, Some(6));
803    ///     assert_eq!(stream.next().await, None);
804    /// }
805    /// ```
806    fn chain<U>(self, other: U) -> Chain<Self, U>
807    where
        // Both streams must yield the same item type so the second can
        // continue where the first one ends.
808        U: Stream<Item = Self::Item>,
        // `self` is taken by value, which needs a statically sized type.
809        Self: Sized,
810    {
        // Argument order matters: `self` comes first, `other` second.
811        Chain::new(self, other)
812    }
813
814    /// A combinator that applies a function to every element in a stream
815    /// producing a single, final value.
816    ///
817    /// Equivalent to:
818    ///
819    /// ```ignore
820    /// async fn fold<B, F>(self, init: B, f: F) -> B;
821    /// ```
822    ///
823    /// # Examples
824    /// Basic usage:
825    /// ```
826    /// # #[tokio::main]
827    /// # async fn main() {
828    /// use tokio_stream::{self as stream, *};
829    ///
830    /// let s = stream::iter(vec![1u8, 2, 3]);
831    /// let sum = s.fold(0, |acc, x| acc + x).await;
832    ///
833    /// assert_eq!(sum, 6);
834    /// # }
835    /// ```
836    fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
837    where
        // `self` is taken by value, which needs a statically sized type.
838        Self: Sized,
        // `f` receives the current accumulator and the next item, both by
        // value, and returns the new accumulator.
839        F: FnMut(B, Self::Item) -> B,
840    {
        // `init` is the starting accumulator value passed along with `f`.
841        FoldFuture::new(self, init, f)
842    }
843
844    /// Drain stream pushing all emitted values into a collection.
845    ///
846    /// Equivalent to:
847    ///
848    /// ```ignore
849    /// async fn collect<T>(self) -> T;
850    /// ```
851    ///
852    /// `collect` streams all values, awaiting as needed. Values are pushed into
853    /// a collection. A number of different target collection types are
854    /// supported, including [`Vec`], [`String`], and [`Bytes`].
855    ///
856    /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html
857    ///
858    /// # `Result`
859    ///
860    /// `collect()` can also be used with streams of type `Result<T, E>` where
861    /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
862    /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
863    /// streaming is terminated and `collect()` returns the `Err`.
864    ///
865    /// # Notes
866    ///
867    /// `FromStream` is currently a sealed trait. Stabilization is pending
868    /// enhancements to the Rust language.
869    ///
870    /// # Examples
871    ///
872    /// Basic usage:
873    ///
874    /// ```
875    /// use tokio_stream::{self as stream, StreamExt};
876    ///
877    /// #[tokio::main]
878    /// async fn main() {
879    ///     let doubled: Vec<i32> =
880    ///         stream::iter(vec![1, 2, 3])
881    ///             .map(|x| x * 2)
882    ///             .collect()
883    ///             .await;
884    ///
885    ///     assert_eq!(vec![2, 4, 6], doubled);
886    /// }
887    /// ```
888    ///
889    /// Collecting a stream of `Result` values
890    ///
891    /// ```
892    /// use tokio_stream::{self as stream, StreamExt};
893    ///
894    /// #[tokio::main]
895    /// async fn main() {
896    ///     // A stream containing only `Ok` values will be collected
897    ///     let values: Result<Vec<i32>, &str> =
898    ///         stream::iter(vec![Ok(1), Ok(2), Ok(3)])
899    ///             .collect()
900    ///             .await;
901    ///
902    ///     assert_eq!(Ok(vec![1, 2, 3]), values);
903    ///
904    ///     // A stream containing `Err` values will return the first error.
905    ///     let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
906    ///
907    ///     let values: Result<Vec<i32>, &str> =
908    ///         stream::iter(results)
909    ///             .collect()
910    ///             .await;
911    ///
912    ///     assert_eq!(Err("no"), values);
913    /// }
914    /// ```
915    fn collect<T>(self) -> Collect<Self, T>
916    where
        // `T` is the target collection type; it must implement `FromStream`
        // for this stream's item type.
917        T: FromStream<Self::Item>,
        // `self` is taken by value, which needs a statically sized type.
918        Self: Sized,
919    {
        // `T` appears only in the return type, so callers select it with a
        // type annotation, as the examples in the method docs do.
920        Collect::new(self)
921    }
922
923    /// Applies a per-item timeout to the passed stream.
924    ///
925    /// `timeout()` takes a `Duration` that represents the maximum amount of
926    /// time each element of the stream has to complete before timing out.
927    ///
928    /// If the wrapped stream yields a value before the deadline is reached, the
929    /// value is returned. Otherwise, an error is returned. The caller may decide
930    /// to continue consuming the stream and will eventually get the next source
931    /// stream value once it becomes available. See
932    /// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
933    /// where the timeouts will repeat.
934    ///
935    /// # Notes
936    ///
937    /// This function consumes the stream passed into it and returns a
938    /// wrapped version of it.
939    ///
940    /// Polling the returned stream will continue to poll the inner stream even
941    /// if one or more items time out.
942    ///
943    /// # Examples
944    ///
945    /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
946    ///
947    /// ```
948    /// # #[tokio::main]
949    /// # async fn main() {
950    /// use tokio_stream::{self as stream, StreamExt};
951    /// use std::time::Duration;
952    /// # let int_stream = stream::iter(1..=3);
953    ///
954    /// let int_stream = int_stream.timeout(Duration::from_secs(1));
955    /// tokio::pin!(int_stream);
956    ///
957    /// // When no items time out, we get the 3 elements in succession:
958    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
959    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
960    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
961    /// assert_eq!(int_stream.try_next().await, Ok(None));
962    ///
963    /// // If the second item times out, we get an error and continue polling the stream:
964    /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
965    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
966    /// assert!(int_stream.try_next().await.is_err());
967    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
968    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
969    /// assert_eq!(int_stream.try_next().await, Ok(None));
970    ///
971    /// // If we want to stop consuming the source stream the first time an
972    /// // element times out, we can use the `take_while` operator:
973    /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
974    /// let mut int_stream = int_stream.take_while(Result::is_ok);
975    ///
976    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
977    /// assert_eq!(int_stream.try_next().await, Ok(None));
978    /// # }
979    /// ```
980    ///
981    /// Once a timeout error is received, no further events will be received
982    /// unless the wrapped stream yields a value (timeouts do not repeat).
983    ///
984    /// ```
985    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
986    /// # async fn main() {
987    /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
988    /// use std::time::Duration;
989    /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
990    /// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
991    /// tokio::pin!(timeout_stream);
992    ///
993    /// // Only one timeout will be received between values in the source stream.
994    /// assert!(timeout_stream.try_next().await.is_ok());
995    /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
996    /// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts");
997    /// # }
998    /// ```
999    #[cfg(feature = "time")]
1000    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
    fn timeout(self, duration: Duration) -> Timeout<Self>
    where
        // `self` is taken by value and moved into the wrapper, hence `Sized`.
        Self: Sized,
    {
        // Hand the stream and the per-item time limit to the `Timeout` adapter.
        Timeout::new(self, duration)
    }
1007
    /// Applies a repeating per-item timeout to the passed stream.
1009    ///
1010    /// `timeout_repeating()` takes an [`Interval`] that controls the time each
1011    /// element of the stream has to complete before timing out.
1012    ///
1013    /// If the wrapped stream yields a value before the deadline is reached, the
1014    /// value is returned. Otherwise, an error is returned. The caller may decide
1015    /// to continue consuming the stream and will eventually get the next source
1016    /// stream value once it becomes available. Unlike `timeout()`, if no value
1017    /// becomes available before the deadline is reached, additional errors are
1018    /// returned at the specified interval. See [`timeout`](StreamExt::timeout)
1019    /// for an alternative where the timeouts do not repeat.
1020    ///
1021    /// # Notes
1022    ///
1023    /// This function consumes the stream passed into it and returns a
1024    /// wrapped version of it.
1025    ///
1026    /// Polling the returned stream will continue to poll the inner stream even
1027    /// if one or more items time out.
1028    ///
1029    /// # Examples
1030    ///
1031    /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
1032    ///
1033    /// ```
1034    /// # #[tokio::main]
1035    /// # async fn main() {
1036    /// use tokio_stream::{self as stream, StreamExt};
1037    /// use std::time::Duration;
1038    /// # let int_stream = stream::iter(1..=3);
1039    ///
1040    /// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1)));
1041    /// tokio::pin!(int_stream);
1042    ///
1043    /// // When no items time out, we get the 3 elements in succession:
1044    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1045    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1046    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1047    /// assert_eq!(int_stream.try_next().await, Ok(None));
1048    ///
1049    /// // If the second item times out, we get an error and continue polling the stream:
1050    /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1051    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1052    /// assert!(int_stream.try_next().await.is_err());
1053    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1054    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1055    /// assert_eq!(int_stream.try_next().await, Ok(None));
1056    ///
1057    /// // If we want to stop consuming the source stream the first time an
1058    /// // element times out, we can use the `take_while` operator:
1059    /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1060    /// let mut int_stream = int_stream.take_while(Result::is_ok);
1061    ///
1062    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1063    /// assert_eq!(int_stream.try_next().await, Ok(None));
1064    /// # }
1065    /// ```
1066    ///
1067    /// Timeout errors will be continuously produced at the specified interval
1068    /// until the wrapped stream yields a value.
1069    ///
1070    /// ```
1071    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1072    /// # async fn main() {
1073    /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
1074    /// use std::time::Duration;
1075    /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
1076    /// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9)));
1077    /// tokio::pin!(timeout_stream);
1078    ///
1079    /// // Multiple timeouts will be received between values in the source stream.
1080    /// assert!(timeout_stream.try_next().await.is_ok());
1081    /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
1082    /// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout");
1083    /// // Will eventually receive another value from the source stream...
1084    /// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout");
1085    /// # }
1086    /// ```
1087    #[cfg(feature = "time")]
1088    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
    fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self>
    where
        // `self` is taken by value and moved into the wrapper, hence `Sized`.
        Self: Sized,
    {
        // Both the stream and the caller-supplied `Interval` are moved into
        // the `TimeoutRepeating` adapter.
        TimeoutRepeating::new(self, interval)
    }
1095
1096    /// Slows down a stream by enforcing a delay between items.
1097    ///
1098    /// The underlying timer behind this utility has a granularity of one millisecond.
1099    ///
1100    /// # Example
1101    ///
1102    /// Create a throttled stream.
1103    /// ```rust,no_run
1104    /// use std::time::Duration;
1105    /// use tokio_stream::StreamExt;
1106    ///
1107    /// # async fn dox() {
1108    /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
1109    /// tokio::pin!(item_stream);
1110    ///
1111    /// loop {
1112    ///     // The string will be produced at most every 2 seconds
1113    ///     println!("{:?}", item_stream.next().await);
1114    /// }
1115    /// # }
1116    /// ```
1117    #[cfg(feature = "time")]
1118    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
    fn throttle(self, duration: Duration) -> Throttle<Self>
    where
        // `self` is taken by value and moved into the wrapper, hence `Sized`.
        Self: Sized,
    {
        // This is the free function `throttle` imported from the `throttle`
        // module, not a recursive call to this method. Note that it takes the
        // duration first and the stream second.
        throttle(duration, self)
    }
1125
1126    /// Batches the items in the given stream using a maximum duration and size for each batch.
1127    ///
1128    /// This stream returns the next batch of items in the following situations:
1129    ///  1. The inner stream has returned at least `max_size` many items since the last batch.
1130    ///  2. The time since the first item of a batch is greater than the given duration.
1131    ///  3. The end of the stream is reached.
1132    ///
    /// The returned vector is never empty and never longer than the maximum size. Empty batches
    /// will not be emitted if no items are received upstream.
1135    ///
1136    /// # Panics
1137    ///
    /// This function panics if `max_size` is zero.
1139    ///
1140    /// # Example
1141    ///
1142    /// ```rust
1143    /// use std::time::Duration;
1144    /// use tokio::time;
1145    /// use tokio_stream::{self as stream, StreamExt};
1146    /// use futures::FutureExt;
1147    ///
1148    /// #[tokio::main]
1149    /// # async fn _unused() {}
1150    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1151    /// async fn main() {
1152    ///     let iter = vec![1, 2, 3, 4].into_iter();
1153    ///     let stream0 = stream::iter(iter);
1154    ///
1155    ///     let iter = vec![5].into_iter();
1156    ///     let stream1 = stream::iter(iter)
1157    ///          .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
1158    ///
1159    ///     let chunk_stream = stream0
1160    ///         .chain(stream1)
1161    ///         .chunks_timeout(3, Duration::from_secs(2));
1162    ///     tokio::pin!(chunk_stream);
1163    ///
1164    ///     // a full batch was received
1165    ///     assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
1166    ///     // deadline was reached before max_size was reached
1167    ///     assert_eq!(chunk_stream.next().await, Some(vec![4]));
1168    ///     // last element in the stream
1169    ///     assert_eq!(chunk_stream.next().await, Some(vec![5]));
1170    /// }
1171    /// ```
1172    #[cfg(feature = "time")]
1173    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1174    #[track_caller]
1175    fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
1176    where
1177        Self: Sized,
1178    {
1179        assert!(max_size > 0, "`max_size` must be non-zero.");
1180        ChunksTimeout::new(self, max_size, duration)
1181    }
1182
    /// Turns the stream into a peekable stream, whose next element can be peeked at without being
    /// consumed.
    ///
    /// # Example
    ///
    /// ```rust
1186    /// use tokio_stream::{self as stream, StreamExt};
1187    ///
1188    /// #[tokio::main]
1189    /// # async fn _unused() {}
1190    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1191    /// async fn main() {
1192    ///     let iter = vec![1, 2, 3, 4].into_iter();
1193    ///     let mut stream = stream::iter(iter).peekable();
1194    ///
1195    ///     assert_eq!(*stream.peek().await.unwrap(), 1);
1196    ///     assert_eq!(*stream.peek().await.unwrap(), 1);
1197    ///     assert_eq!(stream.next().await.unwrap(), 1);
1198    ///     assert_eq!(*stream.peek().await.unwrap(), 2);
1199    /// }
1200    /// ```
    fn peekable(self) -> Peekable<Self>
    where
        // `self` is taken by value and moved into the wrapper, hence `Sized`.
        Self: Sized,
    {
        // Wrap the stream in the `Peekable` adapter.
        Peekable::new(self)
    }
1207}
1208
1209impl<St: ?Sized> StreamExt for St where St: Stream {}
1210
/// Merges the size hints of two streams into a single hint.
///
/// The lower bounds are added together, saturating at `usize::MAX`. The upper
/// bound is the sum of both upper bounds; it is `None` when either side has no
/// upper bound or when the sum does not fit in a `usize`.
fn merge_size_hints(
    left: (usize, Option<usize>),
    right: (usize, Option<usize>),
) -> (usize, Option<usize>) {
    let (left_min, left_max) = left;
    let (right_min, right_max) = right;

    let lower = left_min.saturating_add(right_min);
    // `zip` yields `Some` only if both bounds are known; `checked_add` then
    // drops the bound on overflow instead of reporting a wrong value.
    let upper = left_max
        .zip(right_max)
        .and_then(|(a, b)| a.checked_add(b));

    (lower, upper)
}