futures_util/stream/
mod.rs

1//! Streams
2//!
3//! This module contains a number of functions for working with `Stream`s,
4//! including the `StreamExt` trait which adds methods to `Stream` types.
5
6use futures_core::{IntoFuture, Stream};
7use futures_sink::Sink;
8use super::future::Either;
9
10mod iter_ok;
11pub use self::iter_ok::{iter_ok, IterOk};
12mod iter_result;
13pub use self::iter_result::{iter_result, IterResult};
14
15mod repeat;
16pub use self::repeat::{repeat, Repeat};
17
18mod and_then;
19mod chain;
20mod concat;
21mod empty;
22mod filter;
23mod filter_map;
24mod flatten;
25mod fold;
26mod for_each;
27mod err_into;
28mod fuse;
29mod future;
30mod inspect;
31mod inspect_err;
32mod map;
33mod map_err;
34mod once;
35mod or_else;
36mod peek;
37mod poll_fn;
38mod select;
39mod skip;
40mod skip_while;
41mod take;
42mod take_while;
43mod then;
44mod unfold;
45mod zip;
46mod forward;
47mod recover;
48pub use self::and_then::AndThen;
49pub use self::chain::Chain;
50pub use self::concat::Concat;
51pub use self::empty::{Empty, empty};
52pub use self::filter::Filter;
53pub use self::filter_map::FilterMap;
54pub use self::flatten::Flatten;
55pub use self::fold::Fold;
56pub use self::for_each::ForEach;
57pub use self::err_into::ErrInto;
58pub use self::fuse::Fuse;
59pub use self::future::StreamFuture;
60pub use self::inspect::Inspect;
61pub use self::inspect_err::InspectErr;
62pub use self::map::Map;
63pub use self::map_err::MapErr;
64pub use self::once::{Once, once};
65pub use self::or_else::OrElse;
66pub use self::peek::Peekable;
67pub use self::poll_fn::{poll_fn, PollFn};
68pub use self::select::Select;
69pub use self::skip::Skip;
70pub use self::skip_while::SkipWhile;
71pub use self::take::Take;
72pub use self::take_while::TakeWhile;
73pub use self::then::Then;
74pub use self::unfold::{Unfold, unfold};
75pub use self::zip::Zip;
76pub use self::forward::Forward;
77pub use self::recover::Recover;
78
79if_std! {
80    use std;
81    use std::iter::Extend;
82
83    mod buffered;
84    mod buffer_unordered;
85    mod catch_unwind;
86    mod chunks;
87    mod collect;
88    mod for_each_concurrent;
89    mod select_all;
90    mod split;
91    mod futures_unordered;
92    mod futures_ordered;
93    pub use self::buffered::Buffered;
94    pub use self::buffer_unordered::BufferUnordered;
95    pub use self::catch_unwind::CatchUnwind;
96    pub use self::chunks::Chunks;
97    pub use self::collect::Collect;
98    pub use self::select_all::{select_all, SelectAll};
99    pub use self::split::{SplitStream, SplitSink, ReuniteError};
100    pub use self::for_each_concurrent::ForEachConcurrent;
101    pub use self::futures_unordered::{futures_unordered, FuturesUnordered};
102    pub use self::futures_ordered::{futures_ordered, FuturesOrdered};
103}
104
// Blanket implementation: every type that implements `Stream` (including
// unsized ones, via `?Sized`) automatically gets the `StreamExt` combinators.
105impl<T: ?Sized> StreamExt for T where T: Stream {}
106
107/// An extension trait for `Stream`s that provides a variety of convenient
108/// combinator functions.
109pub trait StreamExt: Stream {
110    /// Converts this stream into a `Future` that yields its next element.
111    ///
112    /// A stream can be viewed as a future which will resolve to a pair containing
113    /// the next element of the stream plus the remaining stream. If the stream
114    /// terminates, then the next element is `None` and the remaining stream is
115    /// still passed back, to allow reclamation of its resources.
116    ///
117    /// The returned future can be used to compose streams and futures together by
118    /// placing everything into the "world of futures".
    ///
    /// Note that this function consumes the receiving stream; it is handed back
    /// as part of the returned future's result.
119    fn next(self) -> StreamFuture<Self>
120        where Self: Sized
121    {
122        future::new(self)
123    }
124
125    /// Converts a stream of `Self::Item` values into a stream of `U` values.
126    ///
127    /// The provided closure is executed over all elements of this stream as
128    /// they are made available, and the callback will be executed inline with
129    /// calls to `poll`.
130    ///
131    /// Note that this function consumes the receiving stream and returns a
132    /// wrapped version of it, similar to the existing `map` methods in the
133    /// standard library.
134    ///
135    /// # Examples
136    ///
137    /// ```
138    /// # extern crate futures;
139    /// # extern crate futures_channel;
140    /// use futures::prelude::*;
141    /// use futures_channel::mpsc;
142    ///
143    /// # fn main() {
144    /// let (_tx, rx) = mpsc::channel::<i32>(1);
145    /// let rx = rx.map(|x| x + 3);
146    /// # }
147    /// ```
148    fn map<U, F>(self, f: F) -> Map<Self, F>
149        where F: FnMut(Self::Item) -> U,
150              Self: Sized
151    {
152        map::new(self, f)
153    }
154
155    /// Converts a stream with error type `Self::Error` into one with error type `U`.
156    ///
157    /// The provided closure is executed over all errors of this stream as
158    /// they are made available, and the callback will be executed inline with
159    /// calls to `poll`.
160    ///
161    /// Note that this function consumes the receiving stream and returns a
162    /// wrapped version of it, similar to the existing `map_err` methods in the
163    /// standard library.
164    ///
165    /// # Examples
166    ///
167    /// ```
168    /// # extern crate futures;
169    /// # extern crate futures_channel;
170    /// use futures::prelude::*;
171    /// use futures_channel::mpsc;
172    ///
173    /// # fn main() {
174    /// let (_tx, rx) = mpsc::channel::<i32>(1);
175    /// let rx = rx.map_err(|_| 3);
176    /// # }
177    /// ```
178    fn map_err<U, F>(self, f: F) -> MapErr<Self, F>
179        where F: FnMut(Self::Error) -> U,
180              Self: Sized
181    {
182        map_err::new(self, f)
183    }
184
185    /// Filters the values produced by this stream according to the provided
186    /// predicate.
187    ///
188    /// As values of this stream are made available, the provided predicate will
189    /// be run against them. If the predicate returns a `Future` which resolves
190    /// to `true`, then the stream will yield the value, but if the predicate
191    /// returns a `Future` which resolves to `false`, then the value will be
192    /// discarded and the next value will be produced.
193    ///
194    /// All errors are passed through without filtering in this combinator.
195    ///
196    /// Note that this function consumes the receiving stream and returns a
197    /// wrapped version of it, similar to the existing `filter` methods in the
198    /// standard library.
199    ///
200    /// # Examples
201    ///
202    /// ```
203    /// # extern crate futures;
204    /// # extern crate futures_channel;
205    /// use futures::prelude::*;
206    /// use futures_channel::mpsc;
207    ///
208    /// # fn main() {
209    /// let (_tx, rx) = mpsc::channel::<i32>(1);
210    /// let evens = rx.filter(|x| Ok(x % 2 == 0));
211    /// # }
212    /// ```
213    fn filter<R, P>(self, pred: P) -> Filter<Self, R, P>
214        where P: FnMut(&Self::Item) -> R,
215              R: IntoFuture<Item=bool, Error=Self::Error>,
216              Self: Sized,
217    {
218        filter::new(self, pred)
219    }
220
221    /// Filters the values produced by this stream while simultaneously mapping
222    /// them to a different type.
223    ///
224    /// As values of this stream are made available, the provided function will
225    /// be run on them. If the function's future resolves to `Some(e)` then the
226    /// stream will yield the value `e`, but if it resolves to `None` then the
227    /// next value will be produced.
228    ///
229    /// All errors are passed through without filtering in this combinator.
230    ///
231    /// Note that this function consumes the receiving stream and returns a
232    /// wrapped version of it, similar to the existing `filter_map` methods in the
233    /// standard library.
234    ///
235    /// # Examples
236    ///
237    /// ```
238    /// # extern crate futures;
239    /// # extern crate futures_channel;
240    /// use futures::prelude::*;
241    /// use futures_channel::mpsc;
242    ///
243    /// # fn main() {
244    /// let (_tx, rx) = mpsc::channel::<i32>(1);
245    /// let evens_plus_one = rx.filter_map(|x| {
246    ///     Ok(
247    ///         if x % 2 == 0 {
248    ///             Some(x + 1)
249    ///         } else {
250    ///             None
251    ///         }
252    ///     )
253    /// });
254    /// # }
255    /// ```
256    fn filter_map<R, B, F>(self, f: F) -> FilterMap<Self, R, F>
257        where F: FnMut(Self::Item) -> R,
258              R: IntoFuture<Item=Option<B>, Error=Self::Error>,
259              Self: Sized,
260    {
261        filter_map::new(self, f)
262    }
263
264    /// Chain on a computation for when a value is ready, passing the resulting
265    /// `Result` (item or error) to the provided closure `f`.
266    ///
267    /// This function can be used to ensure a computation runs regardless of
268    /// the next value on the stream. The closure provided will be yielded a
269    /// `Result` once a value is ready, and the returned future will then be run
270    /// to completion to produce the next value on this stream.
271    ///
272    /// The returned value of the closure must implement the `IntoFuture` trait
273    /// and can represent some more work to be done before the composed stream
274    /// is finished. Note that the `Result` type implements the `IntoFuture`
275    /// trait so it is possible to simply alter the `Result` yielded to the
276    /// closure and return it.
277    ///
278    /// Note that this function consumes the receiving stream and returns a
279    /// wrapped version of it.
280    ///
281    /// # Examples
282    ///
283    /// ```
284    /// # extern crate futures;
285    /// # extern crate futures_channel;
286    /// use futures::prelude::*;
287    /// use futures_channel::mpsc;
288    ///
289    /// # fn main() {
290    /// let (_tx, rx) = mpsc::channel::<i32>(1);
291    ///
292    /// let rx = rx.then(|result| {
293    ///     match result {
294    ///         Ok(e) => Ok(e + 3),
295    ///         Err(_) => Err(4),
296    ///     }
297    /// });
298    /// # }
299    /// ```
300    fn then<U, F>(self, f: F) -> Then<Self, U, F>
301        where F: FnMut(Result<Self::Item, Self::Error>) -> U,
302              U: IntoFuture,
303              Self: Sized
304    {
305        then::new(self, f)
306    }
307
308    /// Chain on a computation for when a value is ready, passing the successful
309    /// results to the provided closure `f`.
310    ///
311    /// This function can be used to run a unit of work when the next successful
312    /// value on a stream is ready. The closure provided will be yielded a value
313    /// when ready, and the returned future will then be run to completion to
314    /// produce the next value on this stream.
315    ///
316    /// Any errors produced by this stream will not be passed to the closure,
317    /// and will be passed through.
318    ///
319    /// The returned value of the closure must implement the `IntoFuture` trait
320    /// and can represent some more work to be done before the composed stream
321    /// is finished. Note that the `Result` type implements the `IntoFuture`
322    /// trait so it is possible to simply return a `Result` built from the item
323    /// passed to the closure.
324    ///
325    /// Note that this function consumes the receiving stream and returns a
326    /// wrapped version of it.
327    ///
328    /// To process the entire stream and return a single future representing
329    /// success or error, use `for_each` instead.
330    ///
331    /// # Examples
332    ///
333    /// ```
334    /// # extern crate futures;
335    /// # extern crate futures_channel;
336    /// use futures::prelude::*;
337    /// use futures_channel::mpsc;
338    ///
339    /// # fn main() {
340    /// let (_tx, rx) = mpsc::channel::<i32>(1);
341    ///
342    /// let rx = rx.and_then(|item| {
343    ///     if item % 2 == 0 {
344    ///         Ok(Some(item))
345    ///     } else {
346    ///         Ok(None)
347    ///     }
348    /// });
349    /// # }
350    /// ```
351    fn and_then<U, F>(self, f: F) -> AndThen<Self, U, F>
352        where F: FnMut(Self::Item) -> U,
353              U: IntoFuture<Error = Self::Error>,
354              Self: Sized
355    {
356        and_then::new(self, f)
357    }
358
359    /// Chain on a computation for when an error happens, passing the
360    /// erroneous result to the provided closure `f`.
361    ///
362    /// This function can be used to run a unit of work and attempt to recover from
363    /// an error if one happens. The closure provided will be yielded an error
364    /// when one appears, and the returned future will then be run to completion
365    /// to produce the next value on this stream.
366    ///
367    /// Any successful values produced by this stream will not be passed to the
368    /// closure, and will be passed through.
369    ///
370    /// The returned value of the closure must implement the `IntoFuture` trait
371    /// (with `Item = Self::Item`) and can represent some more work to be done
372    /// before the composed stream is finished. Note that the `Result` type
373    /// implements the `IntoFuture` trait, so the closure may simply return a
374    /// `Result` built from the error it was given.
375    ///
376    /// Note that this function consumes the receiving stream and returns a
377    /// wrapped version of it.
378    fn or_else<U, F>(self, f: F) -> OrElse<Self, U, F>
379        where F: FnMut(Self::Error) -> U,
380              U: IntoFuture<Item = Self::Item>,
381              Self: Sized
382    {
383        or_else::new(self, f)
384    }
385
386    /// Collect all of the values of this stream into a collection, returning a
387    /// future representing the result of that computation.
388    ///
389    /// This combinator will gather all successful results of this stream into
390    /// a `C: Default + Extend<Self::Item>` (e.g. a `Vec`). If an error happens
391    /// then all collected elements will be dropped and the error returned.
392    ///
393    /// The returned future will be resolved whenever an error happens or when
394    /// the stream returns `Ok(None)`.
395    ///
396    /// This method is only available when the `std` feature of this
397    /// library is activated, and it is activated by default.
398    ///
399    /// # Examples
400    ///
401    /// ```
402    /// # extern crate futures;
403    /// # extern crate futures_executor;
404    /// # extern crate futures_channel;
405    /// use std::thread;
406    ///
407    /// use futures::prelude::*;
408    /// use futures_channel::mpsc;
409    /// use futures_executor::block_on;
410    ///
411    /// # fn main() {
412    /// let (mut tx, rx) = mpsc::unbounded();
413    ///
414    /// thread::spawn(move || {
415    ///     for i in (0..5).rev() {
416    ///         tx.unbounded_send(i + 1).unwrap();
417    ///     }
418    /// });
419    ///
420    /// let result = block_on(rx.collect());
421    /// assert_eq!(result, Ok(vec![5, 4, 3, 2, 1]));
422    /// # }
423    /// ```
424    #[cfg(feature = "std")]
425    fn collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C>
426        where Self: Sized
427    {
428        collect::new(self)
429    }
430
431    /// Concatenate all results of a stream into a single extendable
432    /// destination, returning a future representing the end result.
433    ///
434    /// This combinator will extend the first item with the contents
435    /// of all the successful results of the stream. If the stream is
436    /// empty, the default value will be returned. If an error occurs,
437    /// all the results will be dropped and the error will be returned.
    ///
    /// The stream's item type must therefore implement `Extend` (over its own
    /// `IntoIterator` element type), `IntoIterator`, and `Default`.
438    ///
439    /// # Examples
440    ///
441    /// ```
442    /// # extern crate futures;
443    /// # extern crate futures_executor;
444    /// # extern crate futures_channel;
445    /// use std::thread;
446    ///
447    /// use futures::prelude::*;
448    /// use futures_channel::mpsc;
449    /// use futures_executor::block_on;
450    ///
451    /// # fn main() {
452    /// let (mut tx, rx) = mpsc::unbounded();
453    ///
454    /// thread::spawn(move || {
455    ///     for i in (0..3).rev() {
456    ///         let n = i * 3;
457    ///         tx.unbounded_send(vec![n + 1, n + 2, n + 3]).unwrap();
458    ///     }
459    /// });
460    /// let result = block_on(rx.concat());
461    /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
462    /// # }
463    /// ```
464    fn concat(self) -> Concat<Self>
465        where Self: Sized,
466              Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
467    {
468        concat::new(self)
469    }
470
471    /// Execute an accumulating computation over a stream, collecting all the
472    /// values into one final result.
473    ///
474    /// This combinator will accumulate all successful results of this stream
475    /// using the closure provided. The closure receives the current state
476    /// (starting with `init`) and the next item, and returns a future that
477    /// resolves to the new state. Once the entire stream has been exhausted the
478    /// returned future will resolve to the final state.
479    ///
480    /// If an error happens then collected state will be dropped and the error
481    /// will be returned.
482    ///
483    /// # Examples
484    ///
485    /// ```
486    /// # extern crate futures;
487    /// # extern crate futures_executor;
488    /// use futures::prelude::*;
489    /// use futures::stream;
490    /// use futures::future;
491    /// use futures_executor::block_on;
492    ///
493    /// # fn main() {
494    /// let number_stream = stream::iter_ok::<_, ()>(0..6);
495    /// let sum = number_stream.fold(0, |acc, x| future::ok(acc + x));
496    /// assert_eq!(block_on(sum), Ok(15));
497    /// # }
498    /// ```
499    fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
500        where F: FnMut(T, Self::Item) -> Fut,
501              Fut: IntoFuture<Item = T, Error = Self::Error>,
502              Self: Sized
503    {
504        fold::new(self, f, init)
505    }
506
507    /// Flattens a stream of streams into just one continuous stream.
508    ///
509    /// If this stream's elements are themselves streams then this combinator
510    /// will flatten out the entire stream to one long chain of elements. Any
511    /// errors are passed through without looking at them, but otherwise each
512    /// individual stream will get exhausted before moving on to the next.
513    ///
    /// # Examples
    ///
514    /// ```
515    /// # extern crate futures;
516    /// # extern crate futures_channel;
517    /// # extern crate futures_executor;
518    /// use std::thread;
519    ///
520    /// use futures::prelude::*;
521    /// use futures_channel::mpsc;
522    /// use futures_executor::block_on;
523    ///
524    /// # fn main() {
525    /// let (tx1, rx1) = mpsc::unbounded::<i32>();
526    /// let (tx2, rx2) = mpsc::unbounded::<i32>();
527    /// let (tx3, rx3) = mpsc::unbounded();
528    ///
529    /// thread::spawn(move || {
530    ///     tx1.unbounded_send(1).unwrap();
531    ///     tx1.unbounded_send(2).unwrap();
532    /// });
533    /// thread::spawn(move || {
534    ///     tx2.unbounded_send(3).unwrap();
535    ///     tx2.unbounded_send(4).unwrap();
536    /// });
537    /// thread::spawn(move || {
538    ///     tx3.unbounded_send(rx1).unwrap();
539    ///     tx3.unbounded_send(rx2).unwrap();
540    /// });
541    ///
542    /// let result = block_on(rx3.flatten().collect());
543    /// assert_eq!(result, Ok(vec![1, 2, 3, 4]));
544    /// # }
545    /// ```
546    fn flatten(self) -> Flatten<Self>
547        where Self::Item: Stream<Error = Self::Error>,
548              Self: Sized
549    {
550        flatten::new(self)
551    }
552
553    /// Skip elements on this stream while the predicate provided resolves to
554    /// `true`.
555    ///
556    /// This function, like `Iterator::skip_while`, will skip elements on the
557    /// stream until `pred` resolves to `false`. Once `pred` has resolved to
558    /// `false` for one element, all future elements will be returned from the
559    /// underlying stream.
560    fn skip_while<R, P>(self, pred: P) -> SkipWhile<Self, R, P>
561        where P: FnMut(&Self::Item) -> R,
562              R: IntoFuture<Item=bool, Error=Self::Error>,
563              Self: Sized
564    {
565        skip_while::new(self, pred)
566    }
567
568    /// Take elements from this stream while the predicate provided resolves to
569    /// `true`.
570    ///
571    /// This function, like `Iterator::take_while`, will take elements from the
572    /// stream until `pred` resolves to `false`. Once `pred` has resolved to
573    /// `false` for one element, it will always return that the stream is done.
574    fn take_while<R, P>(self, pred: P) -> TakeWhile<Self, R, P>
575        where P: FnMut(&Self::Item) -> R,
576              R: IntoFuture<Item=bool, Error=Self::Error>,
577              Self: Sized
578    {
579        take_while::new(self, pred)
580    }
581
582    /// Runs this stream to completion, executing the provided closure for each
583    /// element on the stream.
584    ///
585    /// The closure provided will be called for each item this stream resolves
586    /// to successfully, producing a future that resolves to `()`. That future
587    /// will then be executed to completion before moving on to the next item.
588    ///
589    /// The returned value is a `Future` where the `Item` type is the completed
590    /// stream, and errors are otherwise threaded through. Any error on the
591    /// stream or in the provided future will cause iteration to be halted
592    /// immediately and the future will resolve to that error.
593    ///
594    /// To process each item in the stream and produce another stream instead
595    /// of a single future, use `and_then` instead.
596    fn for_each<U, F>(self, f: F) -> ForEach<Self, U, F>
597        where F: FnMut(Self::Item) -> U,
598              U: IntoFuture<Item=(), Error = Self::Error>,
599              Self: Sized
600    {
601        for_each::new(self, f)
602    }
603
604    /// Runs this stream to completion, executing the provided closure for each
605    /// element on the stream. This is similar to `for_each` but may begin
606    /// processing an element while previous elements are still being processed.
607    ///
608    /// When this stream successfully resolves to an item, the closure will be
609    /// called to produce a future. That future will then be added to
610    /// the set of futures to resolve.
611    ///
612    /// The returned value is a `Future` where the `Item` type is the completed
613    /// stream, and errors are otherwise threaded through. Any error on the
614    /// stream or in the provided future will cause iteration to be halted
615    /// immediately and the future will resolve to that error.
616    ///
617    /// To process each item in the stream and produce another stream instead
618    /// of a single future, use `and_then` instead.
    ///
    /// This method is only available when the `std` feature of this
    /// library is activated.
619    #[cfg(feature = "std")]
620    fn for_each_concurrent<U, F>(self, f: F) -> ForEachConcurrent<Self, U, F>
621        where F: FnMut(Self::Item) -> U,
622              U: IntoFuture<Item=(), Error = Self::Error>,
623              Self: Sized
624    {
625        for_each_concurrent::new(self, f)
626    }
627
628    /// Map this stream's error to a different type `E` using the `Into` trait.
629    ///
630    /// This function does for streams what `try!` does for `Result`,
631    /// by letting the compiler infer the type of the resulting error.
632    /// Just as `map_err` above, this is useful for example to ensure
633    /// that streams have the same error type when used with
634    /// combinators.
635    ///
636    /// Note that this function consumes the receiving stream and returns a
637    /// wrapped version of it.
638    fn err_into<E>(self) -> ErrInto<Self, E>
639        where Self: Sized,
640              Self::Error: Into<E>,
641    {
642        err_into::new(self)
643    }
644
645    /// Creates a new stream of at most `amt` items of the underlying stream.
646    ///
647    /// Once `amt` items have been yielded from this stream then it will always
648    /// return that the stream is done.
649    ///
650    /// # Errors
651    ///
652    /// Any errors yielded from the underlying stream, before the desired amount
653    /// of items is reached, are passed through and do not affect the total
654    /// number of items taken.
655    fn take(self, amt: u64) -> Take<Self>
656        where Self: Sized
657    {
658        take::new(self, amt)
659    }
660
661    /// Creates a new stream which skips `amt` items of the underlying stream.
662    ///
663    /// Once `amt` items have been skipped from this stream then it will always
664    /// return the remaining items on this stream.
665    ///
666    /// # Errors
667    ///
668    /// All errors yielded from the underlying stream are passed through and do
669    /// not affect the total number of items skipped.
670    fn skip(self, amt: u64) -> Skip<Self>
671        where Self: Sized
672    {
673        skip::new(self, amt)
674    }
675
676    /// Fuse a stream such that `poll` will never again be called once it has
677    /// finished.
678    ///
679    /// Currently once a stream has returned `None` from `poll` any further
680    /// calls could exhibit bad behavior such as block forever, panic, never
681    /// return, etc. If it is known that `poll` may be called after stream has
682    /// already finished, then this method can be used to ensure that it has
683    /// defined semantics.
684    ///
685    /// Once a stream has been `fuse`d and it finishes, then it will forever
686    /// return `None` from `poll`. This, unlike for the trait's `poll` method,
687    /// is guaranteed.
688    ///
689    /// Also note that as soon as this stream returns `None` it will be dropped
690    /// to reclaim resources associated with it.
691    fn fuse(self) -> Fuse<Self>
692        where Self: Sized
693    {
694        fuse::new(self)
695    }
696
697    /// Borrows a stream, rather than consuming it.
698    ///
699    /// This is useful to allow applying stream adaptors while still retaining
700    /// ownership of the original stream.
701    ///
    /// # Examples
    ///
702    /// ```
703    /// # extern crate futures;
704    /// # extern crate futures_executor;
705    /// use futures::prelude::*;
706    /// use futures::stream;
707    /// use futures::future;
708    /// use futures_executor::block_on;
709    ///
710    /// # fn main() {
711    /// let mut stream = stream::iter_ok::<_, ()>(1..5);
712    ///
713    /// let sum = block_on(stream.by_ref().take(2).fold(0, |a, b| future::ok(a + b)));
714    /// assert_eq!(sum, Ok(3));
715    ///
716    /// // You can use the stream again
717    /// let sum = block_on(stream.take(2).fold(0, |a, b| future::ok(a + b)));
718    /// assert_eq!(sum, Ok(7));
719    /// # }
720    /// ```
721    fn by_ref(&mut self) -> &mut Self
722        where Self: Sized
723    {
724        self
725    }
726
727    /// Catches unwinding panics while polling the stream.
728    ///
729    /// Caught panic (if any) will be the last element of the resulting stream.
730    ///
731    /// In general, panics within a stream can propagate all the way out to the
732    /// task level. This combinator makes it possible to halt unwinding within
733    /// the stream itself. It's most commonly used within task executors. This
734    /// method should not be used for error handling.
735    ///
736    /// Note that this method requires the `UnwindSafe` bound from the standard
737    /// library. This isn't always applied automatically, and the standard
738    /// library provides an `AssertUnwindSafe` wrapper type to apply it
739    /// after the fact. To assist using this method, the `Stream` trait is also
740    /// implemented for `AssertUnwindSafe<S>` where `S` implements `Stream`.
741    ///
742    /// This method is only available when the `std` feature of this
743    /// library is activated, and it is activated by default.
744    ///
745    /// # Examples
746    ///
747    /// ```rust
748    /// # extern crate futures;
749    /// # extern crate futures_executor;
750    ///
751    /// use futures::prelude::*;
752    /// use futures::stream;
753    /// use futures_executor::block_on;
754    ///
755    /// # fn main() {
756    /// let stream = stream::iter_ok::<_, bool>(vec![Some(10), None, Some(11)]);
757    /// // panic on second element
758    /// let stream_panicking = stream.map(|o| o.unwrap());
759    /// // collect all the results
760    /// let stream = stream_panicking.catch_unwind().then(|r| Ok::<_, ()>(r));
761    ///
762    /// let results: Vec<_> = block_on(stream.collect()).unwrap();
763    /// match results[0] {
764    ///     Ok(Ok(10)) => {}
765    ///     _ => panic!("unexpected result!"),
766    /// }
767    /// assert!(results[1].is_err());
768    /// assert_eq!(results.len(), 2);
769    /// # }
770    /// ```
771    #[cfg(feature = "std")]
772    fn catch_unwind(self) -> CatchUnwind<Self>
773        where Self: Sized + std::panic::UnwindSafe
774    {
775        catch_unwind::new(self)
776    }
777
778    /// An adaptor for creating a buffered list of pending futures.
779    ///
780    /// If this stream's item can be converted into a future, then this adaptor
781    /// will buffer up to at most `amt` futures and then return results in the
782    /// same order as the underlying stream. No more than `amt` futures will be
783    /// buffered at any point in time, and fewer than `amt` may also be buffered
784    /// depending on the state of each future.
785    ///
786    /// The returned stream will be a stream of each future's result, with
787    /// errors passed through whenever they occur.
788    ///
789    /// This method is only available when the `std` feature of this
790    /// library is activated, and it is activated by default.
791    #[cfg(feature = "std")]
792    fn buffered(self, amt: usize) -> Buffered<Self>
793        where Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
794              Self: Sized
795    {
796        buffered::new(self, amt)
797    }
798
799    /// An adaptor for creating a buffered list of pending futures (unordered).
800    ///
801    /// If this stream's item can be converted into a future, then this adaptor
802    /// will buffer up to `amt` futures and then return results in the order
803    /// in which they complete. No more than `amt` futures will be buffered at
804    /// any point in time, and fewer than `amt` may also be buffered depending on
805    /// the state of each future.
806    ///
807    /// The returned stream will be a stream of each future's result, with
808    /// errors passed through whenever they occur.
809    ///
810    /// This method is only available when the `std` feature of this
811    /// library is activated, and it is activated by default.
812    #[cfg(feature = "std")]
813    fn buffer_unordered(self, amt: usize) -> BufferUnordered<Self>
814        where Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
815              Self: Sized
816    {
817        buffer_unordered::new(self, amt)
818    }
819
820    /// An adapter for zipping two streams together.
821    ///
822    /// The zipped stream waits for both streams to produce an item, and then
823    /// returns that pair. If an error happens, then that error will be returned
824    /// immediately. If either stream ends then the zipped stream will also end.
    ///
    /// Both streams must share the same error type.
825    fn zip<S>(self, other: S) -> Zip<Self, S>
826        where S: Stream<Error = Self::Error>,
827              Self: Sized,
828    {
829        zip::new(self, other)
830    }
831
832    /// Adapter for chaining two streams.
833    ///
834    /// The resulting stream emits elements from the first stream, and when the
835    /// first stream reaches the end, emits the elements from the second stream.
836    ///
    /// # Examples
    ///
837    /// ```rust
838    /// # extern crate futures;
839    /// # extern crate futures_executor;
840    /// use futures::prelude::*;
841    /// use futures::stream;
842    /// use futures_executor::block_on;
843    ///
844    /// # fn main() {
845    /// let stream1 = stream::iter_result(vec![Ok(10), Err(false)]);
846    /// let stream2 = stream::iter_result(vec![Err(true), Ok(20)]);
847    ///
848    /// let stream = stream1.chain(stream2)
849    ///     .then(|result| Ok::<_, ()>(result));
850    ///
851    /// let result: Vec<_> = block_on(stream.collect()).unwrap();
852    /// assert_eq!(result, vec![
853    ///     Ok(10),
854    ///     Err(false),
855    ///     Err(true),
856    ///     Ok(20),
857    /// ]);
858    /// # }
859    /// ```
860    fn chain<S>(self, other: S) -> Chain<Self, S>
861        where S: Stream<Item = Self::Item, Error = Self::Error>,
862              Self: Sized
863    {
864        chain::new(self, other)
865    }
866
867    /// Creates a new stream which exposes a `peek` method.
868    ///
869    /// Calling `peek` returns a reference to the next item in the stream.
870    fn peekable(self) -> Peekable<Self>
871        where Self: Sized
872    {
873        peek::new(self)
874    }
875
876    /// An adaptor for chunking up items of the stream inside a vector.
877    ///
878    /// This combinator will attempt to pull items from this stream and buffer
879    /// them into a local vector. At most `capacity` items will get buffered
880    /// before they're yielded from the returned stream.
881    ///
882    /// Note that the vectors returned from this iterator may not always have
883    /// `capacity` elements. If the underlying stream ended and only a partial
884    /// vector was created, it'll be returned. Additionally if an error happens
885    /// from the underlying stream then the currently buffered items will be
886    /// yielded.
887    ///
888    /// Errors are passed through the stream unbuffered.
889    ///
890    /// This method is only available when the `std` feature of this
891    /// library is activated, and it is activated by default.
892    ///
893    /// # Panics
894    ///
895    /// This method will panic of `capacity` is zero.
896    #[cfg(feature = "std")]
897    fn chunks(self, capacity: usize) -> Chunks<Self>
898        where Self: Sized
899    {
900        chunks::new(self, capacity)
901    }
902
903    /// Creates a stream that selects the next element from either this stream
904    /// or the provided one, whichever is ready first.
905    ///
906    /// This combinator will attempt to pull items from both streams. Each
907    /// stream will be polled in a round-robin fashion, and whenever a stream is
908    /// ready to yield an item that item is yielded.
909    ///
910    /// Error are passed through from either stream.
911    fn select<S>(self, other: S) -> Select<Self, S>
912        where S: Stream<Item = Self::Item, Error = Self::Error>,
913              Self: Sized,
914    {
915        select::new(self, other)
916    }
917
918    /// A future that completes after the given stream has been fully processed
919    /// into the sink, including flushing.
920    ///
921    /// This future will drive the stream to keep producing items until it is
922    /// exhausted, sending each item to the sink. It will complete once both the
923    /// stream is exhausted, and the sink has received and flushed all items.
924    /// Note that the sink is **not** closed.
925    ///
926    /// Doing `stream.forward(sink)` is roughly equivalent to
927    /// `sink.send_all(stream)`. The returned future will exhaust all items from
928    /// `self`, sending them all to `sink`.
929    ///
930    /// On completion, the pair `(stream, sink)` is returned.
931    fn forward<S>(self, sink: S) -> Forward<Self, S>
932        where S: Sink<SinkItem = Self::Item>,
933              Self::Error: From<S::SinkError>,
934              Self: Sized
935    {
936        forward::new(self, sink)
937    }
938
939    /// Splits this `Stream + Sink` object into separate `Stream` and `Sink`
940    /// objects.
941    ///
942    /// This can be useful when you want to split ownership between tasks, or
943    /// allow direct interaction between the two objects (e.g. via
944    /// `Sink::send_all`).
945    ///
946    /// This method is only available when the `std` feature of this
947    /// library is activated, and it is activated by default.
948    #[cfg(feature = "std")]
949    fn split(self) -> (SplitSink<Self>, SplitStream<Self>)
950        where Self: Sink + Sized
951    {
952        split::split(self)
953    }
954
955    /// Do something with each item of this stream, afterwards passing it on.
956    ///
957    /// This is similar to the `Iterator::inspect` method in the standard
958    /// library where it allows easily inspecting each value as it passes
959    /// through the stream, for example to debug what's going on.
960    fn inspect<F>(self, f: F) -> Inspect<Self, F>
961        where F: FnMut(&Self::Item),
962              Self: Sized,
963    {
964        inspect::new(self, f)
965    }
966
967    /// Do something with the error of this stream, afterwards passing it on.
968    ///
969    /// This is similar to the `Stream::inspect` method where it allows
970    /// easily inspecting the error as it passes through the stream, for
971    /// example to debug what's going on.
972    fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
973        where F: FnMut(&Self::Error),
974              Self: Sized,
975    {
976        inspect_err::new(self, f)
977    }
978
979    /// Handle errors generated by this stream by converting them into
980    /// `Option<Self::Item>`, such that a `None` value terminates the stream.
981    ///
982    /// Because it can never produce an error, the returned `Recover` stream can
983    /// conform to any specific `Error` type, including `Never`.
984    fn recover<E, F>(self, f: F) -> Recover<Self, E, F>
985        where F: FnMut(Self::Error) -> Option<Self::Item>,
986              Self: Sized,
987    {
988        recover::new(self, f)
989    }
990
991    /// Wrap this stream in an `Either` stream, making it the left-hand variant
992    /// of that `Either`.
993    ///
994    /// This can be used in combination with the `right` method to write `if`
995    /// statements that evaluate to different streams in different branches.
996    #[deprecated(note = "use `left_stream` instead")]
997    fn left<B>(self) -> Either<Self, B>
998        where B: Stream<Item = Self::Item, Error = Self::Error>,
999              Self: Sized
1000    {
1001        Either::Left(self)
1002    }
1003
1004    /// Wrap this stream in an `Either` stream, making it the right-hand variant
1005    /// of that `Either`.
1006    ///
1007    /// This can be used in combination with the `left` method to write `if`
1008    /// statements that evaluate to different streams in different branches.
1009    #[deprecated(note = "use `right_stream` instead")]
1010    fn right<B>(self) -> Either<B, Self>
1011        where B: Stream<Item = Self::Item, Error = Self::Error>,
1012              Self: Sized
1013    {
1014        Either::Right(self)
1015    }
1016
1017    /// Wrap this stream in an `Either` stream, making it the left-hand variant
1018    /// of that `Either`.
1019    ///
1020    /// This can be used in combination with the `right_stream` method to write `if`
1021    /// statements that evaluate to different streams in different branches.
1022    fn left_stream<B>(self) -> Either<Self, B>
1023        where B: Stream<Item = Self::Item, Error = Self::Error>,
1024              Self: Sized
1025    {
1026        Either::Left(self)
1027    }
1028
1029    /// Wrap this stream in an `Either` stream, making it the right-hand variant
1030    /// of that `Either`.
1031    ///
1032    /// This can be used in combination with the `left_stream` method to write `if`
1033    /// statements that evaluate to different streams in different branches.
1034    fn right_stream<B>(self) -> Either<B, Self>
1035        where B: Stream<Item = Self::Item, Error = Self::Error>,
1036              Self: Sized
1037    {
1038        Either::Right(self)
1039    }
1040}