broker_tokio/stream/mod.rs
1//! Stream utilities for Tokio.
2//!
3//! A `Stream` is an asynchronous sequence of values. It can be thought of as an asynchronous version of the standard library's `Iterator` trait.
4//!
5//! This module provides helpers to work with them.
6
7mod all;
8use all::AllFuture;
9
10mod any;
11use any::AnyFuture;
12
13mod chain;
14use chain::Chain;
15
16mod collect;
17use collect::Collect;
18pub use collect::FromStream;
19
20mod empty;
21pub use empty::{empty, Empty};
22
23mod filter;
24use filter::Filter;
25
26mod filter_map;
27use filter_map::FilterMap;
28
29mod fuse;
30use fuse::Fuse;
31
32mod iter;
33pub use iter::{iter, Iter};
34
35mod map;
36use map::Map;
37
38mod merge;
39use merge::Merge;
40
41mod next;
42use next::Next;
43
44mod once;
45pub use once::{once, Once};
46
47mod pending;
48pub use pending::{pending, Pending};
49
50mod try_next;
51use try_next::TryNext;
52
53mod take;
54use take::Take;
55
56mod take_while;
57use take_while::TakeWhile;
58
59pub use futures_core::Stream;
60
61/// An extension trait for `Stream`s that provides a variety of convenient
62/// combinator functions.
63pub trait StreamExt: Stream {
64 /// Consumes and returns the next value in the stream or `None` if the
65 /// stream is finished.
66 ///
67 /// Equivalent to:
68 ///
69 /// ```ignore
70 /// async fn next(&mut self) -> Option<Self::Item>;
71 /// ```
72 ///
73 /// Note that because `next` doesn't take ownership over the stream,
74 /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
75 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
76 /// be done by boxing the stream using [`Box::pin`] or
77 /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
78 /// crate.
79 ///
80 /// # Examples
81 ///
82 /// ```
83 /// # #[tokio::main]
84 /// # async fn main() {
85 /// use tokio::stream::{self, StreamExt};
86 ///
87 /// let mut stream = stream::iter(1..=3);
88 ///
89 /// assert_eq!(stream.next().await, Some(1));
90 /// assert_eq!(stream.next().await, Some(2));
91 /// assert_eq!(stream.next().await, Some(3));
92 /// assert_eq!(stream.next().await, None);
93 /// # }
94 /// ```
95 fn next(&mut self) -> Next<'_, Self>
96 where
97 Self: Unpin,
98 {
99 Next::new(self)
100 }
101
102 /// Consumes and returns the next item in the stream. If an error is
103 /// encountered before the next item, the error is returned instead.
104 ///
105 /// Equivalent to:
106 ///
107 /// ```ignore
108 /// async fn try_next(&mut self) -> Result<Option<T>, E>;
109 /// ```
110 ///
111 /// This is similar to the [`next`](StreamExt::next) combinator,
112 /// but returns a [`Result<Option<T>, E>`](Result) rather than
113 /// an [`Option<Result<T, E>>`](Option), making for easy use
114 /// with the [`?`](std::ops::Try) operator.
115 ///
116 /// # Examples
117 ///
118 /// ```
119 /// # #[tokio::main]
120 /// # async fn main() {
121 /// use tokio::stream::{self, StreamExt};
122 ///
123 /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
124 ///
125 /// assert_eq!(stream.try_next().await, Ok(Some(1)));
126 /// assert_eq!(stream.try_next().await, Ok(Some(2)));
127 /// assert_eq!(stream.try_next().await, Err("nope"));
128 /// # }
129 /// ```
130 fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
131 where
132 Self: Stream<Item = Result<T, E>> + Unpin,
133 {
134 TryNext::new(self)
135 }
136
137 /// Maps this stream's items to a different type, returning a new stream of
138 /// the resulting type.
139 ///
140 /// The provided closure is executed over all elements of this stream as
141 /// they are made available. It is executed inline with calls to
142 /// [`poll_next`](Stream::poll_next).
143 ///
144 /// Note that this function consumes the stream passed into it and returns a
145 /// wrapped version of it, similar to the existing `map` methods in the
146 /// standard library.
147 ///
148 /// # Examples
149 ///
150 /// ```
151 /// # #[tokio::main]
152 /// # async fn main() {
153 /// use tokio::stream::{self, StreamExt};
154 ///
155 /// let stream = stream::iter(1..=3);
156 /// let mut stream = stream.map(|x| x + 3);
157 ///
158 /// assert_eq!(stream.next().await, Some(4));
159 /// assert_eq!(stream.next().await, Some(5));
160 /// assert_eq!(stream.next().await, Some(6));
161 /// # }
162 /// ```
163 fn map<T, F>(self, f: F) -> Map<Self, F>
164 where
165 F: FnMut(Self::Item) -> T,
166 Self: Sized,
167 {
168 Map::new(self, f)
169 }
170
171 /// Combine two streams into one by interleaving the output of both as it
172 /// is produced.
173 ///
174 /// Values are produced from the merged stream in the order they arrive from
175 /// the two source streams. If both source streams provide values
176 /// simultaneously, the merge stream alternates between them. This provides
177 /// some level of fairness.
178 ///
179 /// The merged stream completes once **both** source streams complete. When
180 /// one source stream completes before the other, the merge stream
181 /// exclusively polls the remaining stream.
182 ///
183 /// # Examples
184 ///
185 /// ```
186 /// use tokio::stream::StreamExt;
187 /// use tokio::sync::mpsc;
188 /// use tokio::time;
189 ///
190 /// use std::time::Duration;
191 ///
192 /// # /*
193 /// #[tokio::main]
194 /// # */
195 /// # #[tokio::main(basic_scheduler)]
196 /// async fn main() {
197 /// # time::pause();
198 /// let (mut tx1, rx1) = mpsc::channel(10);
199 /// let (mut tx2, rx2) = mpsc::channel(10);
200 ///
201 /// let mut rx = rx1.merge(rx2);
202 ///
203 /// tokio::spawn(async move {
204 /// // Send some values immediately
205 /// tx1.send(1).await.unwrap();
206 /// tx1.send(2).await.unwrap();
207 ///
208 /// // Let the other task send values
209 /// time::delay_for(Duration::from_millis(20)).await;
210 ///
211 /// tx1.send(4).await.unwrap();
212 /// });
213 ///
214 /// tokio::spawn(async move {
215 /// // Wait for the first task to send values
216 /// time::delay_for(Duration::from_millis(5)).await;
217 ///
218 /// tx2.send(3).await.unwrap();
219 ///
220 /// time::delay_for(Duration::from_millis(25)).await;
221 ///
222 /// // Send the final value
223 /// tx2.send(5).await.unwrap();
224 /// });
225 ///
226 /// assert_eq!(1, rx.next().await.unwrap());
227 /// assert_eq!(2, rx.next().await.unwrap());
228 /// assert_eq!(3, rx.next().await.unwrap());
229 /// assert_eq!(4, rx.next().await.unwrap());
230 /// assert_eq!(5, rx.next().await.unwrap());
231 ///
232 /// // The merged stream is consumed
233 /// assert!(rx.next().await.is_none());
234 /// }
235 /// ```
236 fn merge<U>(self, other: U) -> Merge<Self, U>
237 where
238 U: Stream<Item = Self::Item>,
239 Self: Sized,
240 {
241 Merge::new(self, other)
242 }
243
244 /// Filters the values produced by this stream according to the provided
245 /// predicate.
246 ///
247 /// As values of this stream are made available, the provided predicate `f`
248 /// will be run against them. If the predicate
249 /// resolves to `true`, then the stream will yield the value, but if the
250 /// predicate resolves to `false`, then the value
251 /// will be discarded and the next value will be produced.
252 ///
253 /// Note that this function consumes the stream passed into it and returns a
254 /// wrapped version of it, similar to [`Iterator::filter`] method in the
255 /// standard library.
256 ///
257 /// # Examples
258 ///
259 /// ```
260 /// # #[tokio::main]
261 /// # async fn main() {
262 /// use tokio::stream::{self, StreamExt};
263 ///
264 /// let stream = stream::iter(1..=8);
265 /// let mut evens = stream.filter(|x| x % 2 == 0);
266 ///
267 /// assert_eq!(Some(2), evens.next().await);
268 /// assert_eq!(Some(4), evens.next().await);
269 /// assert_eq!(Some(6), evens.next().await);
270 /// assert_eq!(Some(8), evens.next().await);
271 /// assert_eq!(None, evens.next().await);
272 /// # }
273 /// ```
274 fn filter<F>(self, f: F) -> Filter<Self, F>
275 where
276 F: FnMut(&Self::Item) -> bool,
277 Self: Sized,
278 {
279 Filter::new(self, f)
280 }
281
282 /// Filters the values produced by this stream while simultaneously mapping
283 /// them to a different type according to the provided closure.
284 ///
285 /// As values of this stream are made available, the provided function will
286 /// be run on them. If the predicate `f` resolves to
287 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
288 /// it resolves to [`None`] then the next value will be produced.
289 ///
290 /// Note that this function consumes the stream passed into it and returns a
291 /// wrapped version of it, similar to [`Iterator::filter_map`] method in the
292 /// standard library.
293 ///
294 /// # Examples
295 /// ```
296 /// # #[tokio::main]
297 /// # async fn main() {
298 /// use tokio::stream::{self, StreamExt};
299 ///
300 /// let stream = stream::iter(1..=8);
301 /// let mut evens = stream.filter_map(|x| {
302 /// if x % 2 == 0 { Some(x + 1) } else { None }
303 /// });
304 ///
305 /// assert_eq!(Some(3), evens.next().await);
306 /// assert_eq!(Some(5), evens.next().await);
307 /// assert_eq!(Some(7), evens.next().await);
308 /// assert_eq!(Some(9), evens.next().await);
309 /// assert_eq!(None, evens.next().await);
310 /// # }
311 /// ```
312 fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
313 where
314 F: FnMut(Self::Item) -> Option<T>,
315 Self: Sized,
316 {
317 FilterMap::new(self, f)
318 }
319
320 /// Creates a stream which ends after the first `None`.
321 ///
322 /// After a stream returns `None`, behavior is undefined. Future calls to
323 /// `poll_next` may or may not return `Some(T)` again or they may panic.
324 /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
325 /// return `None` forever.
326 ///
327 /// # Examples
328 ///
329 /// ```
330 /// use tokio::stream::{Stream, StreamExt};
331 ///
332 /// use std::pin::Pin;
333 /// use std::task::{Context, Poll};
334 ///
335 /// // a stream which alternates between Some and None
336 /// struct Alternate {
337 /// state: i32,
338 /// }
339 ///
340 /// impl Stream for Alternate {
341 /// type Item = i32;
342 ///
343 /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
344 /// let val = self.state;
345 /// self.state = self.state + 1;
346 ///
347 /// // if it's even, Some(i32), else None
348 /// if val % 2 == 0 {
349 /// Poll::Ready(Some(val))
350 /// } else {
351 /// Poll::Ready(None)
352 /// }
353 /// }
354 /// }
355 ///
356 /// #[tokio::main]
357 /// async fn main() {
358 /// let mut stream = Alternate { state: 0 };
359 ///
360 /// // the stream goes back and forth
361 /// assert_eq!(stream.next().await, Some(0));
362 /// assert_eq!(stream.next().await, None);
363 /// assert_eq!(stream.next().await, Some(2));
364 /// assert_eq!(stream.next().await, None);
365 ///
366 /// // however, once it is fused
367 /// let mut stream = stream.fuse();
368 ///
369 /// assert_eq!(stream.next().await, Some(4));
370 /// assert_eq!(stream.next().await, None);
371 ///
372 /// // it will always return `None` after the first time.
373 /// assert_eq!(stream.next().await, None);
374 /// assert_eq!(stream.next().await, None);
375 /// assert_eq!(stream.next().await, None);
376 /// }
377 /// ```
378 fn fuse(self) -> Fuse<Self>
379 where
380 Self: Sized,
381 {
382 Fuse::new(self)
383 }
384
385 /// Creates a new stream of at most `n` items of the underlying stream.
386 ///
387 /// Once `n` items have been yielded from this stream then it will always
388 /// return that the stream is done.
389 ///
390 /// # Examples
391 ///
392 /// ```
393 /// # #[tokio::main]
394 /// # async fn main() {
395 /// use tokio::stream::{self, StreamExt};
396 ///
397 /// let mut stream = stream::iter(1..=10).take(3);
398 ///
399 /// assert_eq!(Some(1), stream.next().await);
400 /// assert_eq!(Some(2), stream.next().await);
401 /// assert_eq!(Some(3), stream.next().await);
402 /// assert_eq!(None, stream.next().await);
403 /// # }
404 /// ```
405 fn take(self, n: usize) -> Take<Self>
406 where
407 Self: Sized,
408 {
409 Take::new(self, n)
410 }
411
412 /// Take elements from this stream while the provided predicate
413 /// resolves to `true`.
414 ///
415 /// This function, like `Iterator::take_while`, will take elements from the
416 /// stream until the predicate `f` resolves to `false`. Once one element
417 /// returns false it will always return that the stream is done.
418 ///
419 /// # Examples
420 ///
421 /// ```
422 /// # #[tokio::main]
423 /// # async fn main() {
424 /// use tokio::stream::{self, StreamExt};
425 ///
426 /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
427 ///
428 /// assert_eq!(Some(1), stream.next().await);
429 /// assert_eq!(Some(2), stream.next().await);
430 /// assert_eq!(Some(3), stream.next().await);
431 /// assert_eq!(None, stream.next().await);
432 /// # }
433 /// ```
434 fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
435 where
436 F: FnMut(&Self::Item) -> bool,
437 Self: Sized,
438 {
439 TakeWhile::new(self, f)
440 }
441
442 /// Tests if every element of the stream matches a predicate.
443 ///
444 /// `all()` takes a closure that returns `true` or `false`. It applies
445 /// this closure to each element of the stream, and if they all return
446 /// `true`, then so does `all`. If any of them return `false`, it
447 /// returns `false`. An empty stream returns `true`.
448 ///
449 /// `all()` is short-circuiting; in other words, it will stop processing
450 /// as soon as it finds a `false`, given that no matter what else happens,
451 /// the result will also be `false`.
452 ///
453 /// An empty stream returns `true`.
454 ///
455 /// # Examples
456 ///
457 /// Basic usage:
458 ///
459 /// ```
460 /// # #[tokio::main]
461 /// # async fn main() {
462 /// use tokio::stream::{self, StreamExt};
463 ///
464 /// let a = [1, 2, 3];
465 ///
466 /// assert!(stream::iter(&a).all(|&x| x > 0).await);
467 ///
468 /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
469 /// # }
470 /// ```
471 ///
472 /// Stopping at the first `false`:
473 ///
474 /// ```
475 /// # #[tokio::main]
476 /// # async fn main() {
477 /// use tokio::stream::{self, StreamExt};
478 ///
479 /// let a = [1, 2, 3];
480 ///
481 /// let mut iter = stream::iter(&a);
482 ///
483 /// assert!(!iter.all(|&x| x != 2).await);
484 ///
485 /// // we can still use `iter`, as there are more elements.
486 /// assert_eq!(iter.next().await, Some(&3));
487 /// # }
488 /// ```
489 fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
490 where
491 Self: Unpin,
492 F: FnMut(Self::Item) -> bool,
493 {
494 AllFuture::new(self, f)
495 }
496
497 /// Tests if any element of the stream matches a predicate.
498 ///
499 /// `any()` takes a closure that returns `true` or `false`. It applies
500 /// this closure to each element of the stream, and if any of them return
501 /// `true`, then so does `any()`. If they all return `false`, it
502 /// returns `false`.
503 ///
504 /// `any()` is short-circuiting; in other words, it will stop processing
505 /// as soon as it finds a `true`, given that no matter what else happens,
506 /// the result will also be `true`.
507 ///
508 /// An empty stream returns `false`.
509 ///
510 /// Basic usage:
511 ///
512 /// ```
513 /// # #[tokio::main]
514 /// # async fn main() {
515 /// use tokio::stream::{self, StreamExt};
516 ///
517 /// let a = [1, 2, 3];
518 ///
519 /// assert!(stream::iter(&a).any(|&x| x > 0).await);
520 ///
521 /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
522 /// # }
523 /// ```
524 ///
525 /// Stopping at the first `true`:
526 ///
527 /// ```
528 /// # #[tokio::main]
529 /// # async fn main() {
530 /// use tokio::stream::{self, StreamExt};
531 ///
532 /// let a = [1, 2, 3];
533 ///
534 /// let mut iter = stream::iter(&a);
535 ///
536 /// assert!(iter.any(|&x| x != 2).await);
537 ///
538 /// // we can still use `iter`, as there are more elements.
539 /// assert_eq!(iter.next().await, Some(&2));
540 /// # }
541 /// ```
542 fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
543 where
544 Self: Unpin,
545 F: FnMut(Self::Item) -> bool,
546 {
547 AnyFuture::new(self, f)
548 }
549
550 /// Combine two streams into one by first returning all values from the
551 /// first stream then all values from the second stream.
552 ///
553 /// As long as `self` still has values to emit, no values from `other` are
554 /// emitted, even if some are ready.
555 ///
556 /// # Examples
557 ///
558 /// ```
559 /// use tokio::stream::{self, StreamExt};
560 ///
561 /// #[tokio::main]
562 /// async fn main() {
563 /// let one = stream::iter(vec![1, 2, 3]);
564 /// let two = stream::iter(vec![4, 5, 6]);
565 ///
566 /// let mut stream = one.chain(two);
567 ///
568 /// assert_eq!(stream.next().await, Some(1));
569 /// assert_eq!(stream.next().await, Some(2));
570 /// assert_eq!(stream.next().await, Some(3));
571 /// assert_eq!(stream.next().await, Some(4));
572 /// assert_eq!(stream.next().await, Some(5));
573 /// assert_eq!(stream.next().await, Some(6));
574 /// assert_eq!(stream.next().await, None);
575 /// }
576 /// ```
577 fn chain<U>(self, other: U) -> Chain<Self, U>
578 where
579 U: Stream<Item = Self::Item>,
580 Self: Sized,
581 {
582 Chain::new(self, other)
583 }
584
585 /// Drain stream pushing all emitted values into a collection.
586 ///
587 /// `collect` streams all values, awaiting as needed. Values are pushed into
588 /// a collection. A number of different target collection types are
589 /// supported, including [`Vec`](std::vec::Vec),
590 /// [`String`](std::string::String), and [`Bytes`](bytes::Bytes).
591 ///
592 /// # `Result`
593 ///
594 /// `collect()` can also be used with streams of type `Result<T, E>` where
595 /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
596 /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
597 /// streaming is terminated and `collect()` returns the `Err`.
598 ///
599 /// # Notes
600 ///
601 /// `FromStream` is currently a sealed trait. Stabilization is pending
602 /// enhancements to the Rust langague.
603 ///
604 /// # Examples
605 ///
606 /// Basic usage:
607 ///
608 /// ```
609 /// use tokio::stream::{self, StreamExt};
610 ///
611 /// #[tokio::main]
612 /// async fn main() {
613 /// let doubled: Vec<i32> =
614 /// stream::iter(vec![1, 2, 3])
615 /// .map(|x| x * 2)
616 /// .collect()
617 /// .await;
618 ///
619 /// assert_eq!(vec![2, 4, 6], doubled);
620 /// }
621 /// ```
622 ///
623 /// Collecting a stream of `Result` values
624 ///
625 /// ```
626 /// use tokio::stream::{self, StreamExt};
627 ///
628 /// #[tokio::main]
629 /// async fn main() {
630 /// // A stream containing only `Ok` values will be collected
631 /// let values: Result<Vec<i32>, &str> =
632 /// stream::iter(vec![Ok(1), Ok(2), Ok(3)])
633 /// .collect()
634 /// .await;
635 ///
636 /// assert_eq!(Ok(vec![1, 2, 3]), values);
637 ///
638 /// // A stream containing `Err` values will return the first error.
639 /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
640 ///
641 /// let values: Result<Vec<i32>, &str> =
642 /// stream::iter(results)
643 /// .collect()
644 /// .await;
645 ///
646 /// assert_eq!(Err("no"), values);
647 /// }
648 /// ```
649 fn collect<T>(self) -> Collect<Self, T>
650 where
651 T: FromStream<Self::Item>,
652 Self: Sized,
653 {
654 Collect::new(self)
655 }
656}
657
658impl<St: ?Sized> StreamExt for St where St: Stream {}