1mod all;
25mod any;
26mod chain;
27mod cloned;
28mod cmp;
29mod copied;
30mod cycle;
31mod enumerate;
32mod eq;
33mod filter;
34mod filter_map;
35mod find;
36mod find_map;
37mod fold;
38mod for_each;
39mod fuse;
40mod ge;
41mod gt;
42mod inspect;
43mod last;
44mod le;
45mod lt;
46mod map;
47mod max;
48mod max_by;
49mod max_by_key;
50mod min;
51mod min_by;
52mod min_by_key;
53mod ne;
54mod next;
55mod nth;
56mod partial_cmp;
57mod position;
58mod scan;
59mod skip;
60mod skip_while;
61mod step_by;
62mod take;
63mod take_while;
64mod try_fold;
65mod try_for_each;
66mod zip;
67
68use all::AllFuture;
69use any::AnyFuture;
70use cmp::CmpFuture;
71use cycle::Cycle;
72use enumerate::Enumerate;
73use eq::EqFuture;
74use filter_map::FilterMap;
75use find::FindFuture;
76use find_map::FindMapFuture;
77use fold::FoldFuture;
78use for_each::ForEachFuture;
79use ge::GeFuture;
80use gt::GtFuture;
81use last::LastFuture;
82use le::LeFuture;
83use lt::LtFuture;
84use max::MaxFuture;
85use max_by::MaxByFuture;
86use max_by_key::MaxByKeyFuture;
87use min::MinFuture;
88use min_by::MinByFuture;
89use min_by_key::MinByKeyFuture;
90use ne::NeFuture;
91use next::NextFuture;
92use nth::NthFuture;
93use partial_cmp::PartialCmpFuture;
94use position::PositionFuture;
95use try_fold::TryFoldFuture;
96use try_for_each::TryForEachFuture;
97
98pub use chain::Chain;
99pub use cloned::Cloned;
100pub use copied::Copied;
101pub use filter::Filter;
102pub use fuse::Fuse;
103pub use inspect::Inspect;
104pub use map::Map;
105pub use scan::Scan;
106pub use skip::Skip;
107pub use skip_while::SkipWhile;
108pub use step_by::StepBy;
109pub use take::Take;
110pub use take_while::TakeWhile;
111pub use zip::Zip;
112
113use core::cmp::Ordering;
114
115cfg_unstable! {
116 use core::future::Future;
117 use core::pin::Pin;
118 use core::time::Duration;
119
120 use crate::stream::into_stream::IntoStream;
121 use crate::stream::{FromStream, Product, Sum};
122 use crate::stream::Extend;
123
124 use count::CountFuture;
125 use partition::PartitionFuture;
126 use unzip::UnzipFuture;
127
128 pub use merge::Merge;
129 pub use flatten::Flatten;
130 pub use flat_map::FlatMap;
131 pub use timeout::{TimeoutError, Timeout};
132 pub use throttle::Throttle;
133 pub use delay::Delay;
134
135 mod count;
136 mod merge;
137 mod flatten;
138 mod flat_map;
139 mod partition;
140 mod timeout;
141 mod throttle;
142 mod delay;
143 mod unzip;
144}
145
146pub use futures_core::stream::Stream as Stream;
147
148#[doc = r#"
149 Extension methods for [`Stream`].
150
151 [`Stream`]: ../stream/trait.Stream.html
152"#]
153pub trait StreamExt: Stream {
154 #[doc = r#"
155 Advances the stream and returns the next value.
156
157 Returns [`None`] when iteration is finished. Individual stream implementations may
158 choose to resume iteration, and so calling `next()` again may or may not eventually
159 start returning more values.
160
161 [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
162
163 # Examples
164
165 ```
166 # fn main() { async_std::task::block_on(async {
167 #
168 use async_std::prelude::*;
169 use async_std::stream;
170
171 let mut s = stream::once(7);
172
173 assert_eq!(s.next().await, Some(7));
174 assert_eq!(s.next().await, None);
175 #
176 # }) }
177 ```
178 "#]
179 fn next(&mut self) -> NextFuture<'_, Self>
180 where
181 Self: Unpin,
182 {
183 NextFuture { stream: self }
184 }
185
186 #[doc = r#"
187 Creates a stream that yields its first `n` elements.
188
189 # Examples
190
191 ```
192 # fn main() { async_std::task::block_on(async {
193 #
194 use async_std::prelude::*;
195 use async_std::stream;
196
197 let mut s = stream::repeat(9).take(3);
198
199 while let Some(v) = s.next().await {
200 assert_eq!(v, 9);
201 }
202 #
203 # }) }
204 ```
205 "#]
206 fn take(self, n: usize) -> Take<Self>
207 where
208 Self: Sized,
209 {
210 Take::new(self, n)
211 }
212
213 #[doc = r#"
214 Creates a stream that yields elements based on a predicate.
215
216 # Examples
217
218 ```
219 # fn main() { async_std::task::block_on(async {
220 #
221 use async_std::prelude::*;
222 use async_std::stream;
223
224 let s = stream::from_iter(vec![1, 2, 3, 4]);
225 let mut s = s.take_while(|x| x < &3 );
226
227 assert_eq!(s.next().await, Some(1));
228 assert_eq!(s.next().await, Some(2));
229 assert_eq!(s.next().await, None);
230 #
231 # }) }
232 ```
233 "#]
234 fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
235 where
236 Self: Sized,
237 P: FnMut(&Self::Item) -> bool,
238 {
239 TakeWhile::new(self, predicate)
240 }
241
242 #[doc = r#"
243 Limit the amount of items yielded per timeslice in a stream.
244
245 This stream does not drop any items, but will only limit the rate at which items pass through.
246 # Examples
247 ```
248 # fn main() { async_std::task::block_on(async {
249 #
250 use async_std::prelude::*;
251 use async_std::stream;
252 use std::time::{Duration, Instant};
253
254 let start = Instant::now();
255
256 // emit value every 5 milliseconds
257 let s = stream::interval(Duration::from_millis(5)).take(2);
258
259 // throttle for 10 milliseconds
260 let mut s = s.throttle(Duration::from_millis(10));
261
262 s.next().await;
263 assert!(start.elapsed().as_millis() >= 5);
264
265 s.next().await;
266 assert!(start.elapsed().as_millis() >= 15);
267
268 s.next().await;
269 assert!(start.elapsed().as_millis() >= 25);
270 #
271 # }) }
272 ```
273 "#]
274 #[cfg(feature = "unstable")]
275 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
276 fn throttle(self, d: Duration) -> Throttle<Self>
277 where
278 Self: Sized,
279 {
280 Throttle::new(self, d)
281 }
282
283 #[doc = r#"
284 Creates a stream that yields each `step`th element.
285
286 # Panics
287
288 This method will panic if the given step is `0`.
289
290 # Examples
291
292 Basic usage:
293
294 ```
295 # fn main() { async_std::task::block_on(async {
296 #
297 use async_std::prelude::*;
298 use async_std::stream;
299
300 let s = stream::from_iter(vec![0u8, 1, 2, 3, 4]);
301 let mut stepped = s.step_by(2);
302
303 assert_eq!(stepped.next().await, Some(0));
304 assert_eq!(stepped.next().await, Some(2));
305 assert_eq!(stepped.next().await, Some(4));
306 assert_eq!(stepped.next().await, None);
307
308 #
309 # }) }
310 ```
311 "#]
312 fn step_by(self, step: usize) -> StepBy<Self>
313 where
314 Self: Sized,
315 {
316 StepBy::new(self, step)
317 }
318
319 #[doc = r#"
320 Takes two streams and creates a new stream over both in sequence.
321
322 # Examples
323
324 Basic usage:
325
326 ```
327 # fn main() { async_std::task::block_on(async {
328 #
329 use async_std::prelude::*;
330 use async_std::stream;
331
332 let first = stream::from_iter(vec![0u8, 1]);
333 let second = stream::from_iter(vec![2, 3]);
334 let mut c = first.chain(second);
335
336 assert_eq!(c.next().await, Some(0));
337 assert_eq!(c.next().await, Some(1));
338 assert_eq!(c.next().await, Some(2));
339 assert_eq!(c.next().await, Some(3));
340 assert_eq!(c.next().await, None);
341
342 #
343 # }) }
344 ```
345 "#]
346 fn chain<U>(self, other: U) -> Chain<Self, U>
347 where
348 Self: Sized,
349 U: Stream<Item = Self::Item> + Sized,
350 {
351 Chain::new(self, other)
352 }
353
354 #[doc = r#"
355 Creates an stream which copies all of its elements.
356
357 # Examples
358
359 Basic usage:
360
361 ```
362 # fn main() { async_std::task::block_on(async {
363 #
364 use async_std::prelude::*;
365 use async_std::stream;
366
367 let v = stream::from_iter(vec![&1, &2, &3]);
368
369 let mut v_cloned = v.cloned();
370
371 assert_eq!(v_cloned.next().await, Some(1));
372 assert_eq!(v_cloned.next().await, Some(2));
373 assert_eq!(v_cloned.next().await, Some(3));
374 assert_eq!(v_cloned.next().await, None);
375
376 #
377 # }) }
378 ```
379 "#]
380 fn cloned<'a, T>(self) -> Cloned<Self>
381 where
382 Self: Sized + Stream<Item = &'a T>,
383 T: Clone + 'a,
384 {
385 Cloned::new(self)
386 }
387
388
389 #[doc = r#"
390 Creates an stream which copies all of its elements.
391
392 # Examples
393
394 Basic usage:
395
396 ```
397 # fn main() { async_std::task::block_on(async {
398 #
399 use async_std::prelude::*;
400 use async_std::stream;
401
402 let s = stream::from_iter(vec![&1, &2, &3]);
403 let mut s_copied = s.copied();
404
405 assert_eq!(s_copied.next().await, Some(1));
406 assert_eq!(s_copied.next().await, Some(2));
407 assert_eq!(s_copied.next().await, Some(3));
408 assert_eq!(s_copied.next().await, None);
409 #
410 # }) }
411 ```
412 "#]
413 fn copied<'a, T>(self) -> Copied<Self>
414 where
415 Self: Sized + Stream<Item = &'a T>,
416 T: Copy + 'a,
417 {
418 Copied::new(self)
419 }
420
421 #[doc = r#"
422 Creates a stream that yields the provided values infinitely and in order.
423
424 # Examples
425
426 Basic usage:
427
428 ```
429 # async_std::task::block_on(async {
430 #
431 use async_std::prelude::*;
432 use async_std::stream;
433
434 let mut s = stream::once(7).cycle();
435
436 assert_eq!(s.next().await, Some(7));
437 assert_eq!(s.next().await, Some(7));
438 assert_eq!(s.next().await, Some(7));
439 assert_eq!(s.next().await, Some(7));
440 assert_eq!(s.next().await, Some(7));
441 #
442 # })
443 ```
444 "#]
445 fn cycle(self) -> Cycle<Self>
446 where
447 Self: Clone + Sized,
448 {
449 Cycle::new(self)
450 }
451
452 #[doc = r#"
453 Creates a stream that gives the current element's count as well as the next value.
454
455 # Overflow behaviour.
456
457 This combinator does no guarding against overflows.
458
459 # Examples
460
461 ```
462 # fn main() { async_std::task::block_on(async {
463 #
464 use async_std::prelude::*;
465 use async_std::stream;
466
467 let s = stream::from_iter(vec!['a', 'b', 'c']);
468 let mut s = s.enumerate();
469
470 assert_eq!(s.next().await, Some((0, 'a')));
471 assert_eq!(s.next().await, Some((1, 'b')));
472 assert_eq!(s.next().await, Some((2, 'c')));
473 assert_eq!(s.next().await, None);
474 #
475 # }) }
476 ```
477 "#]
478 fn enumerate(self) -> Enumerate<Self>
479 where
480 Self: Sized,
481 {
482 Enumerate::new(self)
483 }
484
485 #[doc = r#"
486 Creates a stream that is delayed before it starts yielding items.
487
488 # Examples
489
490 ```
491 # fn main() { async_std::task::block_on(async {
492 #
493 use async_std::prelude::*;
494 use async_std::stream;
495 use std::time::{Duration, Instant};
496
497 let start = Instant::now();
498 let mut s = stream::from_iter(vec![0u8, 1, 2]).delay(Duration::from_millis(200));
499
500 assert_eq!(s.next().await, Some(0));
501 // The first time will take more than 200ms due to delay.
502 assert!(start.elapsed().as_millis() >= 200);
503
504 assert_eq!(s.next().await, Some(1));
505 // There will be no delay after the first time.
506 assert!(start.elapsed().as_millis() < 400);
507
508 assert_eq!(s.next().await, Some(2));
509 assert!(start.elapsed().as_millis() < 400);
510
511 assert_eq!(s.next().await, None);
512 assert!(start.elapsed().as_millis() < 400);
513 #
514 # }) }
515 ```
516 "#]
517 #[cfg(any(feature = "unstable", feature = "docs"))]
518 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
519 fn delay(self, dur: std::time::Duration) -> Delay<Self>
520 where
521 Self: Sized,
522 {
523 Delay::new(self, dur)
524 }
525
526 #[doc = r#"
527 Takes a closure and creates a stream that calls that closure on every element of this stream.
528
529 # Examples
530
531 ```
532 # fn main() { async_std::task::block_on(async {
533 #
534 use async_std::prelude::*;
535 use async_std::stream;
536
537 let s = stream::from_iter(vec![1, 2, 3]);
538 let mut s = s.map(|x| 2 * x);
539
540 assert_eq!(s.next().await, Some(2));
541 assert_eq!(s.next().await, Some(4));
542 assert_eq!(s.next().await, Some(6));
543 assert_eq!(s.next().await, None);
544
545 #
546 # }) }
547 ```
548 "#]
549 fn map<B, F>(self, f: F) -> Map<Self, F>
550 where
551 Self: Sized,
552 F: FnMut(Self::Item) -> B,
553 {
554 Map::new(self, f)
555 }
556
557 #[doc = r#"
558 A combinator that does something with each element in the stream, passing the value
559 on.
560
561 # Examples
562
563 Basic usage:
564
565 ```
566 # fn main() { async_std::task::block_on(async {
567 #
568 use async_std::prelude::*;
569 use async_std::stream;
570
571 let s = stream::from_iter(vec![1, 2, 3, 4, 5]);
572
573 let sum = s
574 .inspect(|x| println!("about to filter {}", x))
575 .filter(|x| x % 2 == 0)
576 .inspect(|x| println!("made it through filter: {}", x))
577 .fold(0, |sum, i| sum + i)
578 .await;
579
580 assert_eq!(sum, 6);
581 #
582 # }) }
583 ```
584 "#]
585 fn inspect<F>(self, f: F) -> Inspect<Self, F>
586 where
587 Self: Sized,
588 F: FnMut(&Self::Item),
589 {
590 Inspect::new(self, f)
591 }
592
593 #[doc = r#"
594 Returns the last element of the stream.
595
596 # Examples
597
598 Basic usage:
599
600 ```
601 # fn main() { async_std::task::block_on(async {
602 #
603 use async_std::prelude::*;
604 use async_std::stream;
605
606 let s = stream::from_iter(vec![1, 2, 3]);
607
608 let last = s.last().await;
609 assert_eq!(last, Some(3));
610 #
611 # }) }
612 ```
613
614 An empty stream will return `None`:
615 ```
616 # fn main() { async_std::task::block_on(async {
617 #
618 use async_std::stream;
619 use crate::async_std::prelude::*;
620
621 let s = stream::empty::<()>();
622
623 let last = s.last().await;
624 assert_eq!(last, None);
625 #
626 # }) }
627 ```
628 "#]
629 fn last(
630 self,
631 ) -> LastFuture<Self, Self::Item>
632 where
633 Self: Sized,
634 {
635 LastFuture::new(self)
636 }
637
638 #[doc = r#"
639 Creates a stream which ends after the first `None`.
640
641 After a stream returns `None`, future calls may or may not yield `Some(T)` again.
642 `fuse()` adapts an iterator, ensuring that after a `None` is given, it will always
643 return `None` forever.
644
645 # Examples
646
647 ```
648 # fn main() { async_std::task::block_on(async {
649 #
650 use async_std::prelude::*;
651 use async_std::stream;
652
653 let mut s = stream::once(1).fuse();
654 assert_eq!(s.next().await, Some(1));
655 assert_eq!(s.next().await, None);
656 assert_eq!(s.next().await, None);
657 #
658 # }) }
659 ```
660 "#]
661 fn fuse(self) -> Fuse<Self>
662 where
663 Self: Sized,
664 {
665 Fuse::new(self)
666 }
667
668 #[doc = r#"
669 Creates a stream that uses a predicate to determine if an element should be yielded.
670
671 # Examples
672
673 Basic usage:
674
675 ```
676 # fn main() { async_std::task::block_on(async {
677 #
678 use async_std::prelude::*;
679 use async_std::stream;
680
681 let s = stream::from_iter(vec![1, 2, 3, 4]);
682 let mut s = s.filter(|i| i % 2 == 0);
683
684 assert_eq!(s.next().await, Some(2));
685 assert_eq!(s.next().await, Some(4));
686 assert_eq!(s.next().await, None);
687 #
688 # }) }
689 ```
690 "#]
691 fn filter<P>(self, predicate: P) -> Filter<Self, P>
692 where
693 Self: Sized,
694 P: FnMut(&Self::Item) -> bool,
695 {
696 Filter::new(self, predicate)
697 }
698
699 #[doc= r#"
700 Creates an stream that works like map, but flattens nested structure.
701
702 # Examples
703
704 Basic usage:
705
706 ```
707 # async_std::task::block_on(async {
708
709 use async_std::prelude::*;
710 use async_std::stream;
711
712 let words = stream::from_iter(&["alpha", "beta", "gamma"]);
713
714 let merged: String = words
715 .flat_map(|s| stream::from_iter(s.chars()))
716 .collect().await;
717 assert_eq!(merged, "alphabetagamma");
718
719 let d3 = stream::from_iter(&[[[1, 2], [3, 4]], [[5, 6], [7, 8]]]);
720 let d1: Vec<_> = d3
721 .flat_map(|item| stream::from_iter(item))
722 .flat_map(|item| stream::from_iter(item))
723 .collect().await;
724
725 assert_eq!(d1, [&1, &2, &3, &4, &5, &6, &7, &8]);
726 # });
727 ```
728 "#]
729 #[cfg(feature = "unstable")]
730 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
731 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
732 where
733 Self: Sized,
734 U: IntoStream,
735 F: FnMut(Self::Item) -> U,
736 {
737 FlatMap::new(self, f)
738 }
739
740 #[doc = r#"
741 Creates an stream that flattens nested structure.
742
743 # Examples
744
745 Basic usage:
746
747 ```
748 # async_std::task::block_on(async {
749
750 use async_std::prelude::*;
751 use async_std::stream;
752
753 let inner1 = stream::from_iter(vec![1u8,2,3]);
754 let inner2 = stream::from_iter(vec![4u8,5,6]);
755 let s = stream::from_iter(vec![inner1, inner2]);
756
757 let v: Vec<_> = s.flatten().collect().await;
758
759 assert_eq!(v, vec![1,2,3,4,5,6]);
760
761 # });
762 "#]
763 #[cfg(feature = "unstable")]
764 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
765 fn flatten(self) -> Flatten<Self>
766 where
767 Self: Sized,
768 Self::Item: IntoStream,
769 {
770 Flatten::new(self)
771 }
772
773 #[doc = r#"
774 Both filters and maps a stream.
775
776 # Examples
777
778 Basic usage:
779
780 ```
781 # fn main() { async_std::task::block_on(async {
782 #
783
784 use async_std::prelude::*;
785 use async_std::stream;
786
787 let s = stream::from_iter(vec!["1", "lol", "3", "NaN", "5"]);
788
789 let mut parsed = s.filter_map(|a| a.parse::<u32>().ok());
790
791 let one = parsed.next().await;
792 assert_eq!(one, Some(1));
793
794 let three = parsed.next().await;
795 assert_eq!(three, Some(3));
796
797 let five = parsed.next().await;
798 assert_eq!(five, Some(5));
799
800 let end = parsed.next().await;
801 assert_eq!(end, None);
802 #
803 # }) }
804 ```
805 "#]
806 fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F>
807 where
808 Self: Sized,
809 F: FnMut(Self::Item) -> Option<B>,
810 {
811 FilterMap::new(self, f)
812 }
813
814 #[doc = r#"
815 Returns the element that gives the minimum value with respect to the
816 specified key function. If several elements are equally minimum,
817 the first element is returned. If the stream is empty, `None` is returned.
818
819 # Examples
820
821 ```
822 # fn main() { async_std::task::block_on(async {
823 #
824 use async_std::prelude::*;
825 use async_std::stream;
826
827 let s = stream::from_iter(vec![-1isize, 2, -3]);
828
829 let min = s.clone().min_by_key(|x| x.abs()).await;
830 assert_eq!(min, Some(-1));
831
832 let min = stream::empty::<isize>().min_by_key(|x| x.abs()).await;
833 assert_eq!(min, None);
834 #
835 # }) }
836 ```
837 "#]
838 fn min_by_key<B, F>(
839 self,
840 key_by: F,
841 ) -> MinByKeyFuture<Self, Self::Item, F>
842 where
843 Self: Sized,
844 B: Ord,
845 F: FnMut(&Self::Item) -> B,
846 {
847 MinByKeyFuture::new(self, key_by)
848 }
849
850 #[doc = r#"
851 Returns the element that gives the maximum value with respect to the
852 specified key function. If several elements are equally maximum,
853 the first element is returned. If the stream is empty, `None` is returned.
854
855 # Examples
856
857 ```
858 # fn main() { async_std::task::block_on(async {
859 #
860 use async_std::prelude::*;
861 use async_std::stream;
862
863 let s = stream::from_iter(vec![-3_i32, 0, 1, 5, -10]);
864
865 let max = s.clone().max_by_key(|x| x.abs()).await;
866 assert_eq!(max, Some(-10));
867
868 let max = stream::empty::<isize>().max_by_key(|x| x.abs()).await;
869 assert_eq!(max, None);
870 #
871 # }) }
872 ```
873 "#]
874 fn max_by_key<B, F>(
875 self,
876 key_by: F,
877 ) -> MaxByKeyFuture<Self, Self::Item, F>
878 where
879 Self: Sized,
880 B: Ord,
881 F: FnMut(&Self::Item) -> B,
882 {
883 MaxByKeyFuture::new(self, key_by)
884 }
885
886 #[doc = r#"
887 Returns the element that gives the minimum value with respect to the
888 specified comparison function. If several elements are equally minimum,
889 the first element is returned. If the stream is empty, `None` is returned.
890
891 # Examples
892
893 ```
894 # fn main() { async_std::task::block_on(async {
895 #
896 use async_std::prelude::*;
897 use async_std::stream;
898
899 let s = stream::from_iter(vec![1u8, 2, 3]);
900
901 let min = s.clone().min_by(|x, y| x.cmp(y)).await;
902 assert_eq!(min, Some(1));
903
904 let min = s.min_by(|x, y| y.cmp(x)).await;
905 assert_eq!(min, Some(3));
906
907 let min = stream::empty::<u8>().min_by(|x, y| x.cmp(y)).await;
908 assert_eq!(min, None);
909 #
910 # }) }
911 ```
912 "#]
913 fn min_by<F>(
914 self,
915 compare: F,
916 ) -> MinByFuture<Self, F, Self::Item>
917 where
918 Self: Sized,
919 F: FnMut(&Self::Item, &Self::Item) -> Ordering,
920 {
921 MinByFuture::new(self, compare)
922 }
923
924 #[doc = r#"
925 Returns the element that gives the maximum value. If several elements are equally maximum,
926 the first element is returned. If the stream is empty, `None` is returned.
927
928 # Examples
929
930 ```
931 # fn main() { async_std::task::block_on(async {
932 #
933 use async_std::prelude::*;
934 use async_std::stream;
935
936 let s = stream::from_iter(vec![1usize, 2, 3]);
937
938 let max = s.clone().max().await;
939 assert_eq!(max, Some(3));
940
941 let max = stream::empty::<usize>().max().await;
942 assert_eq!(max, None);
943 #
944 # }) }
945 ```
946 "#]
947 fn max(
948 self,
949 ) -> MaxFuture<Self, Self::Item>
950 where
951 Self: Sized,
952 Self::Item: Ord,
953 {
954 MaxFuture::new(self)
955 }
956
957 #[doc = r#"
958 Returns the element that gives the minimum value. If several elements are equally minimum,
959 the first element is returned. If the stream is empty, `None` is returned.
960
961 # Examples
962
963 ```
964 # fn main() { async_std::task::block_on(async {
965 #
966 use async_std::prelude::*;
967 use async_std::stream;
968
969 let s = stream::from_iter(vec![1usize, 2, 3]);
970
971 let min = s.clone().min().await;
972 assert_eq!(min, Some(1));
973
974 let min = stream::empty::<usize>().min().await;
975 assert_eq!(min, None);
976 #
977 # }) }
978 ```
979 "#]
980 fn min(
981 self,
982 ) -> MinFuture<Self, Self::Item>
983 where
984 Self: Sized,
985 Self::Item: Ord,
986 {
987 MinFuture::new(self)
988 }
989
990 #[doc = r#"
991 Returns the element that gives the maximum value with respect to the
992 specified comparison function. If several elements are equally maximum,
993 the first element is returned. If the stream is empty, `None` is returned.
994
995 # Examples
996
997 ```
998 # fn main() { async_std::task::block_on(async {
999 #
1000 use async_std::prelude::*;
1001 use async_std::stream;
1002
1003 let s = stream::from_iter(vec![1u8, 2, 3]);
1004
1005 let max = s.clone().max_by(|x, y| x.cmp(y)).await;
1006 assert_eq!(max, Some(3));
1007
1008 let max = s.max_by(|x, y| y.cmp(x)).await;
1009 assert_eq!(max, Some(1));
1010
1011 let max = stream::empty::<usize>().max_by(|x, y| x.cmp(y)).await;
1012 assert_eq!(max, None);
1013 #
1014 # }) }
1015 ```
1016 "#]
1017 fn max_by<F>(
1018 self,
1019 compare: F,
1020 ) -> MaxByFuture<Self, F, Self::Item>
1021 where
1022 Self: Sized,
1023 F: FnMut(&Self::Item, &Self::Item) -> Ordering,
1024 {
1025 MaxByFuture::new(self, compare)
1026 }
1027
1028 #[doc = r#"
1029 Returns the nth element of the stream.
1030
1031 # Examples
1032
1033 Basic usage:
1034
1035 ```
1036 # fn main() { async_std::task::block_on(async {
1037 #
1038 use async_std::prelude::*;
1039 use async_std::stream;
1040
1041 let mut s = stream::from_iter(vec![1u8, 2, 3]);
1042
1043 let second = s.nth(1).await;
1044 assert_eq!(second, Some(2));
1045 #
1046 # }) }
1047 ```
1048 Calling `nth()` multiple times:
1049
1050 ```
1051 # fn main() { async_std::task::block_on(async {
1052 #
1053 use async_std::stream;
1054 use async_std::prelude::*;
1055
1056 let mut s = stream::from_iter(vec![1u8, 2, 3]);
1057
1058 let second = s.nth(0).await;
1059 assert_eq!(second, Some(1));
1060
1061 let second = s.nth(0).await;
1062 assert_eq!(second, Some(2));
1063 #
1064 # }) }
1065 ```
1066 Returning `None` if the stream finished before returning `n` elements:
1067 ```
1068 # fn main() { async_std::task::block_on(async {
1069 #
1070 use async_std::prelude::*;
1071 use async_std::stream;
1072
1073 let mut s = stream::from_iter(vec![1u8, 2, 3]);
1074
1075 let fourth = s.nth(4).await;
1076 assert_eq!(fourth, None);
1077 #
1078 # }) }
1079 ```
1080 "#]
1081 fn nth(
1082 &mut self,
1083 n: usize,
1084 ) -> NthFuture<'_, Self>
1085 where
1086 Self: Unpin + Sized,
1087 {
1088 NthFuture::new(self, n)
1089 }
1090
1091 #[doc = r#"
1092 Tests if every element of the stream matches a predicate.
1093
1094 `all()` takes a closure that returns `true` or `false`. It applies
1095 this closure to each element of the stream, and if they all return
1096 `true`, then so does `all()`. If any of them return `false`, it
1097 returns `false`.
1098
1099 `all()` is short-circuiting; in other words, it will stop processing
1100 as soon as it finds a `false`, given that no matter what else happens,
1101 the result will also be `false`.
1102
1103 An empty stream returns `true`.
1104
1105 # Examples
1106
1107 Basic usage:
1108
1109 ```
1110 # fn main() { async_std::task::block_on(async {
1111 #
1112 use async_std::prelude::*;
1113 use async_std::stream;
1114
1115 let mut s = stream::repeat::<u32>(42).take(3);
1116 assert!(s.all(|x| x == 42).await);
1117
1118 #
1119 # }) }
1120 ```
1121
1122 Empty stream:
1123
1124 ```
1125 # fn main() { async_std::task::block_on(async {
1126 #
1127 use async_std::prelude::*;
1128 use async_std::stream;
1129
1130 let mut s = stream::empty::<u32>();
1131 assert!(s.all(|_| false).await);
1132 #
1133 # }) }
1134 ```
1135 "#]
1136 #[inline]
1137 fn all<F>(
1138 &mut self,
1139 f: F,
1140 ) -> AllFuture<'_, Self, F, Self::Item>
1141 where
1142 Self: Unpin + Sized,
1143 F: FnMut(Self::Item) -> bool,
1144 {
1145 AllFuture::new(self, f)
1146 }
1147
1148 #[doc = r#"
1149 Searches for an element in a stream that satisfies a predicate.
1150
1151 # Examples
1152
1153 Basic usage:
1154
1155 ```
1156 # fn main() { async_std::task::block_on(async {
1157 #
1158 use async_std::prelude::*;
1159 use async_std::stream;
1160
1161 let mut s = stream::from_iter(vec![1u8, 2, 3]);
1162 let res = s.find(|x| *x == 2).await;
1163 assert_eq!(res, Some(2));
1164 #
1165 # }) }
1166 ```
1167
1168 Resuming after a first find:
1169
1170 ```
1171 # fn main() { async_std::task::block_on(async {
1172 #
1173 use async_std::prelude::*;
1174 use async_std::stream;
1175
1176 let mut s= stream::from_iter(vec![1, 2, 3]);
1177 let res = s.find(|x| *x == 2).await;
1178 assert_eq!(res, Some(2));
1179
1180 let next = s.next().await;
1181 assert_eq!(next, Some(3));
1182 #
1183 # }) }
1184 ```
1185 "#]
1186 fn find<P>(
1187 &mut self,
1188 p: P,
1189 ) -> FindFuture<'_, Self, P>
1190 where
1191 Self: Unpin + Sized,
1192 P: FnMut(&Self::Item) -> bool,
1193 {
1194 FindFuture::new(self, p)
1195 }
1196
1197 #[doc = r#"
1198 Applies function to the elements of stream and returns the first non-none result.
1199
1200 ```
1201 # fn main() { async_std::task::block_on(async {
1202 #
1203 use async_std::prelude::*;
1204 use async_std::stream;
1205
1206 let mut s = stream::from_iter(vec!["lol", "NaN", "2", "5"]);
1207 let first_number = s.find_map(|s| s.parse().ok()).await;
1208
1209 assert_eq!(first_number, Some(2));
1210 #
1211 # }) }
1212 ```
1213 "#]
1214 fn find_map<F, B>(
1215 &mut self,
1216 f: F,
1217 ) -> FindMapFuture<'_, Self, F>
1218 where
1219 Self: Unpin + Sized,
1220 F: FnMut(Self::Item) -> Option<B>,
1221 {
1222 FindMapFuture::new(self, f)
1223 }
1224
1225 #[doc = r#"
1226 A combinator that applies a function to every element in a stream
1227 producing a single, final value.
1228
1229 # Examples
1230
1231 Basic usage:
1232
1233 ```
1234 # fn main() { async_std::task::block_on(async {
1235 #
1236 use async_std::prelude::*;
1237 use async_std::stream;
1238
1239 let s = stream::from_iter(vec![1u8, 2, 3]);
1240 let sum = s.fold(0, |acc, x| acc + x).await;
1241
1242 assert_eq!(sum, 6);
1243 #
1244 # }) }
1245 ```
1246 "#]
1247 fn fold<B, F>(
1248 self,
1249 init: B,
1250 f: F,
1251 ) -> FoldFuture<Self, F, B>
1252 where
1253 Self: Sized,
1254 F: FnMut(B, Self::Item) -> B,
1255 {
1256 FoldFuture::new(self, init, f)
1257 }
1258
1259 #[doc = r#"
1260 A combinator that applies a function to every element in a stream
1261 creating two collections from it.
1262
1263 # Examples
1264
1265 Basic usage:
1266
1267 ```
1268 # fn main() { async_std::task::block_on(async {
1269 #
1270 use async_std::prelude::*;
1271 use async_std::stream;
1272
1273 let (even, odd): (Vec<i32>, Vec<i32>) = stream::from_iter(vec![1, 2, 3])
1274 .partition(|&n| n % 2 == 0).await;
1275
1276 assert_eq!(even, vec![2]);
1277 assert_eq!(odd, vec![1, 3]);
1278
1279 #
1280 # }) }
1281 ```
1282 "#]
1283 #[cfg(feature = "unstable")]
1284 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1285 fn partition<B, F>(
1286 self,
1287 f: F,
1288 ) -> PartitionFuture<Self, F, B>
1289 where
1290 Self: Sized,
1291 F: FnMut(&Self::Item) -> bool,
1292 B: Default + Extend<Self::Item>,
1293 {
1294 PartitionFuture::new(self, f)
1295 }
1296
1297 #[doc = r#"
1298 Call a closure on each element of the stream.
1299
1300 # Examples
1301
1302 ```
1303 # fn main() { async_std::task::block_on(async {
1304 #
1305 use async_std::prelude::*;
1306 use async_std::stream;
1307 use std::sync::mpsc::channel;
1308
1309 let (tx, rx) = channel();
1310
1311 let s = stream::from_iter(vec![1usize, 2, 3]);
1312 let sum = s.for_each(move |x| tx.clone().send(x).unwrap()).await;
1313
1314 let v: Vec<_> = rx.iter().collect();
1315
1316 assert_eq!(v, vec![1, 2, 3]);
1317 #
1318 # }) }
1319 ```
1320 "#]
1321 fn for_each<F>(
1322 self,
1323 f: F,
1324 ) -> ForEachFuture<Self, F>
1325 where
1326 Self: Sized,
1327 F: FnMut(Self::Item),
1328 {
1329 ForEachFuture::new(self, f)
1330 }
1331
1332 #[doc = r#"
1333 Tests if any element of the stream matches a predicate.
1334
1335 `any()` takes a closure that returns `true` or `false`. It applies
1336 this closure to each element of the stream, and if any of them return
1337 `true`, then so does `any()`. If they all return `false`, it
1338 returns `false`.
1339
1340 `any()` is short-circuiting; in other words, it will stop processing
1341 as soon as it finds a `true`, given that no matter what else happens,
1342 the result will also be `true`.
1343
1344 An empty stream returns `false`.
1345
1346 # Examples
1347
1348 Basic usage:
1349
1350 ```
1351 # fn main() { async_std::task::block_on(async {
1352 #
1353 use async_std::prelude::*;
1354 use async_std::stream;
1355
1356 let mut s = stream::repeat::<u32>(42).take(3);
1357 assert!(s.any(|x| x == 42).await);
1358 #
1359 # }) }
1360 ```
1361
1362 Empty stream:
1363
1364 ```
1365 # fn main() { async_std::task::block_on(async {
1366 #
1367 use async_std::prelude::*;
1368 use async_std::stream;
1369
1370 let mut s = stream::empty::<u32>();
1371 assert!(!s.any(|_| false).await);
1372 #
1373 # }) }
1374 ```
1375 "#]
1376 #[inline]
1377 fn any<F>(
1378 &mut self,
1379 f: F,
1380 ) -> AnyFuture<'_, Self, F, Self::Item>
1381 where
1382 Self: Unpin + Sized,
1383 F: FnMut(Self::Item) -> bool,
1384 {
1385 AnyFuture::new(self, f)
1386 }
1387
1388 #[doc = r#"
1389 Borrows an stream, rather than consuming it.
1390
1391 This is useful to allow applying stream adaptors while still retaining ownership of the original stream.
1392
1393 # Examples
1394
1395 ```
1396 # fn main() { async_std::task::block_on(async {
1397 #
1398 use async_std::prelude::*;
1399 use async_std::stream;
1400
1401 let a = vec![1isize, 2, 3];
1402
1403 let stream = stream::from_iter(a);
1404
1405 let sum: isize = stream.take(5).sum().await;
1406
1407 assert_eq!(sum, 6);
1408
1409 // if we try to use stream again, it won't work. The following line
1410 // gives error: use of moved value: `stream`
1411 // assert_eq!(stream.next(), None);
1412
1413 // let's try that again
1414 let a = vec![1isize, 2, 3];
1415
1416 let mut stream = stream::from_iter(a);
1417
1418 // instead, we add in a .by_ref()
1419 let sum: isize = stream.by_ref().take(2).sum().await;
1420
1421 assert_eq!(sum, 3);
1422
1423 // now this is just fine:
1424 assert_eq!(stream.next().await, Some(3));
1425 assert_eq!(stream.next().await, None);
1426 #
1427 # }) }
1428 ```
1429 "#]
1430 #[cfg(feature = "unstable")]
1431 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1432 fn by_ref(&mut self) -> &mut Self {
1433 self
1434 }
1435
1436 #[doc = r#"
1437 A stream adaptor similar to [`fold`] that holds internal state and produces a new
1438 stream.
1439
1440 [`fold`]: #method.fold
1441
1442 `scan()` takes two arguments: an initial value which seeds the internal state, and
1443 a closure with two arguments, the first being a mutable reference to the internal
1444 state and the second a stream element. The closure can assign to the internal state
1445 to share state between iterations.
1446
1447 On iteration, the closure will be applied to each element of the stream and the
1448 return value from the closure, an `Option`, is yielded by the stream.
1449
1450 ## Examples
1451
1452 ```
1453 # fn main() { async_std::task::block_on(async {
1454 #
1455 use async_std::prelude::*;
1456 use async_std::stream;
1457
1458 let s = stream::from_iter(vec![1isize, 2, 3]);
1459 let mut s = s.scan(1, |state, x| {
1460 *state = *state * x;
1461 Some(-*state)
1462 });
1463
1464 assert_eq!(s.next().await, Some(-1));
1465 assert_eq!(s.next().await, Some(-2));
1466 assert_eq!(s.next().await, Some(-6));
1467 assert_eq!(s.next().await, None);
1468 #
1469 # }) }
1470 ```
1471 "#]
1472 #[inline]
1473 fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1474 where
1475 Self: Sized,
1476 F: FnMut(&mut St, Self::Item) -> Option<B>,
1477 {
1478 Scan::new(self, initial_state, f)
1479 }
1480
1481 #[doc = r#"
1482 Combinator that `skip`s elements based on a predicate.
1483
1484 Takes a closure argument. It will call this closure on every element in
1485 the stream and ignore elements until it returns `false`.
1486
1487 After `false` is returned, `SkipWhile`'s job is over and all further
1488 elements in the stream are yielded.
1489
1490 ## Examples
1491
1492 ```
1493 # fn main() { async_std::task::block_on(async {
1494 #
1495 use async_std::prelude::*;
1496 use async_std::stream;
1497
1498 let a = stream::from_iter(vec![-1i32, 0, 1]);
1499 let mut s = a.skip_while(|x| x.is_negative());
1500
1501 assert_eq!(s.next().await, Some(0));
1502 assert_eq!(s.next().await, Some(1));
1503 assert_eq!(s.next().await, None);
1504 #
1505 # }) }
1506 ```
1507 "#]
1508 fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
1509 where
1510 Self: Sized,
1511 P: FnMut(&Self::Item) -> bool,
1512 {
1513 SkipWhile::new(self, predicate)
1514 }
1515
1516 #[doc = r#"
1517 Creates a combinator that skips the first `n` elements.
1518
1519 ## Examples
1520
1521 ```
1522 # fn main() { async_std::task::block_on(async {
1523 #
1524 use async_std::prelude::*;
1525 use async_std::stream;
1526
1527 let s = stream::from_iter(vec![1u8, 2, 3]);
1528 let mut skipped = s.skip(2);
1529
1530 assert_eq!(skipped.next().await, Some(3));
1531 assert_eq!(skipped.next().await, None);
1532 #
1533 # }) }
1534 ```
1535 "#]
1536 fn skip(self, n: usize) -> Skip<Self>
1537 where
1538 Self: Sized,
1539 {
1540 Skip::new(self, n)
1541 }
1542
1543 #[doc=r#"
1544 Await a stream or times out after a duration of time.
1545
1546 If you want to await an I/O future consider using
1547 [`io::timeout`](../io/fn.timeout.html) instead.
1548
1549 # Examples
1550
1551 ```
1552 # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
1553 #
1554 use std::time::Duration;
1555
1556 use async_std::stream;
1557 use async_std::prelude::*;
1558
1559 let mut s = stream::repeat(1).take(3).timeout(Duration::from_secs(1));
1560
1561 while let Some(v) = s.next().await {
1562 assert_eq!(v, Ok(1));
1563 }
1564
1565 // when timeout
1566 let mut s = stream::pending::<()>().timeout(Duration::from_millis(10));
1567 match s.next().await {
1568 Some(item) => assert!(item.is_err()),
1569 None => panic!()
1570 };
1571 #
1572 # Ok(()) }) }
1573 ```
1574 "#]
1575 #[cfg(any(feature = "unstable", feature = "docs"))]
1576 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1577 fn timeout(self, dur: Duration) -> Timeout<Self>
1578 where
1579 Self: Stream + Sized,
1580 {
1581 Timeout::new(self, dur)
1582 }
1583
1584 #[doc = r#"
1585 A combinator that applies a function as long as it returns successfully, producing a single, final value.
1586 Immediately returns the error when the function returns unsuccessfully.
1587
1588 # Examples
1589
1590 Basic usage:
1591
1592 ```
1593 # fn main() { async_std::task::block_on(async {
1594 #
1595 use async_std::prelude::*;
1596 use async_std::stream;
1597
1598 let mut s = stream::from_iter(vec![1usize, 2, 3]);
1599 let sum = s.try_fold(0, |acc, v| {
1600 if (acc+v) % 2 == 1 {
1601 Ok(v+3)
1602 } else {
1603 Err("fail")
1604 }
1605 }).await;
1606
1607 assert_eq!(sum, Err("fail"));
1608 #
1609 # }) }
1610 ```
1611 "#]
1612 fn try_fold<B, F, T, E>(
1613 &mut self,
1614 init: T,
1615 f: F,
1616 ) -> TryFoldFuture<'_, Self, F, T>
1617 where
1618 Self: Unpin + Sized,
1619 F: FnMut(B, Self::Item) -> Result<T, E>,
1620 {
1621 TryFoldFuture::new(self, init, f)
1622 }
1623
1624 #[doc = r#"
1625 Applies a falliable function to each element in a stream, stopping at first error and returning it.
1626
1627 # Examples
1628
1629 ```
1630 # fn main() { async_std::task::block_on(async {
1631 #
1632 use std::sync::mpsc::channel;
1633 use async_std::prelude::*;
1634 use async_std::stream;
1635
1636 let (tx, rx) = channel();
1637
1638 let mut s = stream::from_iter(vec![1u8, 2, 3]);
1639 let s = s.try_for_each(|v| {
1640 if v % 2 == 1 {
1641 tx.clone().send(v).unwrap();
1642 Ok(())
1643 } else {
1644 Err("even")
1645 }
1646 });
1647
1648 let res = s.await;
1649 drop(tx);
1650 let values: Vec<_> = rx.iter().collect();
1651
1652 assert_eq!(values, vec![1]);
1653 assert_eq!(res, Err("even"));
1654 #
1655 # }) }
1656 ```
1657 "#]
1658 fn try_for_each<F, E>(
1659 &mut self,
1660 f: F,
1661 ) -> TryForEachFuture<'_, Self, F>
1662 where
1663 Self: Unpin + Sized,
1664 F: FnMut(Self::Item) -> Result<(), E>,
1665 {
1666 TryForEachFuture::new(self, f)
1667 }
1668
1669 #[doc = r#"
1670 'Zips up' two streams into a single stream of pairs.
1671
1672 `zip()` returns a new stream that will iterate over two other streams, returning a
1673 tuple where the first element comes from the first stream, and the second element
1674 comes from the second stream.
1675
1676 In other words, it zips two streams together, into a single one.
1677
1678 If either stream returns [`None`], [`poll_next`] from the zipped stream will return
1679 [`None`]. If the first stream returns [`None`], `zip` will short-circuit and
1680 `poll_next` will not be called on the second stream.
1681
1682 [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
1683 [`poll_next`]: #tymethod.poll_next
1684
1685 ## Examples
1686
1687 ```
1688 # fn main() { async_std::task::block_on(async {
1689 #
1690 use async_std::prelude::*;
1691 use async_std::stream;
1692
1693 let l = stream::from_iter(vec![1u8, 2, 3]);
1694 let r = stream::from_iter(vec![4u8, 5, 6, 7]);
1695 let mut s = l.zip(r);
1696
1697 assert_eq!(s.next().await, Some((1, 4)));
1698 assert_eq!(s.next().await, Some((2, 5)));
1699 assert_eq!(s.next().await, Some((3, 6)));
1700 assert_eq!(s.next().await, None);
1701 #
1702 # }) }
1703 ```
1704 "#]
1705 #[inline]
1706 fn zip<U>(self, other: U) -> Zip<Self, U>
1707 where
1708 Self: Sized,
1709 U: Stream,
1710 {
1711 Zip::new(self, other)
1712 }
1713
1714 #[doc = r#"
1715 Converts an stream of pairs into a pair of containers.
1716
1717 `unzip()` consumes an entire stream of pairs, producing two collections: one from the left elements of the pairs, and one from the right elements.
1718
1719 This function is, in some sense, the opposite of [`zip`].
1720
1721 [`zip`]: trait.Stream.html#method.zip
1722
1723 # Example
1724
1725 ```
1726 # fn main() { async_std::task::block_on(async {
1727 #
1728 use async_std::prelude::*;
1729 use async_std::stream;
1730
1731 let s = stream::from_iter(vec![(1,2), (3,4)]);
1732
1733 let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1734
1735 assert_eq!(left, [1, 3]);
1736 assert_eq!(right, [2, 4]);
1737 #
1738 # }) }
1739 ```
1740 "#]
1741 #[cfg(feature = "unstable")]
1742 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1743 fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
1744 where
1745 FromA: Default + Extend<A>,
1746 FromB: Default + Extend<B>,
1747 Self: Stream<Item = (A, B)> + Sized,
1748 {
1749 UnzipFuture::new(self)
1750 }
1751
1752 #[doc = r#"
1753 Transforms a stream into a collection.
1754
1755 `collect()` can take anything streamable, and turn it into a relevant
1756 collection. This is one of the more powerful methods in the async
1757 standard library, used in a variety of contexts.
1758
1759 The most basic pattern in which `collect()` is used is to turn one
1760 collection into another. You take a collection, call [`into_stream`] on it,
1761 do a bunch of transformations, and then `collect()` at the end.
1762
1763 Because `collect()` is so general, it can cause problems with type
1764 inference. As such, `collect()` is one of the few times you'll see
1765 the syntax affectionately known as the 'turbofish': `::<>`. This
1766 helps the inference algorithm understand specifically which collection
1767 you're trying to collect into.
1768
1769 # Examples
1770
1771 ```
1772 # fn main() { async_std::task::block_on(async {
1773 #
1774 use async_std::prelude::*;
1775 use async_std::stream;
1776
1777 let s = stream::repeat(9u8).take(3);
1778 let buf: Vec<u8> = s.collect().await;
1779
1780 assert_eq!(buf, vec![9; 3]);
1781
1782 // You can also collect streams of Result values
1783 // into any collection that implements FromStream
1784 let s = stream::repeat(Ok(9)).take(3);
1785 // We are using Vec here, but other collections
1786 // are supported as well
1787 let buf: Result<Vec<u8>, ()> = s.collect().await;
1788
1789 assert_eq!(buf, Ok(vec![9; 3]));
1790
1791 // The stream will stop on the first Err and
1792 // return that instead
1793 let s = stream::repeat(Err(5)).take(3);
1794 let buf: Result<Vec<u8>, u8> = s.collect().await;
1795
1796 assert_eq!(buf, Err(5));
1797 #
1798 # }) }
1799 ```
1800
1801 [`into_stream`]: trait.IntoStream.html#tymethod.into_stream
1802 "#]
1803 #[cfg(feature = "unstable")]
1804 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1805 fn collect<'a, B>(
1806 self,
1807 ) -> Pin<Box<dyn Future<Output = B> + 'a + Send>>
1808 where
1809 Self: Sized + 'a + Send,
1810 B: FromStream<Self::Item>,
1811 Self::Item: Send,
1812 {
1813 FromStream::from_stream(self)
1814 }
1815
1816 #[doc = r#"
1817 Combines multiple streams into a single stream of all their outputs.
1818
1819 Items are yielded as soon as they're received, and the stream continues yield until
1820 both streams have been exhausted. The output ordering between streams is not guaranteed.
1821
1822 # Examples
1823
1824 ```
1825 # async_std::task::block_on(async {
1826 use async_std::prelude::*;
1827 use async_std::stream::{self, FromStream};
1828
1829 let a = stream::once(1u8);
1830 let b = stream::once(2u8);
1831 let c = stream::once(3u8);
1832
1833 let s = a.merge(b).merge(c);
1834 let mut lst = Vec::from_stream(s).await;
1835
1836 lst.sort_unstable();
1837 assert_eq!(&lst, &[1u8, 2u8, 3u8]);
1838 # });
1839 ```
1840 "#]
1841 #[cfg(feature = "unstable")]
1842 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1843 fn merge<U>(self, other: U) -> Merge<Self, U>
1844 where
1845 Self: Sized,
1846 U: Stream<Item = Self::Item> + Sized,
1847 {
1848 Merge::new(self, other)
1849 }
1850
1851 #[doc = r#"
1852 Lexicographically compares the elements of this `Stream` with those
1853 of another.
1854
1855 # Examples
1856
1857 ```
1858 # fn main() { async_std::task::block_on(async {
1859 #
1860 use async_std::prelude::*;
1861 use async_std::stream;
1862
1863 use std::cmp::Ordering;
1864
1865 let s1 = stream::from_iter(vec![1]);
1866 let s2 = stream::from_iter(vec![1, 2]);
1867 let s3 = stream::from_iter(vec![1, 2, 3]);
1868 let s4 = stream::from_iter(vec![1, 2, 4]);
1869 assert_eq!(s1.clone().partial_cmp(s1.clone()).await, Some(Ordering::Equal));
1870 assert_eq!(s1.clone().partial_cmp(s2.clone()).await, Some(Ordering::Less));
1871 assert_eq!(s2.clone().partial_cmp(s1.clone()).await, Some(Ordering::Greater));
1872 assert_eq!(s3.clone().partial_cmp(s4.clone()).await, Some(Ordering::Less));
1873 assert_eq!(s4.clone().partial_cmp(s3.clone()).await, Some(Ordering::Greater));
1874 #
1875 # }) }
1876 ```
1877 "#]
1878 fn partial_cmp<S>(
1879 self,
1880 other: S
1881 ) -> PartialCmpFuture<Self, S>
1882 where
1883 Self: Sized + Stream,
1884 S: Stream,
1885 <Self as Stream>::Item: PartialOrd<S::Item>,
1886 {
1887 PartialCmpFuture::new(self, other)
1888 }
1889
1890 #[doc = r#"
1891 Searches for an element in a Stream that satisfies a predicate, returning
1892 its index.
1893
1894 # Examples
1895
1896 ```
1897 # fn main() { async_std::task::block_on(async {
1898 #
1899 use async_std::prelude::*;
1900 use async_std::stream;
1901
1902 let s = stream::from_iter(vec![1usize, 2, 3]);
1903 let res = s.clone().position(|x| x == 1).await;
1904 assert_eq!(res, Some(0));
1905
1906 let res = s.clone().position(|x| x == 2).await;
1907 assert_eq!(res, Some(1));
1908
1909 let res = s.clone().position(|x| x == 3).await;
1910 assert_eq!(res, Some(2));
1911
1912 let res = s.clone().position(|x| x == 4).await;
1913 assert_eq!(res, None);
1914 #
1915 # }) }
1916 ```
1917 "#]
1918 fn position<P>(
1919 &mut self,
1920 predicate: P,
1921 ) -> PositionFuture<'_, Self, P>
1922 where
1923 Self: Unpin + Sized,
1924 P: FnMut(Self::Item) -> bool,
1925 {
1926 PositionFuture::new(self, predicate)
1927 }
1928
1929 #[doc = r#"
1930 Lexicographically compares the elements of this `Stream` with those
1931 of another using 'Ord'.
1932
1933 # Examples
1934
1935 ```
1936 # fn main() { async_std::task::block_on(async {
1937 #
1938 use async_std::prelude::*;
1939 use async_std::stream;
1940 use std::cmp::Ordering;
1941
1942 let s1 = stream::from_iter(vec![1]);
1943 let s2 = stream::from_iter(vec![1, 2]);
1944 let s3 = stream::from_iter(vec![1, 2, 3]);
1945 let s4 = stream::from_iter(vec![1, 2, 4]);
1946
1947 assert_eq!(s1.clone().cmp(s1.clone()).await, Ordering::Equal);
1948 assert_eq!(s1.clone().cmp(s2.clone()).await, Ordering::Less);
1949 assert_eq!(s2.clone().cmp(s1.clone()).await, Ordering::Greater);
1950 assert_eq!(s3.clone().cmp(s4.clone()).await, Ordering::Less);
1951 assert_eq!(s4.clone().cmp(s3.clone()).await, Ordering::Greater);
1952 #
1953 # }) }
1954 ```
1955 "#]
1956 fn cmp<S>(
1957 self,
1958 other: S
1959 ) -> CmpFuture<Self, S>
1960 where
1961 Self: Sized + Stream,
1962 S: Stream,
1963 <Self as Stream>::Item: Ord
1964 {
1965 CmpFuture::new(self, other)
1966 }
1967
1968 #[doc = r#"
1969 Counts the number of elements in the stream.
1970
1971 # Examples
1972
1973 ```
1974 # fn main() { async_std::task::block_on(async {
1975 #
1976 use async_std::prelude::*;
1977 use async_std::stream;
1978
1979 let s1 = stream::from_iter(vec![0]);
1980 let s2 = stream::from_iter(vec![1, 2, 3]);
1981
1982 assert_eq!(s1.count().await, 1);
1983 assert_eq!(s2.count().await, 3);
1984 #
1985 # }) }
1986 ```
1987 "#]
1988 #[cfg(feature = "unstable")]
1989 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1990 fn count(self) -> CountFuture<Self>
1991 where
1992 Self: Sized,
1993 {
1994 CountFuture::new(self)
1995 }
1996
1997 #[doc = r#"
1998 Determines if the elements of this `Stream` are lexicographically
1999 not equal to those of another.
2000
2001 # Examples
2002
2003 ```
2004 # fn main() { async_std::task::block_on(async {
2005 #
2006 use async_std::prelude::*;
2007 use async_std::stream;
2008
2009 let single = stream::from_iter(vec![1usize]);
2010 let single_ne = stream::from_iter(vec![10usize]);
2011 let multi = stream::from_iter(vec![1usize,2]);
2012 let multi_ne = stream::from_iter(vec![1usize,5]);
2013
2014 assert_eq!(single.clone().ne(single.clone()).await, false);
2015 assert_eq!(single_ne.clone().ne(single.clone()).await, true);
2016 assert_eq!(multi.clone().ne(single_ne.clone()).await, true);
2017 assert_eq!(multi_ne.clone().ne(multi.clone()).await, true);
2018 #
2019 # }) }
2020 ```
2021 "#]
2022 fn ne<S>(
2023 self,
2024 other: S
2025 ) -> NeFuture<Self, S>
2026 where
2027 Self: Sized,
2028 S: Sized + Stream,
2029 <Self as Stream>::Item: PartialEq<S::Item>,
2030 {
2031 NeFuture::new(self, other)
2032 }
2033
2034 #[doc = r#"
2035 Determines if the elements of this `Stream` are lexicographically
2036 greater than or equal to those of another.
2037
2038 # Examples
2039
2040 ```
2041 # fn main() { async_std::task::block_on(async {
2042 #
2043 use async_std::prelude::*;
2044 use async_std::stream;
2045
2046 let single = stream::from_iter(vec![1]);
2047 let single_gt = stream::from_iter(vec![10]);
2048 let multi = stream::from_iter(vec![1,2]);
2049 let multi_gt = stream::from_iter(vec![1,5]);
2050
2051 assert_eq!(single.clone().ge(single.clone()).await, true);
2052 assert_eq!(single_gt.clone().ge(single.clone()).await, true);
2053 assert_eq!(multi.clone().ge(single_gt.clone()).await, false);
2054 assert_eq!(multi_gt.clone().ge(multi.clone()).await, true);
2055 #
2056 # }) }
2057 ```
2058 "#]
2059 fn ge<S>(
2060 self,
2061 other: S
2062 ) -> GeFuture<Self, S>
2063 where
2064 Self: Sized + Stream,
2065 S: Stream,
2066 <Self as Stream>::Item: PartialOrd<S::Item>,
2067 {
2068 GeFuture::new(self, other)
2069 }
2070
2071 #[doc = r#"
2072 Determines if the elements of this `Stream` are lexicographically
2073 equal to those of another.
2074
2075 # Examples
2076
2077 ```
2078 # fn main() { async_std::task::block_on(async {
2079 #
2080 use async_std::prelude::*;
2081 use async_std::stream;
2082
2083 let single = stream::from_iter(vec![1]);
2084 let single_eq = stream::from_iter(vec![10]);
2085 let multi = stream::from_iter(vec![1,2]);
2086 let multi_eq = stream::from_iter(vec![1,5]);
2087
2088 assert_eq!(single.clone().eq(single.clone()).await, true);
2089 assert_eq!(single_eq.clone().eq(single.clone()).await, false);
2090 assert_eq!(multi.clone().eq(single_eq.clone()).await, false);
2091 assert_eq!(multi_eq.clone().eq(multi.clone()).await, false);
2092 #
2093 # }) }
2094 ```
2095 "#]
2096 fn eq<S>(
2097 self,
2098 other: S
2099 ) -> EqFuture<Self, S>
2100 where
2101 Self: Sized + Stream,
2102 S: Sized + Stream,
2103 <Self as Stream>::Item: PartialEq<S::Item>,
2104 {
2105 EqFuture::new(self, other)
2106 }
2107
2108 #[doc = r#"
2109 Determines if the elements of this `Stream` are lexicographically
2110 greater than those of another.
2111
2112 # Examples
2113
2114 ```
2115 # fn main() { async_std::task::block_on(async {
2116 #
2117 use async_std::prelude::*;
2118 use async_std::stream;
2119
2120 let single = stream::from_iter(vec![1]);
2121 let single_gt = stream::from_iter(vec![10]);
2122 let multi = stream::from_iter(vec![1,2]);
2123 let multi_gt = stream::from_iter(vec![1,5]);
2124
2125 assert_eq!(single.clone().gt(single.clone()).await, false);
2126 assert_eq!(single_gt.clone().gt(single.clone()).await, true);
2127 assert_eq!(multi.clone().gt(single_gt.clone()).await, false);
2128 assert_eq!(multi_gt.clone().gt(multi.clone()).await, true);
2129 #
2130 # }) }
2131 ```
2132 "#]
2133 fn gt<S>(
2134 self,
2135 other: S
2136 ) -> GtFuture<Self, S>
2137 where
2138 Self: Sized + Stream,
2139 S: Stream,
2140 <Self as Stream>::Item: PartialOrd<S::Item>,
2141 {
2142 GtFuture::new(self, other)
2143 }
2144
2145 #[doc = r#"
2146 Determines if the elements of this `Stream` are lexicographically
2147 less or equal to those of another.
2148
2149 # Examples
2150
2151 ```
2152 # fn main() { async_std::task::block_on(async {
2153 #
2154 use async_std::prelude::*;
2155 use async_std::stream;
2156
2157 let single = stream::from_iter(vec![1]);
2158 let single_gt = stream::from_iter(vec![10]);
2159 let multi = stream::from_iter(vec![1,2]);
2160 let multi_gt = stream::from_iter(vec![1,5]);
2161
2162 assert_eq!(single.clone().le(single.clone()).await, true);
2163 assert_eq!(single.clone().le(single_gt.clone()).await, true);
2164 assert_eq!(multi.clone().le(single_gt.clone()).await, true);
2165 assert_eq!(multi_gt.clone().le(multi.clone()).await, false);
2166 #
2167 # }) }
2168 ```
2169 "#]
2170 fn le<S>(
2171 self,
2172 other: S
2173 ) -> LeFuture<Self, S>
2174 where
2175 Self: Sized + Stream,
2176 S: Stream,
2177 <Self as Stream>::Item: PartialOrd<S::Item>,
2178 {
2179 LeFuture::new(self, other)
2180 }
2181
2182 #[doc = r#"
2183 Determines if the elements of this `Stream` are lexicographically
2184 less than those of another.
2185
2186 # Examples
2187
2188 ```
2189 # fn main() { async_std::task::block_on(async {
2190 #
2191 use async_std::prelude::*;
2192 use async_std::stream;
2193
2194 let single = stream::from_iter(vec![1]);
2195 let single_gt = stream::from_iter(vec![10]);
2196 let multi = stream::from_iter(vec![1,2]);
2197 let multi_gt = stream::from_iter(vec![1,5]);
2198
2199 assert_eq!(single.clone().lt(single.clone()).await, false);
2200 assert_eq!(single.clone().lt(single_gt.clone()).await, true);
2201 assert_eq!(multi.clone().lt(single_gt.clone()).await, true);
2202 assert_eq!(multi_gt.clone().lt(multi.clone()).await, false);
2203 #
2204 # }) }
2205 ```
2206 "#]
2207 fn lt<S>(
2208 self,
2209 other: S
2210 ) -> LtFuture<Self, S>
2211 where
2212 Self: Sized + Stream,
2213 S: Stream,
2214 <Self as Stream>::Item: PartialOrd<S::Item>,
2215 {
2216 LtFuture::new(self, other)
2217 }
2218
2219 #[doc = r#"
2220 Sums the elements of a stream.
2221
2222 Takes each element, adds them together, and returns the result.
2223
2224 An empty streams returns the zero value of the type.
2225
2226 # Panics
2227
2228 When calling `sum()` and a primitive integer type is being returned, this
2229 method will panic if the computation overflows and debug assertions are
2230 enabled.
2231
2232 # Examples
2233
2234 Basic usage:
2235
2236 ```
2237 # fn main() { async_std::task::block_on(async {
2238 #
2239 use async_std::prelude::*;
2240 use async_std::stream;
2241
2242 let s = stream::from_iter(vec![0u8, 1, 2, 3, 4]);
2243 let sum: u8 = s.sum().await;
2244
2245 assert_eq!(sum, 10);
2246 #
2247 # }) }
2248 ```
2249 "#]
2250 #[cfg(feature = "unstable")]
2251 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
2252 fn sum<'a, S>(
2253 self,
2254 ) -> Pin<Box<dyn Future<Output = S> + 'a>>
2255 where
2256 Self: Sized + Stream<Item = S> + 'a,
2257 S: Sum<Self::Item>,
2258 {
2259 Sum::sum(self)
2260 }
2261
2262 #[doc = r#"
2263 Multiplies all elements of the stream.
2264
2265 An empty stream returns the one value of the type.
2266
2267 # Panics
2268
2269 When calling `product()` and a primitive integer type is being returned,
2270 method will panic if the computation overflows and debug assertions are
2271 enabled.
2272
2273 # Examples
2274
2275 This example calculates the factorial of n (i.e. the product of the numbers from 1 to
2276 n, inclusive):
2277
2278 ```
2279 # fn main() { async_std::task::block_on(async {
2280 #
2281 async fn factorial(n: u32) -> u32 {
2282 use async_std::prelude::*;
2283 use async_std::stream;
2284
2285 let s = stream::from_iter(1..=n);
2286 s.product().await
2287 }
2288
2289 assert_eq!(factorial(0).await, 1);
2290 assert_eq!(factorial(1).await, 1);
2291 assert_eq!(factorial(5).await, 120);
2292 #
2293 # }) }
2294 ```
2295 "#]
2296 #[cfg(feature = "unstable")]
2297 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
2298 fn product<'a, P>(
2299 self,
2300 ) -> Pin<Box<dyn Future<Output = P> + 'a>>
2301 where
2302 Self: Sized + Stream<Item = P> + 'a,
2303 P: Product,
2304 {
2305 Product::product(self)
2306 }
2307}
2308
2309impl<T: Stream + ?Sized> StreamExt for T {}
2310