async_std/stream/stream/
mod.rs

1//! Asynchronous iteration.
2//!
3//! This module is an async version of [`std::iter`].
4//!
5//! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html
6//!
7//! # Examples
8//!
9//! ```
10//! # async_std::task::block_on(async {
11//! #
12//! use async_std::prelude::*;
13//! use async_std::stream;
14//!
15//! let mut s = stream::repeat(9).take(3);
16//!
17//! while let Some(v) = s.next().await {
18//!     assert_eq!(v, 9);
19//! }
20//! #
21//! # })
22//! ```
23
24mod all;
25mod any;
26mod chain;
27mod cloned;
28mod cmp;
29mod copied;
30mod cycle;
31mod enumerate;
32mod eq;
33mod filter;
34mod filter_map;
35mod find;
36mod find_map;
37mod fold;
38mod for_each;
39mod fuse;
40mod ge;
41mod gt;
42mod inspect;
43mod last;
44mod le;
45mod lt;
46mod map;
47mod max;
48mod max_by;
49mod max_by_key;
50mod min;
51mod min_by;
52mod min_by_key;
53mod ne;
54mod next;
55mod nth;
56mod partial_cmp;
57mod position;
58mod scan;
59mod skip;
60mod skip_while;
61mod step_by;
62mod take;
63mod take_while;
64mod try_fold;
65mod try_for_each;
66mod zip;
67
68use all::AllFuture;
69use any::AnyFuture;
70use cmp::CmpFuture;
71use cycle::Cycle;
72use enumerate::Enumerate;
73use eq::EqFuture;
74use filter_map::FilterMap;
75use find::FindFuture;
76use find_map::FindMapFuture;
77use fold::FoldFuture;
78use for_each::ForEachFuture;
79use ge::GeFuture;
80use gt::GtFuture;
81use last::LastFuture;
82use le::LeFuture;
83use lt::LtFuture;
84use max::MaxFuture;
85use max_by::MaxByFuture;
86use max_by_key::MaxByKeyFuture;
87use min::MinFuture;
88use min_by::MinByFuture;
89use min_by_key::MinByKeyFuture;
90use ne::NeFuture;
91use next::NextFuture;
92use nth::NthFuture;
93use partial_cmp::PartialCmpFuture;
94use position::PositionFuture;
95use try_fold::TryFoldFuture;
96use try_for_each::TryForEachFuture;
97
98pub use chain::Chain;
99pub use cloned::Cloned;
100pub use copied::Copied;
101pub use filter::Filter;
102pub use fuse::Fuse;
103pub use inspect::Inspect;
104pub use map::Map;
105pub use scan::Scan;
106pub use skip::Skip;
107pub use skip_while::SkipWhile;
108pub use step_by::StepBy;
109pub use take::Take;
110pub use take_while::TakeWhile;
111pub use zip::Zip;
112
113use core::cmp::Ordering;
114
115cfg_unstable! {
116    use core::future::Future;
117    use core::pin::Pin;
118    use core::time::Duration;
119
120    use crate::stream::into_stream::IntoStream;
121    use crate::stream::{FromStream, Product, Sum};
122    use crate::stream::Extend;
123
124    use count::CountFuture;
125    use partition::PartitionFuture;
126    use unzip::UnzipFuture;
127
128    pub use merge::Merge;
129    pub use flatten::Flatten;
130    pub use flat_map::FlatMap;
131    pub use timeout::{TimeoutError, Timeout};
132    pub use throttle::Throttle;
133    pub use delay::Delay;
134
135    mod count;
136    mod merge;
137    mod flatten;
138    mod flat_map;
139    mod partition;
140    mod timeout;
141    mod throttle;
142    mod delay;
143    mod unzip;
144}
145
146pub use futures_core::stream::Stream as Stream;
147
148#[doc = r#"
149    Extension methods for [`Stream`].
150
151    [`Stream`]: ../stream/trait.Stream.html
152"#]
153pub trait StreamExt: Stream {
154    #[doc = r#"
155        Advances the stream and returns the next value.
156
157        Returns [`None`] when iteration is finished. Individual stream implementations may
158        choose to resume iteration, and so calling `next()` again may or may not eventually
159        start returning more values.
160
161        [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
162
163        # Examples
164
165        ```
166        # fn main() { async_std::task::block_on(async {
167        #
168        use async_std::prelude::*;
169        use async_std::stream;
170
171        let mut s = stream::once(7);
172
173        assert_eq!(s.next().await, Some(7));
174        assert_eq!(s.next().await, None);
175        #
176        # }) }
177        ```
178    "#]
179    fn next(&mut self) -> NextFuture<'_, Self>
180    where
181        Self: Unpin,
182    {
183        NextFuture { stream: self }
184    }
185
186    #[doc = r#"
187        Creates a stream that yields its first `n` elements.
188
189        # Examples
190
191        ```
192        # fn main() { async_std::task::block_on(async {
193        #
194        use async_std::prelude::*;
195        use async_std::stream;
196
197        let mut s = stream::repeat(9).take(3);
198
199        while let Some(v) = s.next().await {
200            assert_eq!(v, 9);
201        }
202        #
203        # }) }
204        ```
205    "#]
206    fn take(self, n: usize) -> Take<Self>
207    where
208        Self: Sized,
209    {
210        Take::new(self, n)
211    }
212
213    #[doc = r#"
214        Creates a stream that yields elements based on a predicate.
215
216        # Examples
217
218        ```
219        # fn main() { async_std::task::block_on(async {
220        #
221        use async_std::prelude::*;
222        use async_std::stream;
223
224        let s = stream::from_iter(vec![1, 2, 3, 4]);
225        let mut s = s.take_while(|x| x < &3 );
226
227        assert_eq!(s.next().await, Some(1));
228        assert_eq!(s.next().await, Some(2));
229        assert_eq!(s.next().await, None);
230        #
231        # }) }
232        ```
233    "#]
234    fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
235    where
236        Self: Sized,
237        P: FnMut(&Self::Item) -> bool,
238    {
239        TakeWhile::new(self, predicate)
240    }
241
242    #[doc = r#"
243        Limit the amount of items yielded per timeslice in a stream.
244
245        This stream does not drop any items, but will only limit the rate at which items pass through.
246        # Examples
247        ```
248        # fn main() { async_std::task::block_on(async {
249        #
250        use async_std::prelude::*;
251        use async_std::stream;
252        use std::time::{Duration, Instant};
253
254        let start = Instant::now();
255
256        // emit value every 5 milliseconds
257        let s = stream::interval(Duration::from_millis(5)).take(2);
258
259        // throttle for 10 milliseconds
260        let mut s = s.throttle(Duration::from_millis(10));
261
262        s.next().await;
263        assert!(start.elapsed().as_millis() >= 5);
264
265        s.next().await;
266        assert!(start.elapsed().as_millis() >= 15);
267
268        s.next().await;
269        assert!(start.elapsed().as_millis() >= 25);
270        #
271        # }) }
272        ```
273    "#]
274    #[cfg(feature = "unstable")]
275    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
276    fn throttle(self, d: Duration) -> Throttle<Self>
277    where
278        Self: Sized,
279    {
280        Throttle::new(self, d)
281    }
282
283    #[doc = r#"
284        Creates a stream that yields each `step`th element.
285
286        # Panics
287
288        This method will panic if the given step is `0`.
289
290        # Examples
291
292        Basic usage:
293
294        ```
295        # fn main() { async_std::task::block_on(async {
296        #
297        use async_std::prelude::*;
298        use async_std::stream;
299
300        let s = stream::from_iter(vec![0u8, 1, 2, 3, 4]);
301        let mut stepped = s.step_by(2);
302
303        assert_eq!(stepped.next().await, Some(0));
304        assert_eq!(stepped.next().await, Some(2));
305        assert_eq!(stepped.next().await, Some(4));
306        assert_eq!(stepped.next().await, None);
307
308        #
309        # }) }
310        ```
311    "#]
312    fn step_by(self, step: usize) -> StepBy<Self>
313    where
314        Self: Sized,
315    {
316        StepBy::new(self, step)
317    }
318
319    #[doc = r#"
320        Takes two streams and creates a new stream over both in sequence.
321
322        # Examples
323
324        Basic usage:
325
326        ```
327        # fn main() { async_std::task::block_on(async {
328        #
329        use async_std::prelude::*;
330        use async_std::stream;
331
332        let first = stream::from_iter(vec![0u8, 1]);
333        let second = stream::from_iter(vec![2, 3]);
334        let mut c = first.chain(second);
335
336        assert_eq!(c.next().await, Some(0));
337        assert_eq!(c.next().await, Some(1));
338        assert_eq!(c.next().await, Some(2));
339        assert_eq!(c.next().await, Some(3));
340        assert_eq!(c.next().await, None);
341
342        #
343        # }) }
344        ```
345    "#]
346    fn chain<U>(self, other: U) -> Chain<Self, U>
347    where
348        Self: Sized,
349        U: Stream<Item = Self::Item> + Sized,
350    {
351        Chain::new(self, other)
352    }
353
354        #[doc = r#"
355        Creates an stream which copies all of its elements.
356
357        # Examples
358
359        Basic usage:
360
361        ```
362        # fn main() { async_std::task::block_on(async {
363        #
364        use async_std::prelude::*;
365        use async_std::stream;
366
367        let v = stream::from_iter(vec![&1, &2, &3]);
368
369        let mut v_cloned = v.cloned();
370
371        assert_eq!(v_cloned.next().await, Some(1));
372        assert_eq!(v_cloned.next().await, Some(2));
373        assert_eq!(v_cloned.next().await, Some(3));
374        assert_eq!(v_cloned.next().await, None);
375
376        #
377        # }) }
378        ```
379    "#]
380    fn cloned<'a, T>(self) -> Cloned<Self>
381    where
382        Self: Sized + Stream<Item = &'a T>,
383        T: Clone + 'a,
384    {
385        Cloned::new(self)
386    }
387
388
389    #[doc = r#"
390        Creates an stream which copies all of its elements.
391
392        # Examples
393
394        Basic usage:
395
396        ```
397        # fn main() { async_std::task::block_on(async {
398        #
399        use async_std::prelude::*;
400        use async_std::stream;
401
402        let s = stream::from_iter(vec![&1, &2, &3]);
403        let mut s_copied  = s.copied();
404
405        assert_eq!(s_copied.next().await, Some(1));
406        assert_eq!(s_copied.next().await, Some(2));
407        assert_eq!(s_copied.next().await, Some(3));
408        assert_eq!(s_copied.next().await, None);
409        #
410        # }) }
411        ```
412    "#]
413    fn copied<'a, T>(self) -> Copied<Self>
414    where
415        Self: Sized + Stream<Item = &'a T>,
416        T: Copy + 'a,
417    {
418        Copied::new(self)
419    }
420
421    #[doc = r#"
422        Creates a stream that yields the provided values infinitely and in order.
423
424        # Examples
425
426        Basic usage:
427
428        ```
429        # async_std::task::block_on(async {
430        #
431        use async_std::prelude::*;
432        use async_std::stream;
433
434        let mut s = stream::once(7).cycle();
435
436        assert_eq!(s.next().await, Some(7));
437        assert_eq!(s.next().await, Some(7));
438        assert_eq!(s.next().await, Some(7));
439        assert_eq!(s.next().await, Some(7));
440        assert_eq!(s.next().await, Some(7));
441        #
442        # })
443        ```
444    "#]
445    fn cycle(self) -> Cycle<Self>
446    where
447        Self: Clone + Sized,
448    {
449        Cycle::new(self)
450    }
451
452    #[doc = r#"
453        Creates a stream that gives the current element's count as well as the next value.
454
455        # Overflow behaviour.
456
457        This combinator does no guarding against overflows.
458
459        # Examples
460
461        ```
462        # fn main() { async_std::task::block_on(async {
463        #
464        use async_std::prelude::*;
465        use async_std::stream;
466
467        let s = stream::from_iter(vec!['a', 'b', 'c']);
468        let mut s = s.enumerate();
469
470        assert_eq!(s.next().await, Some((0, 'a')));
471        assert_eq!(s.next().await, Some((1, 'b')));
472        assert_eq!(s.next().await, Some((2, 'c')));
473        assert_eq!(s.next().await, None);
474        #
475        # }) }
476        ```
477    "#]
478    fn enumerate(self) -> Enumerate<Self>
479    where
480        Self: Sized,
481    {
482        Enumerate::new(self)
483    }
484
485    #[doc = r#"
486        Creates a stream that is delayed before it starts yielding items.
487
488        # Examples
489
490        ```
491        # fn main() { async_std::task::block_on(async {
492        #
493        use async_std::prelude::*;
494        use async_std::stream;
495        use std::time::{Duration, Instant};
496
497        let start = Instant::now();
498        let mut s = stream::from_iter(vec![0u8, 1, 2]).delay(Duration::from_millis(200));
499
500        assert_eq!(s.next().await, Some(0));
501        // The first time will take more than 200ms due to delay.
502        assert!(start.elapsed().as_millis() >= 200);
503
504        assert_eq!(s.next().await, Some(1));
505        // There will be no delay after the first time.
506        assert!(start.elapsed().as_millis() < 400);
507
508        assert_eq!(s.next().await, Some(2));
509        assert!(start.elapsed().as_millis() < 400);
510
511        assert_eq!(s.next().await, None);
512        assert!(start.elapsed().as_millis() < 400);
513        #
514        # }) }
515        ```
516    "#]
517    #[cfg(any(feature = "unstable", feature = "docs"))]
518    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
519    fn delay(self, dur: std::time::Duration) -> Delay<Self>
520    where
521        Self: Sized,
522    {
523        Delay::new(self, dur)
524    }
525
526    #[doc = r#"
527        Takes a closure and creates a stream that calls that closure on every element of this stream.
528
529        # Examples
530
531        ```
532        # fn main() { async_std::task::block_on(async {
533        #
534        use async_std::prelude::*;
535        use async_std::stream;
536
537        let s = stream::from_iter(vec![1, 2, 3]);
538        let mut s = s.map(|x| 2 * x);
539
540        assert_eq!(s.next().await, Some(2));
541        assert_eq!(s.next().await, Some(4));
542        assert_eq!(s.next().await, Some(6));
543        assert_eq!(s.next().await, None);
544
545        #
546        # }) }
547        ```
548    "#]
549    fn map<B, F>(self, f: F) -> Map<Self, F>
550    where
551        Self: Sized,
552        F: FnMut(Self::Item) -> B,
553    {
554        Map::new(self, f)
555    }
556
557    #[doc = r#"
558        A combinator that does something with each element in the stream, passing the value
559        on.
560
561        # Examples
562
563        Basic usage:
564
565        ```
566        # fn main() { async_std::task::block_on(async {
567        #
568        use async_std::prelude::*;
569        use async_std::stream;
570
571        let s = stream::from_iter(vec![1, 2, 3, 4, 5]);
572
573        let sum = s
574           .inspect(|x| println!("about to filter {}", x))
575           .filter(|x| x % 2 == 0)
576           .inspect(|x| println!("made it through filter: {}", x))
577           .fold(0, |sum, i| sum + i)
578           .await;
579
580        assert_eq!(sum, 6);
581        #
582        # }) }
583        ```
584    "#]
585    fn inspect<F>(self, f: F) -> Inspect<Self, F>
586    where
587        Self: Sized,
588        F: FnMut(&Self::Item),
589    {
590        Inspect::new(self, f)
591    }
592
593    #[doc = r#"
594        Returns the last element of the stream.
595
596        # Examples
597
598        Basic usage:
599
600        ```
601        # fn main() { async_std::task::block_on(async {
602        #
603        use async_std::prelude::*;
604        use async_std::stream;
605
606        let s = stream::from_iter(vec![1, 2, 3]);
607
608        let last  = s.last().await;
609        assert_eq!(last, Some(3));
610        #
611        # }) }
612        ```
613
614        An empty stream will return `None`:
615        ```
616        # fn main() { async_std::task::block_on(async {
617        #
618        use async_std::stream;
619        use crate::async_std::prelude::*;
620
621        let s = stream::empty::<()>();
622
623        let last  = s.last().await;
624        assert_eq!(last, None);
625        #
626        # }) }
627        ```
628    "#]
629    fn last(
630        self,
631    ) -> LastFuture<Self, Self::Item>
632    where
633        Self: Sized,
634    {
635        LastFuture::new(self)
636    }
637
638    #[doc = r#"
639        Creates a stream which ends after the first `None`.
640
641        After a stream returns `None`, future calls may or may not yield `Some(T)` again.
642        `fuse()` adapts an iterator, ensuring that after a `None` is given, it will always
643        return `None` forever.
644
645        # Examples
646
647        ```
648        # fn main() { async_std::task::block_on(async {
649        #
650        use async_std::prelude::*;
651        use async_std::stream;
652
653        let mut s = stream::once(1).fuse();
654        assert_eq!(s.next().await, Some(1));
655        assert_eq!(s.next().await, None);
656        assert_eq!(s.next().await, None);
657        #
658        # }) }
659        ```
660    "#]
661    fn fuse(self) -> Fuse<Self>
662    where
663        Self: Sized,
664    {
665        Fuse::new(self)
666    }
667
668    #[doc = r#"
669        Creates a stream that uses a predicate to determine if an element should be yielded.
670
671        # Examples
672
673        Basic usage:
674
675        ```
676        # fn main() { async_std::task::block_on(async {
677        #
678        use async_std::prelude::*;
679        use async_std::stream;
680
681        let s = stream::from_iter(vec![1, 2, 3, 4]);
682        let mut s = s.filter(|i| i % 2 == 0);
683
684        assert_eq!(s.next().await, Some(2));
685        assert_eq!(s.next().await, Some(4));
686        assert_eq!(s.next().await, None);
687        #
688        # }) }
689        ```
690    "#]
691    fn filter<P>(self, predicate: P) -> Filter<Self, P>
692    where
693        Self: Sized,
694        P: FnMut(&Self::Item) -> bool,
695    {
696        Filter::new(self, predicate)
697    }
698
699    #[doc= r#"
700        Creates an stream that works like map, but flattens nested structure.
701
702        # Examples
703
704        Basic usage:
705
706        ```
707        # async_std::task::block_on(async {
708
709        use async_std::prelude::*;
710        use async_std::stream;
711
712        let words = stream::from_iter(&["alpha", "beta", "gamma"]);
713
714        let merged: String = words
715            .flat_map(|s| stream::from_iter(s.chars()))
716            .collect().await;
717            assert_eq!(merged, "alphabetagamma");
718
719        let d3 = stream::from_iter(&[[[1, 2], [3, 4]], [[5, 6], [7, 8]]]);
720        let d1: Vec<_> = d3
721            .flat_map(|item| stream::from_iter(item))
722            .flat_map(|item| stream::from_iter(item))
723            .collect().await;
724
725        assert_eq!(d1, [&1, &2, &3, &4, &5, &6, &7, &8]);
726        # });
727        ```
728    "#]
729    #[cfg(feature = "unstable")]
730    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
731    fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
732        where
733            Self: Sized,
734            U: IntoStream,
735            F: FnMut(Self::Item) -> U,
736    {
737        FlatMap::new(self, f)
738    }
739
740    #[doc = r#"
741        Creates an stream that flattens nested structure.
742
743        # Examples
744
745        Basic usage:
746
747        ```
748        # async_std::task::block_on(async {
749
750        use async_std::prelude::*;
751        use async_std::stream;
752
753        let inner1 = stream::from_iter(vec![1u8,2,3]);
754        let inner2 = stream::from_iter(vec![4u8,5,6]);
755        let s  = stream::from_iter(vec![inner1, inner2]);
756
757        let v: Vec<_> = s.flatten().collect().await;
758
759        assert_eq!(v, vec![1,2,3,4,5,6]);
760
761        # });
762    "#]
763    #[cfg(feature = "unstable")]
764    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
765    fn flatten(self) -> Flatten<Self>
766    where
767        Self: Sized,
768        Self::Item: IntoStream,
769    {
770        Flatten::new(self)
771    }
772
773    #[doc = r#"
774        Both filters and maps a stream.
775
776        # Examples
777
778        Basic usage:
779
780        ```
781        # fn main() { async_std::task::block_on(async {
782        #
783
784        use async_std::prelude::*;
785        use async_std::stream;
786
787        let s = stream::from_iter(vec!["1", "lol", "3", "NaN", "5"]);
788
789        let mut parsed = s.filter_map(|a| a.parse::<u32>().ok());
790
791        let one = parsed.next().await;
792        assert_eq!(one, Some(1));
793
794        let three = parsed.next().await;
795        assert_eq!(three, Some(3));
796
797        let five = parsed.next().await;
798        assert_eq!(five, Some(5));
799
800        let end = parsed.next().await;
801        assert_eq!(end, None);
802        #
803        # }) }
804        ```
805    "#]
806    fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F>
807    where
808        Self: Sized,
809        F: FnMut(Self::Item) -> Option<B>,
810    {
811        FilterMap::new(self, f)
812    }
813
814    #[doc = r#"
815        Returns the element that gives the minimum value with respect to the
816        specified key function. If several elements are equally minimum,
817        the first element is returned. If the stream is empty, `None` is returned.
818
819        # Examples
820
821        ```
822        # fn main() { async_std::task::block_on(async {
823        #
824        use async_std::prelude::*;
825        use async_std::stream;
826
827        let s = stream::from_iter(vec![-1isize, 2, -3]);
828
829        let min = s.clone().min_by_key(|x| x.abs()).await;
830        assert_eq!(min, Some(-1));
831
832        let min = stream::empty::<isize>().min_by_key(|x| x.abs()).await;
833        assert_eq!(min, None);
834        #
835        # }) }
836        ```
837    "#]
838    fn min_by_key<B, F>(
839        self,
840        key_by: F,
841    ) -> MinByKeyFuture<Self, Self::Item, F>
842    where
843        Self: Sized,
844        B: Ord,
845        F: FnMut(&Self::Item) -> B,
846    {
847        MinByKeyFuture::new(self, key_by)
848    }
849
850    #[doc = r#"
851        Returns the element that gives the maximum value with respect to the
852        specified key function. If several elements are equally maximum,
853        the first element is returned. If the stream is empty, `None` is returned.
854
855        # Examples
856
857        ```
858        # fn main() { async_std::task::block_on(async {
859        #
860        use async_std::prelude::*;
861        use async_std::stream;
862
863        let s = stream::from_iter(vec![-3_i32, 0, 1, 5, -10]);
864
865        let max = s.clone().max_by_key(|x| x.abs()).await;
866        assert_eq!(max, Some(-10));
867
868        let max = stream::empty::<isize>().max_by_key(|x| x.abs()).await;
869        assert_eq!(max, None);
870        #
871        # }) }
872        ```
873    "#]
874    fn max_by_key<B, F>(
875        self,
876        key_by: F,
877    ) -> MaxByKeyFuture<Self, Self::Item, F>
878    where
879        Self: Sized,
880        B: Ord,
881        F: FnMut(&Self::Item) -> B,
882    {
883        MaxByKeyFuture::new(self, key_by)
884    }
885
886    #[doc = r#"
887        Returns the element that gives the minimum value with respect to the
888        specified comparison function. If several elements are equally minimum,
889        the first element is returned. If the stream is empty, `None` is returned.
890
891        # Examples
892
893        ```
894        # fn main() { async_std::task::block_on(async {
895        #
896        use async_std::prelude::*;
897        use async_std::stream;
898
899        let s = stream::from_iter(vec![1u8, 2, 3]);
900
901        let min = s.clone().min_by(|x, y| x.cmp(y)).await;
902        assert_eq!(min, Some(1));
903
904        let min = s.min_by(|x, y| y.cmp(x)).await;
905        assert_eq!(min, Some(3));
906
907        let min = stream::empty::<u8>().min_by(|x, y| x.cmp(y)).await;
908        assert_eq!(min, None);
909        #
910        # }) }
911        ```
912    "#]
913    fn min_by<F>(
914        self,
915        compare: F,
916    ) -> MinByFuture<Self, F, Self::Item>
917    where
918        Self: Sized,
919        F: FnMut(&Self::Item, &Self::Item) -> Ordering,
920    {
921        MinByFuture::new(self, compare)
922    }
923
924    #[doc = r#"
925        Returns the element that gives the maximum value. If several elements are equally maximum,
926        the first element is returned. If the stream is empty, `None` is returned.
927
928        # Examples
929
930        ```
931        # fn main() { async_std::task::block_on(async {
932        #
933        use async_std::prelude::*;
934        use async_std::stream;
935
936        let s = stream::from_iter(vec![1usize, 2, 3]);
937
938        let max = s.clone().max().await;
939        assert_eq!(max, Some(3));
940
941        let max = stream::empty::<usize>().max().await;
942        assert_eq!(max, None);
943        #
944        # }) }
945        ```
946    "#]
947    fn max(
948        self,
949    ) -> MaxFuture<Self, Self::Item>
950    where
951        Self: Sized,
952        Self::Item: Ord,
953    {
954        MaxFuture::new(self)
955    }
956
957            #[doc = r#"
958        Returns the element that gives the minimum value. If several elements are equally minimum,
959        the first element is returned. If the stream is empty, `None` is returned.
960
961        # Examples
962
963        ```
964        # fn main() { async_std::task::block_on(async {
965        #
966        use async_std::prelude::*;
967        use async_std::stream;
968
969        let s = stream::from_iter(vec![1usize, 2, 3]);
970
971        let min = s.clone().min().await;
972        assert_eq!(min, Some(1));
973
974        let min = stream::empty::<usize>().min().await;
975        assert_eq!(min, None);
976        #
977        # }) }
978        ```
979    "#]
980    fn min(
981        self,
982    ) -> MinFuture<Self, Self::Item>
983    where
984        Self: Sized,
985        Self::Item: Ord,
986    {
987        MinFuture::new(self)
988    }
989
990     #[doc = r#"
991        Returns the element that gives the maximum value with respect to the
992        specified comparison function. If several elements are equally maximum,
993        the first element is returned. If the stream is empty, `None` is returned.
994
995        # Examples
996
997        ```
998        # fn main() { async_std::task::block_on(async {
999        #
1000        use async_std::prelude::*;
1001        use async_std::stream;
1002
1003        let s = stream::from_iter(vec![1u8, 2, 3]);
1004
1005        let max = s.clone().max_by(|x, y| x.cmp(y)).await;
1006        assert_eq!(max, Some(3));
1007
1008        let max = s.max_by(|x, y| y.cmp(x)).await;
1009        assert_eq!(max, Some(1));
1010
1011        let max = stream::empty::<usize>().max_by(|x, y| x.cmp(y)).await;
1012        assert_eq!(max, None);
1013        #
1014        # }) }
1015        ```
1016    "#]
1017    fn max_by<F>(
1018        self,
1019        compare: F,
1020    ) -> MaxByFuture<Self, F, Self::Item>
1021    where
1022        Self: Sized,
1023        F: FnMut(&Self::Item, &Self::Item) -> Ordering,
1024    {
1025        MaxByFuture::new(self, compare)
1026    }
1027
1028    #[doc = r#"
1029        Returns the nth element of the stream.
1030
1031        # Examples
1032
1033        Basic usage:
1034
1035        ```
1036        # fn main() { async_std::task::block_on(async {
1037        #
1038        use async_std::prelude::*;
1039        use async_std::stream;
1040
1041        let mut s = stream::from_iter(vec![1u8, 2, 3]);
1042
1043        let second = s.nth(1).await;
1044        assert_eq!(second, Some(2));
1045        #
1046        # }) }
1047        ```
1048        Calling `nth()` multiple times:
1049
1050        ```
1051        # fn main() { async_std::task::block_on(async {
1052        #
1053        use async_std::stream;
1054        use async_std::prelude::*;
1055
1056        let mut s = stream::from_iter(vec![1u8, 2, 3]);
1057
1058        let second = s.nth(0).await;
1059        assert_eq!(second, Some(1));
1060
1061        let second = s.nth(0).await;
1062        assert_eq!(second, Some(2));
1063        #
1064        # }) }
1065        ```
1066        Returning `None` if the stream finished before returning `n` elements:
1067        ```
1068        # fn main() { async_std::task::block_on(async {
1069        #
1070        use async_std::prelude::*;
1071        use async_std::stream;
1072
1073        let mut s  = stream::from_iter(vec![1u8, 2, 3]);
1074
1075        let fourth = s.nth(4).await;
1076        assert_eq!(fourth, None);
1077        #
1078        # }) }
1079        ```
1080    "#]
1081    fn nth(
1082        &mut self,
1083        n: usize,
1084    ) -> NthFuture<'_, Self>
1085    where
1086        Self: Unpin + Sized,
1087    {
1088        NthFuture::new(self, n)
1089    }
1090
1091    #[doc = r#"
1092        Tests if every element of the stream matches a predicate.
1093
1094        `all()` takes a closure that returns `true` or `false`. It applies
1095        this closure to each element of the stream, and if they all return
1096        `true`, then so does `all()`. If any of them return `false`, it
1097        returns `false`.
1098
1099        `all()` is short-circuiting; in other words, it will stop processing
1100        as soon as it finds a `false`, given that no matter what else happens,
1101        the result will also be `false`.
1102
1103        An empty stream returns `true`.
1104
1105        # Examples
1106
1107        Basic usage:
1108
1109        ```
1110        # fn main() { async_std::task::block_on(async {
1111        #
1112        use async_std::prelude::*;
1113        use async_std::stream;
1114
1115        let mut s = stream::repeat::<u32>(42).take(3);
1116        assert!(s.all(|x| x ==  42).await);
1117
1118        #
1119        # }) }
1120        ```
1121
1122        Empty stream:
1123
1124        ```
1125        # fn main() { async_std::task::block_on(async {
1126        #
1127        use async_std::prelude::*;
1128        use async_std::stream;
1129
1130        let mut s = stream::empty::<u32>();
1131        assert!(s.all(|_| false).await);
1132        #
1133        # }) }
1134        ```
1135    "#]
1136    #[inline]
1137    fn all<F>(
1138        &mut self,
1139        f: F,
1140    ) -> AllFuture<'_, Self, F, Self::Item>
1141    where
1142        Self: Unpin + Sized,
1143        F: FnMut(Self::Item) -> bool,
1144    {
1145        AllFuture::new(self, f)
1146    }
1147
1148    #[doc = r#"
1149        Searches for an element in a stream that satisfies a predicate.
1150
1151        # Examples
1152
1153        Basic usage:
1154
1155        ```
1156        # fn main() { async_std::task::block_on(async {
1157        #
1158        use async_std::prelude::*;
1159        use async_std::stream;
1160
1161        let mut s = stream::from_iter(vec![1u8, 2, 3]);
1162        let res = s.find(|x| *x == 2).await;
1163        assert_eq!(res, Some(2));
1164        #
1165        # }) }
1166        ```
1167
1168        Resuming after a first find:
1169
1170        ```
1171        # fn main() { async_std::task::block_on(async {
1172        #
1173        use async_std::prelude::*;
1174        use async_std::stream;
1175
1176        let mut s= stream::from_iter(vec![1, 2, 3]);
1177        let res = s.find(|x| *x == 2).await;
1178        assert_eq!(res, Some(2));
1179
1180        let next = s.next().await;
1181        assert_eq!(next, Some(3));
1182        #
1183        # }) }
1184        ```
1185    "#]
1186    fn find<P>(
1187        &mut self,
1188        p: P,
1189    ) -> FindFuture<'_, Self, P>
1190    where
1191        Self: Unpin + Sized,
1192        P: FnMut(&Self::Item) -> bool,
1193    {
1194        FindFuture::new(self, p)
1195    }
1196
1197    #[doc = r#"
1198        Applies a function to the elements of the stream and returns the first non-`None` result.
1199
1200        ```
1201        # fn main() { async_std::task::block_on(async {
1202        #
1203        use async_std::prelude::*;
1204        use async_std::stream;
1205
1206        let mut s = stream::from_iter(vec!["lol", "NaN", "2", "5"]);
1207        let first_number = s.find_map(|s| s.parse().ok()).await;
1208
1209        assert_eq!(first_number, Some(2));
1210        #
1211        # }) }
1212        ```
1213    "#]
1214    fn find_map<F, B>(
1215        &mut self,
1216        f: F,
1217    ) -> FindMapFuture<'_, Self, F>
1218    where
1219        Self: Unpin + Sized,
1220        F: FnMut(Self::Item) -> Option<B>,
1221    {
1222        FindMapFuture::new(self, f)
1223    }
1224
1225    #[doc = r#"
1226        A combinator that applies a function to every element in a stream
1227        producing a single, final value.
1228
1229        # Examples
1230
1231        Basic usage:
1232
1233        ```
1234        # fn main() { async_std::task::block_on(async {
1235        #
1236        use async_std::prelude::*;
1237        use async_std::stream;
1238
1239        let s = stream::from_iter(vec![1u8, 2, 3]);
1240        let sum = s.fold(0, |acc, x| acc + x).await;
1241
1242        assert_eq!(sum, 6);
1243        #
1244        # }) }
1245        ```
1246    "#]
1247    fn fold<B, F>(
1248        self,
1249        init: B,
1250        f: F,
1251    ) -> FoldFuture<Self, F, B>
1252    where
1253        Self: Sized,
1254        F: FnMut(B, Self::Item) -> B,
1255    {
1256        FoldFuture::new(self, init, f)
1257    }
1258
1259    #[doc = r#"
1260        A combinator that applies a function to every element in a stream
1261        creating two collections from it.
1262
1263        # Examples
1264
1265        Basic usage:
1266
1267        ```
1268        # fn main() { async_std::task::block_on(async {
1269        #
1270        use async_std::prelude::*;
1271        use async_std::stream;
1272
1273        let (even, odd): (Vec<i32>, Vec<i32>) = stream::from_iter(vec![1, 2, 3])
1274            .partition(|&n| n % 2 == 0).await;
1275
1276        assert_eq!(even, vec![2]);
1277        assert_eq!(odd, vec![1, 3]);
1278
1279        #
1280        # }) }
1281        ```
1282    "#]
1283    #[cfg(feature = "unstable")]
1284    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1285    fn partition<B, F>(
1286        self,
1287        f: F,
1288    ) -> PartitionFuture<Self, F, B>
1289    where
1290        Self: Sized,
1291        F: FnMut(&Self::Item) -> bool,
1292        B: Default + Extend<Self::Item>,
1293    {
1294        PartitionFuture::new(self, f)
1295    }
1296
1297    #[doc = r#"
1298        Call a closure on each element of the stream.
1299
1300        # Examples
1301
1302        ```
1303        # fn main() { async_std::task::block_on(async {
1304        #
1305        use async_std::prelude::*;
1306        use async_std::stream;
1307        use std::sync::mpsc::channel;
1308
1309        let (tx, rx) = channel();
1310
1311        let s = stream::from_iter(vec![1usize, 2, 3]);
1312        let sum = s.for_each(move |x| tx.clone().send(x).unwrap()).await;
1313
1314        let v: Vec<_> = rx.iter().collect();
1315
1316        assert_eq!(v, vec![1, 2, 3]);
1317        #
1318        # }) }
1319        ```
1320    "#]
1321    fn for_each<F>(
1322        self,
1323        f: F,
1324    ) -> ForEachFuture<Self, F>
1325    where
1326        Self: Sized,
1327        F: FnMut(Self::Item),
1328    {
1329        ForEachFuture::new(self, f)
1330    }
1331
1332    #[doc = r#"
1333        Tests if any element of the stream matches a predicate.
1334
1335        `any()` takes a closure that returns `true` or `false`. It applies
1336        this closure to each element of the stream, and if any of them return
1337        `true`, then so does `any()`. If they all return `false`, it
1338        returns `false`.
1339
1340        `any()` is short-circuiting; in other words, it will stop processing
1341        as soon as it finds a `true`, given that no matter what else happens,
1342        the result will also be `true`.
1343
1344        An empty stream returns `false`.
1345
1346        # Examples
1347
1348        Basic usage:
1349
1350        ```
1351        # fn main() { async_std::task::block_on(async {
1352        #
1353        use async_std::prelude::*;
1354        use async_std::stream;
1355
1356        let mut s = stream::repeat::<u32>(42).take(3);
1357        assert!(s.any(|x| x ==  42).await);
1358        #
1359        # }) }
1360        ```
1361
1362        Empty stream:
1363
1364        ```
1365        # fn main() { async_std::task::block_on(async {
1366        #
1367        use async_std::prelude::*;
1368        use async_std::stream;
1369
1370        let mut s = stream::empty::<u32>();
1371        assert!(!s.any(|_| false).await);
1372        #
1373        # }) }
1374        ```
1375    "#]
1376    #[inline]
1377    fn any<F>(
1378        &mut self,
1379        f: F,
1380    ) -> AnyFuture<'_, Self, F, Self::Item>
1381    where
1382        Self: Unpin + Sized,
1383        F: FnMut(Self::Item) -> bool,
1384    {
1385        AnyFuture::new(self, f)
1386    }
1387
1388    #[doc = r#"
1389        Borrows a stream, rather than consuming it.
1390
1391        This is useful to allow applying stream adaptors while still retaining ownership of the original stream.
1392
1393        # Examples
1394
1395        ```
1396        # fn main() { async_std::task::block_on(async {
1397        #
1398        use async_std::prelude::*;
1399        use async_std::stream;
1400
1401        let a = vec![1isize, 2, 3];
1402
1403        let stream = stream::from_iter(a);
1404
1405        let sum: isize = stream.take(5).sum().await;
1406
1407        assert_eq!(sum, 6);
1408
1409        // if we try to use stream again, it won't work. The following line
1410        // gives error: use of moved value: `stream`
1411        // assert_eq!(stream.next(), None);
1412
1413        // let's try that again
1414        let a = vec![1isize, 2, 3];
1415
1416        let mut stream = stream::from_iter(a);
1417
1418        // instead, we add in a .by_ref()
1419        let sum: isize = stream.by_ref().take(2).sum().await;
1420
1421        assert_eq!(sum, 3);
1422
1423        // now this is just fine:
1424        assert_eq!(stream.next().await, Some(3));
1425        assert_eq!(stream.next().await, None);
1426        #
1427        # }) }
1428        ```
1429    "#]
1430    #[cfg(feature = "unstable")]
1431    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1432    fn by_ref(&mut self) -> &mut Self {
        // Returns the very same mutable borrow that was passed in; no wrapper is created.
1433        self
1434    }
1435
1436    #[doc = r#"
1437        A stream adaptor similar to [`fold`] that holds internal state and produces a new
1438        stream.
1439
1440        [`fold`]: #method.fold
1441
1442        `scan()` takes two arguments: an initial value which seeds the internal state, and
1443        a closure with two arguments, the first being a mutable reference to the internal
1444        state and the second a stream element. The closure can assign to the internal state
1445        to share state between iterations.
1446
1447        On iteration, the closure will be applied to each element of the stream and the
1448        return value from the closure, an `Option`, is yielded by the stream.
1449
1450        ## Examples
1451
1452        ```
1453        # fn main() { async_std::task::block_on(async {
1454        #
1455        use async_std::prelude::*;
1456        use async_std::stream;
1457
1458        let s = stream::from_iter(vec![1isize, 2, 3]);
1459        let mut s = s.scan(1, |state, x| {
1460            *state = *state * x;
1461            Some(-*state)
1462        });
1463
1464        assert_eq!(s.next().await, Some(-1));
1465        assert_eq!(s.next().await, Some(-2));
1466        assert_eq!(s.next().await, Some(-6));
1467        assert_eq!(s.next().await, None);
1468        #
1469        # }) }
1470        ```
1471    "#]
1472    #[inline]
1473    fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1474    where
1475        Self: Sized,
1476        F: FnMut(&mut St, Self::Item) -> Option<B>,
1477    {
        // The stream, the seed state and the closure are all passed by value to `Scan::new`.
1478        Scan::new(self, initial_state, f)
1479    }
1480
1481    #[doc = r#"
1482        Combinator that `skip`s elements based on a predicate.
1483
1484        Takes a closure argument. It will call this closure on every element in
1485        the stream and ignore elements until it returns `false`.
1486
1487        After `false` is returned, `SkipWhile`'s job is over and all further
1488        elements in the stream are yielded.
1489
1490        ## Examples
1491
1492        ```
1493        # fn main() { async_std::task::block_on(async {
1494        #
1495        use async_std::prelude::*;
1496        use async_std::stream;
1497
1498        let a = stream::from_iter(vec![-1i32, 0, 1]);
1499        let mut s = a.skip_while(|x| x.is_negative());
1500
1501        assert_eq!(s.next().await, Some(0));
1502        assert_eq!(s.next().await, Some(1));
1503        assert_eq!(s.next().await, None);
1504        #
1505        # }) }
1506        ```
1507    "#]
1508    fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
1509    where
1510        Self: Sized,
1511        P: FnMut(&Self::Item) -> bool,
1512    {
        // The predicate is only ever given a shared reference to an item (`&Self::Item`).
1513        SkipWhile::new(self, predicate)
1514    }
1515
1516    #[doc = r#"
1517        Creates a combinator that skips the first `n` elements.
1518
1519        ## Examples
1520
1521        ```
1522        # fn main() { async_std::task::block_on(async {
1523        #
1524        use async_std::prelude::*;
1525        use async_std::stream;
1526
1527        let s = stream::from_iter(vec![1u8, 2, 3]);
1528        let mut skipped = s.skip(2);
1529
1530        assert_eq!(skipped.next().await, Some(3));
1531        assert_eq!(skipped.next().await, None);
1532        #
1533        # }) }
1534        ```
1535    "#]
1536    fn skip(self, n: usize) -> Skip<Self>
1537    where
1538        Self: Sized,
1539    {
        // Passes the stream and the count `n` straight to `Skip::new`.
1540        Skip::new(self, n)
1541    }
1542
1543    #[doc=r#"
1544        Awaits a stream or times out after a duration of time.
1545
1546        If you want to await an I/O future consider using
1547        [`io::timeout`](../io/fn.timeout.html) instead.
1548
1549        # Examples
1550
1551        ```
1552        # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
1553        #
1554        use std::time::Duration;
1555
1556        use async_std::stream;
1557        use async_std::prelude::*;
1558
1559        let mut s = stream::repeat(1).take(3).timeout(Duration::from_secs(1));
1560
1561        while let Some(v) = s.next().await {
1562            assert_eq!(v, Ok(1));
1563        }
1564
1565        // when timeout
1566        let mut s = stream::pending::<()>().timeout(Duration::from_millis(10));
1567        match s.next().await {
1568            Some(item) => assert!(item.is_err()),
1569            None => panic!()
1570        };
1571        #
1572        # Ok(()) }) }
1573        ```
1574    "#]
1575    #[cfg(any(feature = "unstable", feature = "docs"))]
1576    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1577    fn timeout(self, dur: Duration) -> Timeout<Self>
1578    where
1579        Self: Stream + Sized,
1580    {
        // NOTE(review): this method is gated on `unstable` OR `docs`, while the other
        // unstable methods in this trait are gated on `unstable` alone — confirm that
        // `Timeout` and `Duration` are in scope when only `docs` is enabled.
1581        Timeout::new(self, dur)
1582    }
1583
1584    #[doc = r#"
1585        A combinator that applies a function as long as it returns successfully, producing a single, final value.
1586        Immediately returns the error when the function returns unsuccessfully.
1587
1588        # Examples
1589
1590        Basic usage:
1591
1592        ```
1593        # fn main() { async_std::task::block_on(async {
1594        #
1595        use async_std::prelude::*;
1596        use async_std::stream;
1597
1598        let mut s = stream::from_iter(vec![1usize, 2, 3]);
1599        let sum = s.try_fold(0, |acc, v| {
1600            if (acc+v) % 2 == 1 {
1601                Ok(v+3)
1602            } else {
1603                Err("fail")
1604            }
1605        }).await;
1606
1607        assert_eq!(sum, Err("fail"));
1608        #
1609        # }) }
1610        ```
1611    "#]
1612    fn try_fold<B, F, T, E>(
1613        &mut self,
1614        init: T,
1615        f: F,
1616    ) -> TryFoldFuture<'_, Self, F, T>
1617    where
1618        Self: Unpin + Sized,
1619        F: FnMut(B, Self::Item) -> Result<T, E>,
1620    {
1621        TryFoldFuture::new(self, init, f)
1622    }
1623
1624    #[doc = r#"
1625        Applies a fallible function to each element in a stream, stopping at the first error and returning it.
1626
1627        # Examples
1628
1629        ```
1630        # fn main() { async_std::task::block_on(async {
1631        #
1632        use std::sync::mpsc::channel;
1633        use async_std::prelude::*;
1634        use async_std::stream;
1635
1636        let (tx, rx) = channel();
1637
1638        let mut s = stream::from_iter(vec![1u8, 2, 3]);
1639        let s = s.try_for_each(|v| {
1640            if v % 2 == 1 {
1641                tx.clone().send(v).unwrap();
1642                Ok(())
1643            } else {
1644                Err("even")
1645            }
1646        });
1647
1648        let res = s.await;
1649        drop(tx);
1650        let values: Vec<_> = rx.iter().collect();
1651
1652        assert_eq!(values, vec![1]);
1653        assert_eq!(res, Err("even"));
1654        #
1655        # }) }
1656        ```
1657    "#]
1658    fn try_for_each<F, E>(
1659        &mut self,
1660        f: F,
1661    ) -> TryForEachFuture<'_, Self, F>
1662    where
1663        Self: Unpin + Sized,
1664        F: FnMut(Self::Item) -> Result<(), E>,
1665    {
1666        TryForEachFuture::new(self, f)
1667    }
1668
1669    #[doc = r#"
1670        'Zips up' two streams into a single stream of pairs.
1671
1672        `zip()` returns a new stream that will iterate over two other streams, returning a
1673        tuple where the first element comes from the first stream, and the second element
1674        comes from the second stream.
1675
1676        In other words, it zips two streams together, into a single one.
1677
1678        If either stream returns [`None`], [`poll_next`] from the zipped stream will return
1679        [`None`]. If the first stream returns [`None`], `zip` will short-circuit and
1680        `poll_next` will not be called on the second stream.
1681
1682        [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
1683        [`poll_next`]: #tymethod.poll_next
1684
1685        ## Examples
1686
1687        ```
1688        # fn main() { async_std::task::block_on(async {
1689        #
1690        use async_std::prelude::*;
1691        use async_std::stream;
1692
1693        let l = stream::from_iter(vec![1u8, 2, 3]);
1694        let r = stream::from_iter(vec![4u8, 5, 6, 7]);
1695        let mut s = l.zip(r);
1696
1697        assert_eq!(s.next().await, Some((1, 4)));
1698        assert_eq!(s.next().await, Some((2, 5)));
1699        assert_eq!(s.next().await, Some((3, 6)));
1700        assert_eq!(s.next().await, None);
1701        #
1702        # }) }
1703        ```
1704    "#]
1705    #[inline]
1706    fn zip<U>(self, other: U) -> Zip<Self, U>
1707    where
1708        Self: Sized,
1709        U: Stream,
1710    {
        // `other` only has to be a `Stream`; no bound relates its item type to `Self::Item`.
1711        Zip::new(self, other)
1712    }
1713
1714    #[doc = r#"
1715        Converts a stream of pairs into a pair of containers.
1716
1717        `unzip()` consumes an entire stream of pairs, producing two collections: one from the left elements of the pairs, and one from the right elements.
1718
1719        This function is, in some sense, the opposite of [`zip`].
1720
1721        [`zip`]: trait.Stream.html#method.zip
1722
1723        # Examples
1724
1725        ```
1726        # fn main() { async_std::task::block_on(async {
1727        #
1728        use async_std::prelude::*;
1729        use async_std::stream;
1730
1731        let s  = stream::from_iter(vec![(1,2), (3,4)]);
1732
1733        let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1734
1735        assert_eq!(left, [1, 3]);
1736        assert_eq!(right, [2, 4]);
1737        #
1738        # }) }
1739        ```
1740    "#]
1741    #[cfg(feature = "unstable")]
1742    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1743    fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
1744    where
1745    FromA: Default + Extend<A>,
1746    FromB: Default + Extend<B>,
1747    Self: Stream<Item = (A, B)> + Sized,
1748    {
1749        UnzipFuture::new(self)
1750    }
1751
1752    #[doc = r#"
1753        Transforms a stream into a collection.
1754
1755        `collect()` can take anything streamable, and turn it into a relevant
1756        collection. This is one of the more powerful methods in the async
1757        standard library, used in a variety of contexts.
1758
1759        The most basic pattern in which `collect()` is used is to turn one
1760        collection into another. You take a collection, call [`into_stream`] on it,
1761        do a bunch of transformations, and then `collect()` at the end.
1762
1763        Because `collect()` is so general, it can cause problems with type
1764        inference. As such, `collect()` is one of the few times you'll see
1765        the syntax affectionately known as the 'turbofish': `::<>`. This
1766        helps the inference algorithm understand specifically which collection
1767        you're trying to collect into.
1768
1769        # Examples
1770
1771        ```
1772        # fn main() { async_std::task::block_on(async {
1773        #
1774        use async_std::prelude::*;
1775        use async_std::stream;
1776
1777        let s = stream::repeat(9u8).take(3);
1778        let buf: Vec<u8> = s.collect().await;
1779
1780        assert_eq!(buf, vec![9; 3]);
1781
1782        // You can also collect streams of Result values
1783        // into any collection that implements FromStream
1784        let s = stream::repeat(Ok(9)).take(3);
1785        // We are using Vec here, but other collections
1786        // are supported as well
1787        let buf: Result<Vec<u8>, ()> = s.collect().await;
1788
1789        assert_eq!(buf, Ok(vec![9; 3]));
1790
1791        // The stream will stop on the first Err and
1792        // return that instead
1793        let s = stream::repeat(Err(5)).take(3);
1794        let buf: Result<Vec<u8>, u8> = s.collect().await;
1795
1796        assert_eq!(buf, Err(5));
1797        #
1798        # }) }
1799        ```
1800
1801        [`into_stream`]: trait.IntoStream.html#tymethod.into_stream
1802    "#]
1803    #[cfg(feature = "unstable")]
1804    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1805    fn collect<'a, B>(
1806        self,
1807    ) -> Pin<Box<dyn Future<Output = B> + 'a + Send>>
1808    where
1809        Self: Sized + 'a + Send,
1810        B: FromStream<Self::Item>,
1811        Self::Item: Send,
1812    {
1813        FromStream::from_stream(self)
1814    }
1815
1816    #[doc = r#"
1817        Combines multiple streams into a single stream of all their outputs.
1818
1819        Items are yielded as soon as they're received, and the stream continues to yield until
1820        both streams have been exhausted. The output ordering between streams is not guaranteed.
1821
1822        # Examples
1823
1824        ```
1825        # async_std::task::block_on(async {
1826        use async_std::prelude::*;
1827        use async_std::stream::{self, FromStream};
1828
1829        let a = stream::once(1u8);
1830        let b = stream::once(2u8);
1831        let c = stream::once(3u8);
1832
1833        let s = a.merge(b).merge(c);
1834        let mut lst = Vec::from_stream(s).await;
1835
1836        lst.sort_unstable();
1837        assert_eq!(&lst, &[1u8, 2u8, 3u8]);
1838        # });
1839        ```
1840    "#]
1841    #[cfg(feature = "unstable")]
1842    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1843    fn merge<U>(self, other: U) -> Merge<Self, U>
1844    where
1845        Self: Sized,
1846        U: Stream<Item = Self::Item> + Sized,
1847    {
        // `other` must yield the same item type as `self` (`Item = Self::Item`).
1848        Merge::new(self, other)
1849    }
1850
1851    #[doc = r#"
1852        Lexicographically compares the elements of this `Stream` with those
1853        of another.
1854
1855        # Examples
1856
1857        ```
1858        # fn main() { async_std::task::block_on(async {
1859        #
1860        use async_std::prelude::*;
1861        use async_std::stream;
1862
1863        use std::cmp::Ordering;
1864
1865        let s1 = stream::from_iter(vec![1]);
1866        let s2 = stream::from_iter(vec![1, 2]);
1867        let s3 = stream::from_iter(vec![1, 2, 3]);
1868        let s4 = stream::from_iter(vec![1, 2, 4]);
1869        assert_eq!(s1.clone().partial_cmp(s1.clone()).await, Some(Ordering::Equal));
1870        assert_eq!(s1.clone().partial_cmp(s2.clone()).await, Some(Ordering::Less));
1871        assert_eq!(s2.clone().partial_cmp(s1.clone()).await, Some(Ordering::Greater));
1872        assert_eq!(s3.clone().partial_cmp(s4.clone()).await, Some(Ordering::Less));
1873        assert_eq!(s4.clone().partial_cmp(s3.clone()).await, Some(Ordering::Greater));
1874        #
1875        # }) }
1876        ```
1877    "#]
1878    fn partial_cmp<S>(
1879       self,
1880       other: S
1881    ) -> PartialCmpFuture<Self, S>
1882    where
1883        Self: Sized + Stream,
1884        S: Stream,
1885        <Self as Stream>::Item: PartialOrd<S::Item>,
1886    {
1887        PartialCmpFuture::new(self, other)
1888    }
1889
1890    #[doc = r#"
1891        Searches for an element in a Stream that satisfies a predicate, returning
1892        its index.
1893
1894        # Examples
1895
1896        ```
1897        # fn main() { async_std::task::block_on(async {
1898        #
1899        use async_std::prelude::*;
1900        use async_std::stream;
1901
1902        let s = stream::from_iter(vec![1usize, 2, 3]);
1903        let res = s.clone().position(|x| x == 1).await;
1904        assert_eq!(res, Some(0));
1905
1906        let res = s.clone().position(|x| x == 2).await;
1907        assert_eq!(res, Some(1));
1908
1909        let res = s.clone().position(|x| x == 3).await;
1910        assert_eq!(res, Some(2));
1911
1912        let res = s.clone().position(|x| x == 4).await;
1913        assert_eq!(res, None);
1914        #
1915        # }) }
1916        ```
1917    "#]
1918    fn position<P>(
1919       &mut self,
1920       predicate: P,
1921    ) -> PositionFuture<'_, Self, P>
1922    where
1923        Self: Unpin + Sized,
1924        P: FnMut(Self::Item) -> bool,
1925    {
1926        PositionFuture::new(self, predicate)
1927    }
1928
1929    #[doc = r#"
1930        Lexicographically compares the elements of this `Stream` with those
1931        of another using `Ord`.
1932
1933        # Examples
1934
1935        ```
1936        # fn main() { async_std::task::block_on(async {
1937        #
1938        use async_std::prelude::*;
1939        use async_std::stream;
1940        use std::cmp::Ordering;
1941
1942        let s1 = stream::from_iter(vec![1]);
1943        let s2 = stream::from_iter(vec![1, 2]);
1944        let s3 = stream::from_iter(vec![1, 2, 3]);
1945        let s4 = stream::from_iter(vec![1, 2, 4]);
1946
1947        assert_eq!(s1.clone().cmp(s1.clone()).await, Ordering::Equal);
1948        assert_eq!(s1.clone().cmp(s2.clone()).await, Ordering::Less);
1949        assert_eq!(s2.clone().cmp(s1.clone()).await, Ordering::Greater);
1950        assert_eq!(s3.clone().cmp(s4.clone()).await, Ordering::Less);
1951        assert_eq!(s4.clone().cmp(s3.clone()).await, Ordering::Greater);
1952        #
1953        # }) }
1954        ```
1955    "#]
1956    fn cmp<S>(
1957       self,
1958       other: S
1959    ) -> CmpFuture<Self, S>
1960    where
1961        Self: Sized + Stream,
1962        S: Stream,
1963        <Self as Stream>::Item: Ord
1964    {
1965        CmpFuture::new(self, other)
1966    }
1967
1968    #[doc = r#"
1969        Counts the number of elements in the stream.
1970
1971        # Examples
1972
1973        ```
1974        # fn main() { async_std::task::block_on(async {
1975        #
1976        use async_std::prelude::*;
1977        use async_std::stream;
1978
1979        let s1 = stream::from_iter(vec![0]);
1980        let s2 = stream::from_iter(vec![1, 2, 3]);
1981
1982        assert_eq!(s1.count().await, 1);
1983        assert_eq!(s2.count().await, 3);
1984        #
1985        # }) }
1986        ```
1987    "#]
1988    #[cfg(feature = "unstable")]
1989    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1990    fn count(self) -> CountFuture<Self>
1991    where
1992        Self: Sized,
1993    {
        // Takes the stream by value and wraps it in a `CountFuture`.
1994        CountFuture::new(self)
1995    }
1996
1997    #[doc = r#"
1998        Determines if the elements of this `Stream` are lexicographically
1999        not equal to those of another.
2000
2001        # Examples
2002
2003        ```
2004        # fn main() { async_std::task::block_on(async {
2005        #
2006        use async_std::prelude::*;
2007        use async_std::stream;
2008
2009        let single     = stream::from_iter(vec![1usize]);
2010        let single_ne  = stream::from_iter(vec![10usize]);
2011        let multi      = stream::from_iter(vec![1usize,2]);
2012        let multi_ne   = stream::from_iter(vec![1usize,5]);
2013
2014        assert_eq!(single.clone().ne(single.clone()).await, false);
2015        assert_eq!(single_ne.clone().ne(single.clone()).await, true);
2016        assert_eq!(multi.clone().ne(single_ne.clone()).await, true);
2017        assert_eq!(multi_ne.clone().ne(multi.clone()).await, true);
2018        #
2019        # }) }
2020        ```
2021    "#]
2022    fn ne<S>(
2023       self,
2024       other: S
2025    ) -> NeFuture<Self, S>
2026    where
2027        Self: Sized,
2028        S: Sized + Stream,
2029        <Self as Stream>::Item: PartialEq<S::Item>,
2030    {
2031        NeFuture::new(self, other)
2032    }
2033
2034    #[doc = r#"
2035        Determines if the elements of this `Stream` are lexicographically
2036        greater than or equal to those of another.
2037
2038        # Examples
2039
2040        ```
2041        # fn main() { async_std::task::block_on(async {
2042        #
2043        use async_std::prelude::*;
2044        use async_std::stream;
2045
2046        let single    = stream::from_iter(vec![1]);
2047        let single_gt = stream::from_iter(vec![10]);
2048        let multi     = stream::from_iter(vec![1,2]);
2049        let multi_gt  = stream::from_iter(vec![1,5]);
2050
2051        assert_eq!(single.clone().ge(single.clone()).await, true);
2052        assert_eq!(single_gt.clone().ge(single.clone()).await, true);
2053        assert_eq!(multi.clone().ge(single_gt.clone()).await, false);
2054        assert_eq!(multi_gt.clone().ge(multi.clone()).await, true);
2055        #
2056        # }) }
2057        ```
2058    "#]
2059    fn ge<S>(
2060       self,
2061       other: S
2062    ) -> GeFuture<Self, S>
2063    where
2064        Self: Sized + Stream,
2065        S: Stream,
2066        <Self as Stream>::Item: PartialOrd<S::Item>,
2067    {
2068        GeFuture::new(self, other)
2069    }
2070
2071    #[doc = r#"
2072        Determines if the elements of this `Stream` are lexicographically
2073        equal to those of another.
2074
2075        # Examples
2076
2077        ```
2078        # fn main() { async_std::task::block_on(async {
2079        #
2080        use async_std::prelude::*;
2081        use async_std::stream;
2082
2083        let single     = stream::from_iter(vec![1]);
2084        let single_eq  = stream::from_iter(vec![10]);
2085        let multi      = stream::from_iter(vec![1,2]);
2086        let multi_eq   = stream::from_iter(vec![1,5]);
2087
2088        assert_eq!(single.clone().eq(single.clone()).await, true);
2089        assert_eq!(single_eq.clone().eq(single.clone()).await, false);
2090        assert_eq!(multi.clone().eq(single_eq.clone()).await, false);
2091        assert_eq!(multi_eq.clone().eq(multi.clone()).await, false);
2092        #
2093        # }) }
2094        ```
2095    "#]
2096    fn eq<S>(
2097       self,
2098       other: S
2099    ) -> EqFuture<Self, S>
2100    where
2101        Self: Sized + Stream,
2102        S: Sized + Stream,
2103        <Self as Stream>::Item: PartialEq<S::Item>,
2104    {
2105        EqFuture::new(self, other)
2106    }
2107
2108    #[doc = r#"
2109        Determines if the elements of this `Stream` are lexicographically
2110        greater than those of another.
2111
2112        # Examples
2113
2114        ```
2115        # fn main() { async_std::task::block_on(async {
2116        #
2117        use async_std::prelude::*;
2118        use async_std::stream;
2119
2120        let single = stream::from_iter(vec![1]);
2121        let single_gt = stream::from_iter(vec![10]);
2122        let multi = stream::from_iter(vec![1,2]);
2123        let multi_gt = stream::from_iter(vec![1,5]);
2124
2125        assert_eq!(single.clone().gt(single.clone()).await, false);
2126        assert_eq!(single_gt.clone().gt(single.clone()).await, true);
2127        assert_eq!(multi.clone().gt(single_gt.clone()).await, false);
2128        assert_eq!(multi_gt.clone().gt(multi.clone()).await, true);
2129        #
2130        # }) }
2131        ```
2132    "#]
2133    fn gt<S>(
2134       self,
2135       other: S
2136    ) -> GtFuture<Self, S>
2137    where
2138        Self: Sized + Stream,
2139        S: Stream,
2140        <Self as Stream>::Item: PartialOrd<S::Item>,
2141    {
2142        GtFuture::new(self, other)
2143    }
2144
2145    #[doc = r#"
2146        Determines if the elements of this `Stream` are lexicographically
2147        less than or equal to those of another.
2148
2149        # Examples
2150
2151        ```
2152        # fn main() { async_std::task::block_on(async {
2153        #
2154        use async_std::prelude::*;
2155        use async_std::stream;
2156
2157        let single = stream::from_iter(vec![1]);
2158        let single_gt = stream::from_iter(vec![10]);
2159        let multi = stream::from_iter(vec![1,2]);
2160        let multi_gt = stream::from_iter(vec![1,5]);
2161
2162        assert_eq!(single.clone().le(single.clone()).await, true);
2163        assert_eq!(single.clone().le(single_gt.clone()).await, true);
2164        assert_eq!(multi.clone().le(single_gt.clone()).await, true);
2165        assert_eq!(multi_gt.clone().le(multi.clone()).await, false);
2166        #
2167        # }) }
2168        ```
2169    "#]
2170    fn le<S>(
2171       self,
2172       other: S
2173    ) -> LeFuture<Self, S>
2174    where
2175        Self: Sized + Stream,
2176        S: Stream,
2177        <Self as Stream>::Item: PartialOrd<S::Item>,
2178    {
2179        LeFuture::new(self, other)
2180    }
2181
2182    #[doc = r#"
2183        Determines if the elements of this `Stream` are lexicographically
2184        less than those of another.
2185
2186        # Examples
2187
2188        ```
2189        # fn main() { async_std::task::block_on(async {
2190        #
2191        use async_std::prelude::*;
2192        use async_std::stream;
2193
2194        let single = stream::from_iter(vec![1]);
2195        let single_gt = stream::from_iter(vec![10]);
2196        let multi = stream::from_iter(vec![1,2]);
2197        let multi_gt = stream::from_iter(vec![1,5]);
2198
2199        assert_eq!(single.clone().lt(single.clone()).await, false);
2200        assert_eq!(single.clone().lt(single_gt.clone()).await, true);
2201        assert_eq!(multi.clone().lt(single_gt.clone()).await, true);
2202        assert_eq!(multi_gt.clone().lt(multi.clone()).await, false);
2203        #
2204        # }) }
2205        ```
2206    "#]
2207    fn lt<S>(
2208       self,
2209       other: S
2210    ) -> LtFuture<Self, S>
2211    where
2212        Self: Sized + Stream,
2213        S: Stream,
2214        <Self as Stream>::Item: PartialOrd<S::Item>,
2215    {
2216        LtFuture::new(self, other)
2217    }
2218
2219    #[doc = r#"
2220        Sums the elements of a stream.
2221
2222        Takes each element, adds them together, and returns the result.
2223
2224        An empty streams returns the zero value of the type.
2225
2226        # Panics
2227
2228        When calling `sum()` and a primitive integer type is being returned, this
2229        method will panic if the computation overflows and debug assertions are
2230        enabled.
2231
2232        # Examples
2233
2234        Basic usage:
2235
2236        ```
2237        # fn main() { async_std::task::block_on(async {
2238        #
2239        use async_std::prelude::*;
2240        use async_std::stream;
2241
2242        let s = stream::from_iter(vec![0u8, 1, 2, 3, 4]);
2243        let sum: u8 = s.sum().await;
2244
2245        assert_eq!(sum, 10);
2246        #
2247        # }) }
2248        ```
2249    "#]
2250    #[cfg(feature = "unstable")]
2251    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
2252    fn sum<'a, S>(
2253        self,
2254    ) -> Pin<Box<dyn Future<Output = S> + 'a>>
2255    where
2256        Self: Sized + Stream<Item = S> + 'a,
2257        S: Sum<Self::Item>,
2258    {
2259        Sum::sum(self)
2260    }
2261
2262    #[doc = r#"
2263        Multiplies all elements of the stream.
2264
2265        An empty stream returns the one value of the type.
2266
2267        # Panics
2268
2269        When calling `product()` and a primitive integer type is being returned,
2270        method will panic if the computation overflows and debug assertions are
2271        enabled.
2272
2273        # Examples
2274
2275        This example calculates the factorial of n (i.e. the product of the numbers from 1 to
2276        n, inclusive):
2277
2278        ```
2279        # fn main() { async_std::task::block_on(async {
2280        #
2281        async fn factorial(n: u32) -> u32 {
2282            use async_std::prelude::*;
2283            use async_std::stream;
2284
2285            let s = stream::from_iter(1..=n);
2286            s.product().await
2287        }
2288
2289        assert_eq!(factorial(0).await, 1);
2290        assert_eq!(factorial(1).await, 1);
2291        assert_eq!(factorial(5).await, 120);
2292        #
2293        # }) }
2294        ```
2295    "#]
2296    #[cfg(feature = "unstable")]
2297    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
2298    fn product<'a, P>(
2299        self,
2300    ) -> Pin<Box<dyn Future<Output = P> + 'a>>
2301    where
2302        Self: Sized + Stream<Item = P> + 'a,
2303        P: Product,
2304    {
2305        Product::product(self)
2306    }
2307}
2308
2309impl<T: Stream + ?Sized> StreamExt for T {}
2310