futures_util/stream/mod.rs
1//! Streams
2//!
3//! This module contains a number of functions for working with `Stream`s,
4//! including the `StreamExt` trait which adds methods to `Stream` types.
5
6use futures_core::{IntoFuture, Stream};
7use futures_sink::Sink;
8use super::future::Either;
9
10mod iter_ok;
11pub use self::iter_ok::{iter_ok, IterOk};
12mod iter_result;
13pub use self::iter_result::{iter_result, IterResult};
14
15mod repeat;
16pub use self::repeat::{repeat, Repeat};
17
18mod and_then;
19mod chain;
20mod concat;
21mod empty;
22mod filter;
23mod filter_map;
24mod flatten;
25mod fold;
26mod for_each;
27mod err_into;
28mod fuse;
29mod future;
30mod inspect;
31mod inspect_err;
32mod map;
33mod map_err;
34mod once;
35mod or_else;
36mod peek;
37mod poll_fn;
38mod select;
39mod skip;
40mod skip_while;
41mod take;
42mod take_while;
43mod then;
44mod unfold;
45mod zip;
46mod forward;
47mod recover;
48pub use self::and_then::AndThen;
49pub use self::chain::Chain;
50pub use self::concat::Concat;
51pub use self::empty::{Empty, empty};
52pub use self::filter::Filter;
53pub use self::filter_map::FilterMap;
54pub use self::flatten::Flatten;
55pub use self::fold::Fold;
56pub use self::for_each::ForEach;
57pub use self::err_into::ErrInto;
58pub use self::fuse::Fuse;
59pub use self::future::StreamFuture;
60pub use self::inspect::Inspect;
61pub use self::inspect_err::InspectErr;
62pub use self::map::Map;
63pub use self::map_err::MapErr;
64pub use self::once::{Once, once};
65pub use self::or_else::OrElse;
66pub use self::peek::Peekable;
67pub use self::poll_fn::{poll_fn, PollFn};
68pub use self::select::Select;
69pub use self::skip::Skip;
70pub use self::skip_while::SkipWhile;
71pub use self::take::Take;
72pub use self::take_while::TakeWhile;
73pub use self::then::Then;
74pub use self::unfold::{Unfold, unfold};
75pub use self::zip::Zip;
76pub use self::forward::Forward;
77pub use self::recover::Recover;
78
79if_std! {
80 use std;
81 use std::iter::Extend;
82
83 mod buffered;
84 mod buffer_unordered;
85 mod catch_unwind;
86 mod chunks;
87 mod collect;
88 mod for_each_concurrent;
89 mod select_all;
90 mod split;
91 mod futures_unordered;
92 mod futures_ordered;
93 pub use self::buffered::Buffered;
94 pub use self::buffer_unordered::BufferUnordered;
95 pub use self::catch_unwind::CatchUnwind;
96 pub use self::chunks::Chunks;
97 pub use self::collect::Collect;
98 pub use self::select_all::{select_all, SelectAll};
99 pub use self::split::{SplitStream, SplitSink, ReuniteError};
100 pub use self::for_each_concurrent::ForEachConcurrent;
101 pub use self::futures_unordered::{futures_unordered, FuturesUnordered};
102 pub use self::futures_ordered::{futures_ordered, FuturesOrdered};
103}
104
105impl<T: ?Sized> StreamExt for T where T: Stream {}
106
107/// An extension trait for `Stream`s that provides a variety of convenient
108/// combinator functions.
109pub trait StreamExt: Stream {
110 /// Converts this stream into a `Future`.
111 ///
112 /// A stream can be viewed as a future which will resolve to a pair containing
113 /// the next element of the stream plus the remaining stream. If the stream
114 /// terminates, then the next element is `None` and the remaining stream is
115 /// still passed back, to allow reclamation of its resources.
116 ///
117 /// The returned future can be used to compose streams and futures together by
118 /// placing everything into the "world of futures".
119 fn next(self) -> StreamFuture<Self>
120 where Self: Sized
121 {
122 future::new(self)
123 }
124
125 /// Converts a stream of type `T` to a stream of type `U`.
126 ///
127 /// The provided closure is executed over all elements of this stream as
128 /// they are made available, and the callback will be executed inline with
129 /// calls to `poll`.
130 ///
131 /// Note that this function consumes the receiving stream and returns a
132 /// wrapped version of it, similar to the existing `map` methods in the
133 /// standard library.
134 ///
135 /// # Examples
136 ///
137 /// ```
138 /// # extern crate futures;
139 /// # extern crate futures_channel;
140 /// use futures::prelude::*;
141 /// use futures_channel::mpsc;
142 ///
143 /// # fn main() {
144 /// let (_tx, rx) = mpsc::channel::<i32>(1);
145 /// let rx = rx.map(|x| x + 3);
146 /// # }
147 /// ```
148 fn map<U, F>(self, f: F) -> Map<Self, F>
149 where F: FnMut(Self::Item) -> U,
150 Self: Sized
151 {
152 map::new(self, f)
153 }
154
155 /// Converts a stream of error type `T` to a stream of error type `U`.
156 ///
157 /// The provided closure is executed over all errors of this stream as
158 /// they are made available, and the callback will be executed inline with
159 /// calls to `poll`.
160 ///
161 /// Note that this function consumes the receiving stream and returns a
162 /// wrapped version of it, similar to the existing `map_err` methods in the
163 /// standard library.
164 ///
165 /// # Examples
166 ///
167 /// ```
168 /// # extern crate futures;
169 /// # extern crate futures_channel;
170 /// use futures::prelude::*;
171 /// use futures_channel::mpsc;
172 ///
173 /// # fn main() {
174 /// let (_tx, rx) = mpsc::channel::<i32>(1);
175 /// let rx = rx.map_err(|_| 3);
176 /// # }
177 /// ```
178 fn map_err<U, F>(self, f: F) -> MapErr<Self, F>
179 where F: FnMut(Self::Error) -> U,
180 Self: Sized
181 {
182 map_err::new(self, f)
183 }
184
185 /// Filters the values produced by this stream according to the provided
186 /// predicate.
187 ///
188 /// As values of this stream are made available, the provided predicate will
189 /// be run against them. If the predicate returns a `Future` which resolves
190 /// to `true`, then the stream will yield the value, but if the predicate
191 /// returns a `Future` which resolves to `false`, then the value will be
192 /// discarded and the next value will be produced.
193 ///
194 /// All errors are passed through without filtering in this combinator.
195 ///
196 /// Note that this function consumes the receiving stream and returns a
197 /// wrapped version of it, similar to the existing `filter` methods in the
198 /// standard library.
199 ///
200 /// # Examples
201 ///
202 /// ```
203 /// # extern crate futures;
204 /// # extern crate futures_channel;
205 /// use futures::prelude::*;
206 /// use futures_channel::mpsc;
207 ///
208 /// # fn main() {
209 /// let (_tx, rx) = mpsc::channel::<i32>(1);
210 /// let evens = rx.filter(|x| Ok(x % 2 == 0));
211 /// # }
212 /// ```
213 fn filter<R, P>(self, pred: P) -> Filter<Self, R, P>
214 where P: FnMut(&Self::Item) -> R,
215 R: IntoFuture<Item=bool, Error=Self::Error>,
216 Self: Sized,
217 {
218 filter::new(self, pred)
219 }
220
221 /// Filters the values produced by this stream while simultaneously mapping
222 /// them to a different type.
223 ///
224 /// As values of this stream are made available, the provided function will
225 /// be run on them. If the predicate returns `Some(e)` then the stream will
226 /// yield the value `e`, but if the predicate returns `None` then the next
227 /// value will be produced.
228 ///
229 /// All errors are passed through without filtering in this combinator.
230 ///
231 /// Note that this function consumes the receiving stream and returns a
232 /// wrapped version of it, similar to the existing `filter_map` methods in the
233 /// standard library.
234 ///
235 /// # Examples
236 ///
237 /// ```
238 /// # extern crate futures;
239 /// # extern crate futures_channel;
240 /// use futures::prelude::*;
241 /// use futures_channel::mpsc;
242 ///
243 /// # fn main() {
244 /// let (_tx, rx) = mpsc::channel::<i32>(1);
245 /// let evens_plus_one = rx.filter_map(|x| {
246 /// Ok(
247 /// if x % 0 == 2 {
248 /// Some(x + 1)
249 /// } else {
250 /// None
251 /// }
252 /// )
253 /// });
254 /// # }
255 /// ```
256 fn filter_map<R, B, F>(self, f: F) -> FilterMap<Self, R, F>
257 where F: FnMut(Self::Item) -> R,
258 R: IntoFuture<Item=Option<B>, Error=Self::Error>,
259 Self: Sized,
260 {
261 filter_map::new(self, f)
262 }
263
264 /// Chain on a computation for when a value is ready, passing the resulting
265 /// item to the provided closure `f`.
266 ///
267 /// This function can be used to ensure a computation runs regardless of
268 /// the next value on the stream. The closure provided will be yielded a
269 /// `Result` once a value is ready, and the returned future will then be run
270 /// to completion to produce the next value on this stream.
271 ///
272 /// The returned value of the closure must implement the `IntoFuture` trait
273 /// and can represent some more work to be done before the composed stream
274 /// is finished. Note that the `Result` type implements the `IntoFuture`
275 /// trait so it is possible to simply alter the `Result` yielded to the
276 /// closure and return it.
277 ///
278 /// Note that this function consumes the receiving stream and returns a
279 /// wrapped version of it.
280 ///
281 /// # Examples
282 ///
283 /// ```
284 /// # extern crate futures;
285 /// # extern crate futures_channel;
286 /// use futures::prelude::*;
287 /// use futures_channel::mpsc;
288 ///
289 /// # fn main() {
290 /// let (_tx, rx) = mpsc::channel::<i32>(1);
291 ///
292 /// let rx = rx.then(|result| {
293 /// match result {
294 /// Ok(e) => Ok(e + 3),
295 /// Err(_) => Err(4),
296 /// }
297 /// });
298 /// # }
299 /// ```
300 fn then<U, F>(self, f: F) -> Then<Self, U, F>
301 where F: FnMut(Result<Self::Item, Self::Error>) -> U,
302 U: IntoFuture,
303 Self: Sized
304 {
305 then::new(self, f)
306 }
307
308 /// Chain on a computation for when a value is ready, passing the successful
309 /// results to the provided closure `f`.
310 ///
311 /// This function can be used to run a unit of work when the next successful
312 /// value on a stream is ready. The closure provided will be yielded a value
313 /// when ready, and the returned future will then be run to completion to
314 /// produce the next value on this stream.
315 ///
316 /// Any errors produced by this stream will not be passed to the closure,
317 /// and will be passed through.
318 ///
319 /// The returned value of the closure must implement the `IntoFuture` trait
320 /// and can represent some more work to be done before the composed stream
321 /// is finished. Note that the `Result` type implements the `IntoFuture`
322 /// trait so it is possible to simply alter the `Result` yielded to the
323 /// closure and return it.
324 ///
325 /// Note that this function consumes the receiving stream and returns a
326 /// wrapped version of it.
327 ///
328 /// To process the entire stream and return a single future representing
329 /// success or error, use `for_each` instead.
330 ///
331 /// # Examples
332 ///
333 /// ```
334 /// # extern crate futures;
335 /// # extern crate futures_channel;
336 /// use futures::prelude::*;
337 /// use futures_channel::mpsc;
338 ///
339 /// # fn main() {
340 /// let (_tx, rx) = mpsc::channel::<i32>(1);
341 ///
342 /// let rx = rx.and_then(|result| {
343 /// if result % 2 == 0 {
344 /// Ok(Some(result))
345 /// } else {
346 /// Ok(None)
347 /// }
348 /// });
349 /// # }
350 /// ```
351 fn and_then<U, F>(self, f: F) -> AndThen<Self, U, F>
352 where F: FnMut(Self::Item) -> U,
353 U: IntoFuture<Error = Self::Error>,
354 Self: Sized
355 {
356 and_then::new(self, f)
357 }
358
359 /// Chain on a computation for when an error happens, passing the
360 /// erroneous result to the provided closure `f`.
361 ///
362 /// This function can be used to run a unit of work and attempt to recover from
363 /// an error if one happens. The closure provided will be yielded an error
364 /// when one appears, and the returned future will then be run to completion
365 /// to produce the next value on this stream.
366 ///
367 /// Any successful values produced by this stream will not be passed to the
368 /// closure, and will be passed through.
369 ///
370 /// The returned value of the closure must implement the `IntoFuture` trait
371 /// and can represent some more work to be done before the composed stream
372 /// is finished. Note that the `Result` type implements the `IntoFuture`
373 /// trait so it is possible to simply alter the `Result` yielded to the
374 /// closure and return it.
375 ///
376 /// Note that this function consumes the receiving stream and returns a
377 /// wrapped version of it.
378 fn or_else<U, F>(self, f: F) -> OrElse<Self, U, F>
379 where F: FnMut(Self::Error) -> U,
380 U: IntoFuture<Item = Self::Item>,
381 Self: Sized
382 {
383 or_else::new(self, f)
384 }
385
386 /// Collect all of the values of this stream into a vector, returning a
387 /// future representing the result of that computation.
388 ///
389 /// This combinator will collect all successful results of this stream and
390 /// collect them into a `Vec<Self::Item>`. If an error happens then all
391 /// collected elements will be dropped and the error will be returned.
392 ///
393 /// The returned future will be resolved whenever an error happens or when
394 /// the stream returns `Ok(None)`.
395 ///
396 /// This method is only available when the `std` feature of this
397 /// library is activated, and it is activated by default.
398 ///
399 /// # Examples
400 ///
401 /// ```
402 /// # extern crate futures;
403 /// # extern crate futures_executor;
404 /// # extern crate futures_channel;
405 /// use std::thread;
406 ///
407 /// use futures::prelude::*;
408 /// use futures_channel::mpsc;
409 /// use futures_executor::block_on;
410 ///
411 /// # fn main() {
412 /// let (mut tx, rx) = mpsc::unbounded();
413 ///
414 /// thread::spawn(move || {
415 /// for i in (0..5).rev() {
416 /// tx.unbounded_send(i + 1).unwrap();
417 /// }
418 /// });
419 ///
420 /// let result = block_on(rx.collect());
421 /// assert_eq!(result, Ok(vec![5, 4, 3, 2, 1]));
422 /// # }
423 /// ```
#[cfg(feature = "std")]
fn collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C>
where
    Self: Sized,
{
    // The target collection type `C` is chosen by the caller; only the stream
    // is passed to the `Collect` constructor.
    collect::new(self)
}
430
431 /// Concatenate all results of a stream into a single extendable
432 /// destination, returning a future representing the end result.
433 ///
434 /// This combinator will extend the first item with the contents
435 /// of all the successful results of the stream. If the stream is
436 /// empty, the default value will be returned. If an error occurs,
437 /// all the results will be dropped and the error will be returned.
438 ///
439 /// # Examples
440 ///
441 /// ```
442 /// # extern crate futures;
443 /// # extern crate futures_executor;
444 /// # extern crate futures_channel;
445 /// use std::thread;
446 ///
447 /// use futures::prelude::*;
448 /// use futures_channel::mpsc;
449 /// use futures_executor::block_on;
450 ///
451 /// # fn main() {
452 /// let (mut tx, rx) = mpsc::unbounded();
453 ///
454 /// thread::spawn(move || {
455 /// for i in (0..3).rev() {
456 /// let n = i * 3;
457 /// tx.unbounded_send(vec![n + 1, n + 2, n + 3]).unwrap();
458 /// }
459 /// });
460 /// let result = block_on(rx.concat());
461 /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
462 /// # }
463 /// ```
464 fn concat(self) -> Concat<Self>
465 where Self: Sized,
466 Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
467 {
468 concat::new(self)
469 }
470
471 /// Execute an accumulating computation over a stream, collecting all the
472 /// values into one final result.
473 ///
474 /// This combinator will collect all successful results of this stream
475 /// according to the closure provided. The initial state is also provided to
476 /// this method and then is returned again by each execution of the closure.
477 /// Once the entire stream has been exhausted the returned future will
478 /// resolve to this value.
479 ///
480 /// If an error happens then collected state will be dropped and the error
481 /// will be returned.
482 ///
483 /// # Examples
484 ///
485 /// ```
486 /// # extern crate futures;
487 /// # extern crate futures_executor;
488 /// use futures::prelude::*;
489 /// use futures::stream;
490 /// use futures::future;
491 /// use futures_executor::block_on;
492 ///
493 /// # fn main() {
494 /// let number_stream = stream::iter_ok::<_, ()>(0..6);
495 /// let sum = number_stream.fold(0, |acc, x| future::ok(acc + x));
496 /// assert_eq!(block_on(sum), Ok(15));
497 /// # }
498 /// ```
499 fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
500 where F: FnMut(T, Self::Item) -> Fut,
501 Fut: IntoFuture<Item = T, Error = Self::Error>,
502 Self: Sized
503 {
504 fold::new(self, f, init)
505 }
506
507 /// Flattens a stream of streams into just one continuous stream.
508 ///
509 /// If this stream's elements are themselves streams then this combinator
510 /// will flatten out the entire stream to one long chain of elements. Any
511 /// errors are passed through without looking at them, but otherwise each
512 /// individual stream will get exhausted before moving on to the next.
513 ///
514 /// ```
515 /// # extern crate futures;
516 /// # extern crate futures_channel;
517 /// # extern crate futures_executor;
518 /// use std::thread;
519 ///
520 /// use futures::prelude::*;
521 /// use futures_channel::mpsc;
522 /// use futures_executor::block_on;
523 ///
524 /// # fn main() {
525 /// let (tx1, rx1) = mpsc::unbounded::<i32>();
526 /// let (tx2, rx2) = mpsc::unbounded::<i32>();
527 /// let (tx3, rx3) = mpsc::unbounded();
528 ///
529 /// thread::spawn(move || {
530 /// tx1.unbounded_send(1).unwrap();
531 /// tx1.unbounded_send(2).unwrap();
532 /// });
533 /// thread::spawn(move || {
534 /// tx2.unbounded_send(3).unwrap();
535 /// tx2.unbounded_send(4).unwrap();
536 /// });
537 /// thread::spawn(move || {
538 /// tx3.unbounded_send(rx1).unwrap();
539 /// tx3.unbounded_send(rx2).unwrap();
540 /// });
541 ///
542 /// let result = block_on(rx3.flatten().collect());
543 /// assert_eq!(result, Ok(vec![1, 2, 3, 4]));
544 /// # }
545 /// ```
546 fn flatten(self) -> Flatten<Self>
547 where Self::Item: Stream<Error = Self::Error>,
548 Self: Sized
549 {
550 flatten::new(self)
551 }
552
553 /// Skip elements on this stream while the predicate provided resolves to
554 /// `true`.
555 ///
556 /// This function, like `Iterator::skip_while`, will skip elements on the
557 /// stream until the `predicate` resolves to `false`. Once one element
558 /// returns false all future elements will be returned from the underlying
559 /// stream.
560 fn skip_while<R, P>(self, pred: P) -> SkipWhile<Self, R, P>
561 where P: FnMut(&Self::Item) -> R,
562 R: IntoFuture<Item=bool, Error=Self::Error>,
563 Self: Sized
564 {
565 skip_while::new(self, pred)
566 }
567
568 /// Take elements from this stream while the predicate provided resolves to
569 /// `true`.
570 ///
571 /// This function, like `Iterator::take_while`, will take elements from the
572 /// stream until the `predicate` resolves to `false`. Once one element
573 /// returns false it will always return that the stream is done.
574 fn take_while<R, P>(self, pred: P) -> TakeWhile<Self, R, P>
575 where P: FnMut(&Self::Item) -> R,
576 R: IntoFuture<Item=bool, Error=Self::Error>,
577 Self: Sized
578 {
579 take_while::new(self, pred)
580 }
581
582 /// Runs this stream to completion, executing the provided closure for each
583 /// element on the stream.
584 ///
585 /// The closure provided will be called for each item this stream resolves
586 /// to successfully, producing a future. That future will then be executed
587 /// to completion before moving on to the next item.
588 ///
589 /// The returned value is a `Future` where the `Item` type is the completed
590 /// stream, and errors are otherwise threaded through. Any error on the
591 /// stream or in the provided future will cause iteration to be halted
592 /// immediately and the future will resolve to that error.
593 ///
594 /// To process each item in the stream and produce another stream instead
595 /// of a single future, use `and_then` instead.
596 fn for_each<U, F>(self, f: F) -> ForEach<Self, U, F>
597 where F: FnMut(Self::Item) -> U,
598 U: IntoFuture<Item=(), Error = Self::Error>,
599 Self: Sized
600 {
601 for_each::new(self, f)
602 }
603
604 /// Runs this stream to completion, executing the provided closure for each
605 /// element on the stream. This is similar to `for_each` but may begin
606 /// processing an element while previous elements are still being processed.
607 ///
608 /// When this stream successfully resolves to an item, the closure will be
609 /// called to produce a future. That future will then be added to
610 /// the set of futures to resolve.
611 ///
612 /// The returned value is a `Future` where the `Item` type is the completed
613 /// stream, and errors are otherwise threaded through. Any error on the
614 /// stream or in the provided future will cause iteration to be halted
615 /// immediately and the future will resolve to that error.
616 ///
617 /// To process each item in the stream and produce another stream instead
618 /// of a single future, use `and_then` instead.
#[cfg(feature = "std")]
fn for_each_concurrent<U, F>(self, f: F) -> ForEachConcurrent<Self, U, F>
where
    Self: Sized,
    F: FnMut(Self::Item) -> U,
    // Per-item futures produce no value and share this stream's error type.
    U: IntoFuture<Item = (), Error = Self::Error>,
{
    for_each_concurrent::new(self, f)
}
627
628 /// Map this stream's error to a different type using the `Into` trait.
629 ///
630 /// This function does for streams what `try!` does for `Result`,
631 /// by letting the compiler infer the type of the resulting error.
632 /// Just as `map_err` above, this is useful for example to ensure
633 /// that streams have the same error type when used with
634 /// combinators.
635 ///
636 /// Note that this function consumes the receiving stream and returns a
637 /// wrapped version of it.
638 fn err_into<E>(self) -> ErrInto<Self, E>
639 where Self: Sized,
640 Self::Error: Into<E>,
641 {
642 err_into::new(self)
643 }
644
645 /// Creates a new stream of at most `amt` items of the underlying stream.
646 ///
647 /// Once `amt` items have been yielded from this stream then it will always
648 /// return that the stream is done.
649 ///
650 /// # Errors
651 ///
652 /// Any errors yielded from underlying stream, before the desired amount of
653 /// items is reached, are passed through and do not affect the total number
654 /// of items taken.
655 fn take(self, amt: u64) -> Take<Self>
656 where Self: Sized
657 {
658 take::new(self, amt)
659 }
660
661 /// Creates a new stream which skips `amt` items of the underlying stream.
662 ///
663 /// Once `amt` items have been skipped from this stream then it will always
664 /// return the remaining items on this stream.
665 ///
666 /// # Errors
667 ///
668 /// All errors yielded from underlying stream are passed through and do not
669 /// affect the total number of items skipped.
670 fn skip(self, amt: u64) -> Skip<Self>
671 where Self: Sized
672 {
673 skip::new(self, amt)
674 }
675
676 /// Fuse a stream such that `poll` will never again be called once it has
677 /// finished.
678 ///
679 /// Currently once a stream has returned `None` from `poll` any further
680 /// calls could exhibit bad behavior such as block forever, panic, never
681 /// return, etc. If it is known that `poll` may be called after stream has
682 /// already finished, then this method can be used to ensure that it has
683 /// defined semantics.
684 ///
685 /// Once a stream has been `fuse`d and it finishes, then it will forever
/// return `None` from `poll`. This, unlike for the trait's `poll` method,
687 /// is guaranteed.
688 ///
689 /// Also note that as soon as this stream returns `None` it will be dropped
690 /// to reclaim resources associated with it.
691 fn fuse(self) -> Fuse<Self>
692 where Self: Sized
693 {
694 fuse::new(self)
695 }
696
697 /// Borrows a stream, rather than consuming it.
698 ///
699 /// This is useful to allow applying stream adaptors while still retaining
700 /// ownership of the original stream.
701 ///
702 /// ```
703 /// # extern crate futures;
704 /// # extern crate futures_executor;
705 /// use futures::prelude::*;
706 /// use futures::stream;
707 /// use futures::future;
708 /// use futures_executor::block_on;
709 ///
710 /// # fn main() {
711 /// let mut stream = stream::iter_ok::<_, ()>(1..5);
712 ///
713 /// let sum = block_on(stream.by_ref().take(2).fold(0, |a, b| future::ok(a + b)));
714 /// assert_eq!(sum, Ok(3));
715 ///
716 /// // You can use the stream again
717 /// let sum = block_on(stream.take(2).fold(0, |a, b| future::ok(a + b)));
718 /// assert_eq!(sum, Ok(7));
719 /// # }
720 /// ```
721 fn by_ref(&mut self) -> &mut Self
722 where Self: Sized
723 {
724 self
725 }
726
727 /// Catches unwinding panics while polling the stream.
728 ///
729 /// Caught panic (if any) will be the last element of the resulting stream.
730 ///
731 /// In general, panics within a stream can propagate all the way out to the
732 /// task level. This combinator makes it possible to halt unwinding within
733 /// the stream itself. It's most commonly used within task executors. This
734 /// method should not be used for error handling.
735 ///
736 /// Note that this method requires the `UnwindSafe` bound from the standard
737 /// library. This isn't always applied automatically, and the standard
738 /// library provides an `AssertUnwindSafe` wrapper type to apply it
/// after the fact. To assist using this method, the `Stream` trait is also
740 /// implemented for `AssertUnwindSafe<S>` where `S` implements `Stream`.
741 ///
742 /// This method is only available when the `std` feature of this
743 /// library is activated, and it is activated by default.
744 ///
745 /// # Examples
746 ///
747 /// ```rust
748 /// # extern crate futures;
749 /// # extern crate futures_executor;
750 ///
751 /// use futures::prelude::*;
752 /// use futures::stream;
753 /// use futures_executor::block_on;
754 ///
755 /// # fn main() {
756 /// let stream = stream::iter_ok::<_, bool>(vec![Some(10), None, Some(11)]);
757 /// // panic on second element
758 /// let stream_panicking = stream.map(|o| o.unwrap());
759 /// // collect all the results
760 /// let stream = stream_panicking.catch_unwind().then(|r| Ok::<_, ()>(r));
761 ///
762 /// let results: Vec<_> = block_on(stream.collect()).unwrap();
763 /// match results[0] {
764 /// Ok(Ok(10)) => {}
765 /// _ => panic!("unexpected result!"),
766 /// }
767 /// assert!(results[1].is_err());
768 /// assert_eq!(results.len(), 2);
769 /// # }
770 /// ```
#[cfg(feature = "std")]
fn catch_unwind(self) -> CatchUnwind<Self>
where
    Self: Sized + std::panic::UnwindSafe,
{
    // The stream is moved into the `CatchUnwind` wrapper.
    catch_unwind::new(self)
}
777
778 /// An adaptor for creating a buffered list of pending futures.
779 ///
780 /// If this stream's item can be converted into a future, then this adaptor
781 /// will buffer up to at most `amt` futures and then return results in the
782 /// same order as the underlying stream. No more than `amt` futures will be
783 /// buffered at any point in time, and less than `amt` may also be buffered
784 /// depending on the state of each future.
785 ///
786 /// The returned stream will be a stream of each future's result, with
787 /// errors passed through whenever they occur.
788 ///
789 /// This method is only available when the `std` feature of this
790 /// library is activated, and it is activated by default.
#[cfg(feature = "std")]
fn buffered(self, amt: usize) -> Buffered<Self>
where
    Self: Sized,
    // Items are futures (or convertible into them) failing with this
    // stream's error type.
    Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
{
    buffered::new(self, amt)
}
798
799 /// An adaptor for creating a buffered list of pending futures (unordered).
800 ///
801 /// If this stream's item can be converted into a future, then this adaptor
802 /// will buffer up to `amt` futures and then return results in the order
803 /// in which they complete. No more than `amt` futures will be buffered at
804 /// any point in time, and less than `amt` may also be buffered depending on
805 /// the state of each future.
806 ///
807 /// The returned stream will be a stream of each future's result, with
808 /// errors passed through whenever they occur.
809 ///
810 /// This method is only available when the `std` feature of this
811 /// library is activated, and it is activated by default.
#[cfg(feature = "std")]
fn buffer_unordered(self, amt: usize) -> BufferUnordered<Self>
where
    Self: Sized,
    // Items are futures (or convertible into them) failing with this
    // stream's error type.
    Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
{
    buffer_unordered::new(self, amt)
}
819
820 /// An adapter for zipping two streams together.
821 ///
822 /// The zipped stream waits for both streams to produce an item, and then
823 /// returns that pair. If an error happens, then that error will be returned
824 /// immediately. If either stream ends then the zipped stream will also end.
825 fn zip<S>(self, other: S) -> Zip<Self, S>
826 where S: Stream<Error = Self::Error>,
827 Self: Sized,
828 {
829 zip::new(self, other)
830 }
831
/// Adapter for chaining two streams.
833 ///
834 /// The resulting stream emits elements from the first stream, and when
835 /// first stream reaches the end, emits the elements from the second stream.
836 ///
837 /// ```rust
838 /// # extern crate futures;
839 /// # extern crate futures_executor;
840 /// use futures::prelude::*;
841 /// use futures::stream;
842 /// use futures_executor::block_on;
843 ///
844 /// # fn main() {
845 /// let stream1 = stream::iter_result(vec![Ok(10), Err(false)]);
846 /// let stream2 = stream::iter_result(vec![Err(true), Ok(20)]);
847 ///
848 /// let stream = stream1.chain(stream2)
849 /// .then(|result| Ok::<_, ()>(result));
850 ///
851 /// let result: Vec<_> = block_on(stream.collect()).unwrap();
852 /// assert_eq!(result, vec![
853 /// Ok(10),
854 /// Err(false),
855 /// Err(true),
856 /// Ok(20),
857 /// ]);
858 /// # }
859 /// ```
860 fn chain<S>(self, other: S) -> Chain<Self, S>
861 where S: Stream<Item = Self::Item, Error = Self::Error>,
862 Self: Sized
863 {
864 chain::new(self, other)
865 }
866
867 /// Creates a new stream which exposes a `peek` method.
868 ///
869 /// Calling `peek` returns a reference to the next item in the stream.
870 fn peekable(self) -> Peekable<Self>
871 where Self: Sized
872 {
873 peek::new(self)
874 }
875
876 /// An adaptor for chunking up items of the stream inside a vector.
877 ///
878 /// This combinator will attempt to pull items from this stream and buffer
879 /// them into a local vector. At most `capacity` items will get buffered
880 /// before they're yielded from the returned stream.
881 ///
/// Note that the vectors returned from this stream may not always have
883 /// `capacity` elements. If the underlying stream ended and only a partial
884 /// vector was created, it'll be returned. Additionally if an error happens
885 /// from the underlying stream then the currently buffered items will be
886 /// yielded.
887 ///
888 /// Errors are passed through the stream unbuffered.
889 ///
890 /// This method is only available when the `std` feature of this
891 /// library is activated, and it is activated by default.
892 ///
893 /// # Panics
894 ///
/// This method will panic if `capacity` is zero.
#[cfg(feature = "std")]
fn chunks(self, capacity: usize) -> Chunks<Self>
where
    Self: Sized,
{
    // Stream and chunk capacity are handed to the `Chunks` constructor.
    chunks::new(self, capacity)
}
902
903 /// Creates a stream that selects the next element from either this stream
904 /// or the provided one, whichever is ready first.
905 ///
906 /// This combinator will attempt to pull items from both streams. Each
907 /// stream will be polled in a round-robin fashion, and whenever a stream is
908 /// ready to yield an item that item is yielded.
909 ///
/// Errors are passed through from either stream.
911 fn select<S>(self, other: S) -> Select<Self, S>
912 where S: Stream<Item = Self::Item, Error = Self::Error>,
913 Self: Sized,
914 {
915 select::new(self, other)
916 }
917
918 /// A future that completes after the given stream has been fully processed
919 /// into the sink, including flushing.
920 ///
921 /// This future will drive the stream to keep producing items until it is
922 /// exhausted, sending each item to the sink. It will complete once both the
923 /// stream is exhausted, and the sink has received and flushed all items.
924 /// Note that the sink is **not** closed.
925 ///
926 /// Doing `stream.forward(sink)` is roughly equivalent to
927 /// `sink.send_all(stream)`. The returned future will exhaust all items from
928 /// `self`, sending them all to `sink`.
929 ///
930 /// On completion, the pair `(stream, sink)` is returned.
931 fn forward<S>(self, sink: S) -> Forward<Self, S>
932 where S: Sink<SinkItem = Self::Item>,
933 Self::Error: From<S::SinkError>,
934 Self: Sized
935 {
936 forward::new(self, sink)
937 }
938
/// Breaks this `Stream + Sink` object apart into a separate `Sink` object
/// and a separate `Stream` object.
///
/// This is handy for splitting ownership between tasks, or for letting the
/// two objects interact directly (e.g. via `Sink::send_all`).
///
/// This method is only available when the `std` feature of this
/// library is activated, and it is activated by default.
#[cfg(feature = "std")]
fn split(self) -> (SplitSink<Self>, SplitStream<Self>)
where
    Self: Sink + Sized,
{
    split::split(self)
}
954
955 /// Do something with each item of this stream, afterwards passing it on.
956 ///
957 /// This is similar to the `Iterator::inspect` method in the standard
958 /// library where it allows easily inspecting each value as it passes
959 /// through the stream, for example to debug what's going on.
960 fn inspect<F>(self, f: F) -> Inspect<Self, F>
961 where F: FnMut(&Self::Item),
962 Self: Sized,
963 {
964 inspect::new(self, f)
965 }
966
967 /// Do something with the error of this stream, afterwards passing it on.
968 ///
969 /// This is similar to the `Stream::inspect` method where it allows
970 /// easily inspecting the error as it passes through the stream, for
971 /// example to debug what's going on.
972 fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
973 where F: FnMut(&Self::Error),
974 Self: Sized,
975 {
976 inspect_err::new(self, f)
977 }
978
979 /// Handle errors generated by this stream by converting them into
980 /// `Option<Self::Item>`, such that a `None` value terminates the stream.
981 ///
982 /// Because it can never produce an error, the returned `Recover` stream can
983 /// conform to any specific `Error` type, including `Never`.
984 fn recover<E, F>(self, f: F) -> Recover<Self, E, F>
985 where F: FnMut(Self::Error) -> Option<Self::Item>,
986 Self: Sized,
987 {
988 recover::new(self, f)
989 }
990
991 /// Wrap this stream in an `Either` stream, making it the left-hand variant
992 /// of that `Either`.
993 ///
994 /// This can be used in combination with the `right` method to write `if`
995 /// statements that evaluate to different streams in different branches.
996 #[deprecated(note = "use `left_stream` instead")]
997 fn left<B>(self) -> Either<Self, B>
998 where B: Stream<Item = Self::Item, Error = Self::Error>,
999 Self: Sized
1000 {
1001 Either::Left(self)
1002 }
1003
1004 /// Wrap this stream in an `Either` stream, making it the right-hand variant
1005 /// of that `Either`.
1006 ///
1007 /// This can be used in combination with the `left` method to write `if`
1008 /// statements that evaluate to different streams in different branches.
1009 #[deprecated(note = "use `right_stream` instead")]
1010 fn right<B>(self) -> Either<B, Self>
1011 where B: Stream<Item = Self::Item, Error = Self::Error>,
1012 Self: Sized
1013 {
1014 Either::Right(self)
1015 }
1016
1017 /// Wrap this stream in an `Either` stream, making it the left-hand variant
1018 /// of that `Either`.
1019 ///
1020 /// This can be used in combination with the `right_stream` method to write `if`
1021 /// statements that evaluate to different streams in different branches.
1022 fn left_stream<B>(self) -> Either<Self, B>
1023 where B: Stream<Item = Self::Item, Error = Self::Error>,
1024 Self: Sized
1025 {
1026 Either::Left(self)
1027 }
1028
1029 /// Wrap this stream in an `Either` stream, making it the right-hand variant
1030 /// of that `Either`.
1031 ///
1032 /// This can be used in combination with the `left_stream` method to write `if`
1033 /// statements that evaluate to different streams in different branches.
1034 fn right_stream<B>(self) -> Either<B, Self>
1035 where B: Stream<Item = Self::Item, Error = Self::Error>,
1036 Self: Sized
1037 {
1038 Either::Right(self)
1039 }
1040}