//! Composable asynchronous iteration.
//!
//! This module is an async version of [`std::iter`].
//!
//! If you've found yourself with an asynchronous collection of some kind,
//! and needed to perform an operation on the elements of said collection,
//! you'll quickly run into 'streams'. Streams are heavily used in idiomatic
//! asynchronous Rust code, so it's worth becoming familiar with them.
//!
//! Before explaining more, let's talk about how this module is structured:
//!
//! # Organization
//!
//! This module is largely organized by type:
//!
//! * [Traits] are the core portion: these traits define what kind of streams
//!   exist and what you can do with them. The methods of these traits are worth
//!   putting some extra study time into.
//! * [Functions] provide some helpful ways to create some basic streams.
//! * [Structs] are often the return types of the various methods on this
//!   module's traits. You'll usually want to look at the method that creates
//!   the `struct`, rather than the `struct` itself. For more detail about why,
//!   see '[Implementing Stream](#implementing-stream)'.
//!
//! [Traits]: #traits
//! [Functions]: #functions
//! [Structs]: #structs
//!
//! That's it! Let's dig into streams.
//!
//! # Stream
//!
//! The heart and soul of this module is the [`Stream`] trait. The core of
//! [`Stream`] looks like this:
//!
//! ```
//! #![allow(dead_code)]
//! # use async_std::task::{Context, Poll};
//! # use std::pin::Pin;
//! pub trait Stream {
//!     type Item;
//!     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
//! }
//! # impl Stream for () {
//! #   type Item = ();
//! #   fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { Poll::Pending }
//! # }
//! ```
//!
//! A stream has a method, `poll_next`, which when called returns a
//! [`Poll`]<[`Option`]`<Item>>`. `poll_next` will return `Ready(Some(Item))`
//! as long as there are elements, and once they've all been exhausted, will
//! return `Ready(None)` to indicate that iteration is finished. If we're
//! waiting on something asynchronous to resolve, `Pending` is returned.
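//!
//! To make the three possible results concrete, here is a minimal std-only
//! sketch. It assumes a local stand-in trait with the same shape as the one
//! above, plus two hypothetical helpers that are not part of this module:
//! `Hesitant`, a stream that reports `Pending` once, and `NoopWake`, a waker
//! that does nothing. It polls by hand, which an executor would normally do
//! for you.
//!
//! ```rust
//! use std::pin::Pin;
//! use std::sync::Arc;
//! use std::task::{Context, Poll, Wake, Waker};
//!
//! // Local stand-in with the same shape as the trait shown above.
//! trait Stream {
//!     type Item;
//!     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
//! }
//!
//! // Hypothetical waker that does nothing; a real executor would use the
//! // wake-up to schedule the task again.
//! struct NoopWake;
//!
//! impl Wake for NoopWake {
//!     fn wake(self: Arc<Self>) {}
//! }
//!
//! // Hypothetical stream: yields 1 and 2, reporting `Pending` once in between.
//! struct Hesitant {
//!     step: u8,
//! }
//!
//! impl Stream for Hesitant {
//!     type Item = u8;
//!
//!     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u8>> {
//!         self.step = self.step.saturating_add(1);
//!         match self.step {
//!             1 => Poll::Ready(Some(1)),
//!             2 => {
//!                 // Not ready yet: request another poll, then report `Pending`.
//!                 cx.waker().wake_by_ref();
//!                 Poll::Pending
//!             }
//!             3 => Poll::Ready(Some(2)),
//!             _ => Poll::Ready(None),
//!         }
//!     }
//! }
//!
//! // Polls the stream four times and records each result as text.
//! fn poll_four_times() -> Vec<String> {
//!     let waker = Waker::from(Arc::new(NoopWake));
//!     let mut cx = Context::from_waker(&waker);
//!     let mut stream = Hesitant { step: 0 };
//!     (0..4)
//!         .map(|_| format!("{:?}", Pin::new(&mut stream).poll_next(&mut cx)))
//!         .collect()
//! }
//! ```
//!
//! Calling `poll_four_times()` yields `Ready(Some(1))`, `Pending`,
//! `Ready(Some(2))`, and `Ready(None)`, in that order.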
//!
//! Individual streams may choose to resume iteration, and so calling
//! `poll_next` again may or may not eventually start returning
//! `Ready(Some(Item))` again at some point.
//!
//! [`Stream`]'s full definition includes a number of other methods as well,
//! such as [`next`], but they are default methods, built on top of
//! `poll_next`, and so you get them for free.
//!
//! Streams are also composable, and it's common to chain them together to do
//! more complex forms of processing. See the [Adapters](#adapters) section
//! below for more details.
//!
//! [`Poll`]: ../task/enum.Poll.html
//! [`Stream`]: trait.Stream.html
//! [`next`]: trait.Stream.html#tymethod.next
//! [`Option`]: ../../std/option/enum.Option.html
//!
//! # The three forms of streaming
//!
//! There are three common methods which can create streams from a collection:
//!
//! * `stream()`, which iterates over `&T`.
//! * `stream_mut()`, which iterates over `&mut T`.
//! * `into_stream()`, which iterates over `T`.
//!
//! Various things in async-std may implement one or more of the
//! three, where appropriate.
//!
//! # Implementing Stream
//!
//! Creating a stream of your own involves two steps: creating a `struct` to
//! hold the stream's state, and then `impl`ementing [`Stream`] for that
//! `struct`. This is why there are so many `struct`s in this module: there is
//! one for each stream and stream adapter.
//!
//! Let's make a stream named `Counter` which counts from `1` to `5`:
//!
//! ```
//! # use async_std::prelude::*;
//! # use async_std::task::{Context, Poll};
//! # use std::pin::Pin;
//! // First, the struct:
//!
//! /// A stream which counts from one to five
//! struct Counter {
//!     count: usize,
//! }
//!
//! // we want our count to start at one, so let's add a new() method to help.
//! // This isn't strictly necessary, but is convenient. Note that we start
//! // `count` at zero, we'll see why in `poll_next()`'s implementation below.
//! impl Counter {
//!     fn new() -> Counter {
//!         Counter { count: 0 }
//!     }
//! }
//!
//! // Then, we implement `Stream` for our `Counter`:
//!
//! impl Stream for Counter {
//!     // we will be counting with usize
//!     type Item = usize;
//!
//!     // poll_next() is the only required method
//!     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
//!         // Increment our count. This is why we started at zero.
//!         self.count += 1;
//!
//!         // Check to see if we've finished counting or not.
//!         if self.count < 6 {
//!             Poll::Ready(Some(self.count))
//!         } else {
//!             Poll::Ready(None)
//!         }
//!     }
//! }
//!
//! // And now we can use it!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
//! let mut counter = Counter::new();
//!
//! let x = counter.next().await.unwrap();
//! println!("{}", x);
//!
//! let x = counter.next().await.unwrap();
//! println!("{}", x);
//!
//! let x = counter.next().await.unwrap();
//! println!("{}", x);
//!
//! let x = counter.next().await.unwrap();
//! println!("{}", x);
//!
//! let x = counter.next().await.unwrap();
//! println!("{}", x);
//! #
//! # Ok(()) }) }
//! ```
//!
//! This will print `1` through `5`, each on their own line.
//!
//! Calling `next().await` this way gets repetitive. Rust has a construct which
//! can call `next()` on your stream, until it reaches `None`. Let's go over
//! that next.
//!
//! # while let Loops and IntoStream
//!
//! Rust's `while let` loop syntax is an idiomatic way to iterate over streams. Here's a basic
//! example of `while let`:
//!
//! ```
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
//! # use async_std::prelude::*;
//! # use async_std::stream;
//! let mut values = stream::from_iter(1u8..6);
//!
//! while let Some(x) = values.next().await {
//!     println!("{}", x);
//! }
//! #
//! # Ok(()) }) }
//! ```
//!
//! This will print the numbers one through five, each on their own line. But
//! you'll notice something here: we had to call `stream::from_iter` explicitly
//! to turn the range into a stream. What gives?
//!
//! There's a trait in async-std for converting something into a
//! stream: [`IntoStream`]. This trait has one method, [`into_stream`],
//! which converts the thing implementing [`IntoStream`] into a stream.
//!
//! Unlike `std::iter::IntoIterator`, `IntoStream` does not have compiler
//! support yet. This means that automatic conversions like those performed by
//! `for` loops don't occur yet, and `into_stream` or `from_iter` as above will
//! always have to be called manually.
//!
//! [`IntoStream`]: trait.IntoStream.html
//! [`into_stream`]: trait.IntoStream.html#tymethod.into_stream
//!
//! # Adapters
//!
//! Functions which take a [`Stream`] and return another [`Stream`] are
//! often called 'stream adapters', as they are a form of the 'adapter
//! pattern'.
//!
//! Common stream adapters include [`map`], [`take`], and [`filter`].
//! For more, see their documentation.
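//!
//! As a rough illustration of the adapter pattern, the sketch below wraps one
//! stream in another. It is std-only and is not how the adapters in this
//! module are implemented: the `Stream` trait is a local stand-in, and
//! `UpToThree`, `MapSketch`, `NoopWake`, and `drain` are hypothetical names.
//! Requiring `Unpin` keeps the sketch free of pin projection.
//!
//! ```rust
//! use std::pin::Pin;
//! use std::sync::Arc;
//! use std::task::{Context, Poll, Wake, Waker};
//!
//! // Local stand-in with the same shape as the `Stream` trait.
//! trait Stream {
//!     type Item;
//!     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
//! }
//!
//! // Hypothetical source stream: yields 1, 2, 3.
//! struct UpToThree {
//!     count: u32,
//! }
//!
//! impl Stream for UpToThree {
//!     type Item = u32;
//!
//!     fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
//!         if self.count < 3 {
//!             self.count += 1;
//!             Poll::Ready(Some(self.count))
//!         } else {
//!             Poll::Ready(None)
//!         }
//!     }
//! }
//!
//! // Hypothetical adapter: holds an inner stream and applies `f` to each item.
//! struct MapSketch<S, F> {
//!     inner: S,
//!     f: F,
//! }
//!
//! impl<S, F, T> Stream for MapSketch<S, F>
//! where
//!     S: Stream + Unpin,
//!     F: FnMut(S::Item) -> T + Unpin,
//! {
//!     type Item = T;
//!
//!     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
//!         let this = &mut *self;
//!         // Forward the poll to the inner stream and transform a ready item.
//!         match Pin::new(&mut this.inner).poll_next(cx) {
//!             Poll::Ready(Some(item)) => Poll::Ready(Some((this.f)(item))),
//!             Poll::Ready(None) => Poll::Ready(None),
//!             Poll::Pending => Poll::Pending,
//!         }
//!     }
//! }
//!
//! // Hypothetical waker that does nothing.
//! struct NoopWake;
//!
//! impl Wake for NoopWake {
//!     fn wake(self: Arc<Self>) {}
//! }
//!
//! // Collects items by polling until the stream stops returning `Ready(Some(_))`.
//! // The sketch streams here never return `Pending`.
//! fn drain<S: Stream + Unpin>(mut stream: S) -> Vec<S::Item> {
//!     let waker = Waker::from(Arc::new(NoopWake));
//!     let mut cx = Context::from_waker(&waker);
//!     let mut items = Vec::new();
//!     while let Poll::Ready(Some(item)) = Pin::new(&mut stream).poll_next(&mut cx) {
//!         items.push(item);
//!     }
//!     items
//! }
//!
//! // Adapts the source stream so that every item is doubled.
//! fn doubled() -> Vec<u32> {
//!     drain(MapSketch { inner: UpToThree { count: 0 }, f: |x: u32| x * 2 })
//! }
//! ```
//!
//! Here `doubled()` returns `[2, 4, 6]`: the adapter is itself a stream, so
//! adapters can be stacked on top of one another.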
//!
//! [`map`]: trait.Stream.html#method.map
//! [`take`]: trait.Stream.html#method.take
//! [`filter`]: trait.Stream.html#method.filter
//!
//! # Laziness
//!
//! Streams (and stream [adapters](#adapters)) are *lazy*. This means that
//! just creating a stream doesn't _do_ a whole lot. Nothing really happens
//! until you call [`next`]. This is sometimes a source of confusion when
//! creating a stream solely for its side effects. For example, the [`map`]
//! method calls a closure on each element it iterates over:
//!
//! ```
//! # #![allow(unused_must_use)]
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
//! # use async_std::prelude::*;
//! # use async_std::stream;
//! let v = stream::repeat(1u8).take(5);
//! v.map(|x| println!("{}", x));
//! #
//! # Ok(()) }) }
//! ```
//!
//! This will not print any values, as we only created a stream, rather than
//! using it. The compiler will warn us about this kind of behavior:
//!
//! ```text
//! warning: unused result that must be used: streams are lazy and
//! do nothing unless consumed
//! ```
//!
//! The idiomatic way to write a [`map`] for its side effects is to use a
//! `while let` loop instead:
//!
//! ```
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
//! # use async_std::prelude::*;
//! # use async_std::stream;
//! let mut v = stream::repeat(1u8).take(5);
//!
//! while let Some(x) = v.next().await {
//!     println!("{}", x);
//! }
//! #
//! # Ok(()) }) }
//! ```
//!
//! [`map`]: trait.Stream.html#method.map
//!
//! The two most common ways to evaluate a stream are to use a `while let` loop
//! like this, or to use the [`collect`] method to produce a new collection.
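//!
//! The laziness described in this section can be observed directly. The
//! following std-only sketch counts how often a closure runs before and after
//! polling. The `Stream` trait is a local stand-in, and `ClosureStream`,
//! `NoopWake`, and `calls_before_and_after` are hypothetical names that do not
//! exist in this module.
//!
//! ```rust
//! use std::cell::Cell;
//! use std::pin::Pin;
//! use std::sync::Arc;
//! use std::task::{Context, Poll, Wake, Waker};
//!
//! // Local stand-in with the same shape as the `Stream` trait.
//! trait Stream {
//!     type Item;
//!     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
//! }
//!
//! // Hypothetical waker that does nothing.
//! struct NoopWake;
//!
//! impl Wake for NoopWake {
//!     fn wake(self: Arc<Self>) {}
//! }
//!
//! // Hypothetical stream that asks a closure for each item.
//! struct ClosureStream<F> {
//!     f: F,
//! }
//!
//! impl<T, F> Stream for ClosureStream<F>
//! where
//!     F: FnMut() -> Option<T> + Unpin,
//! {
//!     type Item = T;
//!
//!     fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<T>> {
//!         // The closure only runs here, when the stream is polled.
//!         Poll::Ready((self.f)())
//!     }
//! }
//!
//! // Returns how often the closure ran (before polling, after polling to the end).
//! fn calls_before_and_after() -> (u32, u32) {
//!     let calls = Cell::new(0u32);
//!     let mut stream = ClosureStream {
//!         f: || {
//!             calls.set(calls.get() + 1);
//!             if calls.get() <= 3 { Some(calls.get()) } else { None }
//!         },
//!     };
//!     // Merely constructing the stream has not run the closure.
//!     let before = calls.get();
//!
//!     let waker = Waker::from(Arc::new(NoopWake));
//!     let mut cx = Context::from_waker(&waker);
//!     while let Poll::Ready(Some(_)) = Pin::new(&mut stream).poll_next(&mut cx) {}
//!     (before, calls.get())
//! }
//! ```
//!
//! `calls_before_and_after()` returns `(0, 4)`: nothing runs at construction,
//! and polling to the end runs the closure three times for the items plus once
//! more to discover that the stream is finished.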
//!
//! [`collect`]: trait.Stream.html#method.collect
//!
//! # Infinity
//!
//! Streams do not have to be finite. As an example, a repeat stream is
//! an infinite stream:
//!
//! ```
//! # use async_std::stream;
//! let numbers = stream::repeat(1u8);
//! ```
//!
//! It is common to use the [`take`] stream adapter to turn an infinite
//! stream into a finite one:
//!
//! ```
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
//! # use async_std::prelude::*;
//! # use async_std::stream;
//! let numbers = stream::from_iter(0u8..);
//! let mut five_numbers = numbers.take(5);
//!
//! while let Some(number) = five_numbers.next().await {
//!     println!("{}", number);
//! }
//! #
//! # Ok(()) }) }
//! ```
//!
//! This will print the numbers `0` through `4`, each on their own line.
//!
//! Bear in mind that methods on infinite streams, even those for which a
//! result can be determined mathematically in finite time, may not terminate.
//! Specifically, methods such as [`min`], which in the general case require
//! traversing every element in the stream, are likely not to return
//! successfully for any infinite streams.
//!
//! ```ignore
//! let ones = async_std::stream::repeat(1);
//! let least = ones.min().await.unwrap(); // Oh no! An infinite loop!
//! // `ones.min()` causes an infinite loop, so we won't reach this point!
//! println!("The smallest number one is {}.", least);
//! ```
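//!
//! By contrast, bounding the stream first lets a full traversal finish. The
//! std-only sketch below shows the idea with a local stand-in trait; it is
//! not the implementation of [`take`] or [`min`], and `Naturals`,
//! `TakeSketch`, `NoopWake`, and `min_of_first` are hypothetical names.
//!
//! ```rust
//! use std::pin::Pin;
//! use std::sync::Arc;
//! use std::task::{Context, Poll, Wake, Waker};
//!
//! // Local stand-in with the same shape as the `Stream` trait.
//! trait Stream {
//!     type Item;
//!     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
//! }
//!
//! // Hypothetical waker that does nothing.
//! struct NoopWake;
//!
//! impl Wake for NoopWake {
//!     fn wake(self: Arc<Self>) {}
//! }
//!
//! // Hypothetical infinite stream: yields 0, 1, 2, ... and never `None`.
//! struct Naturals {
//!     next: u64,
//! }
//!
//! impl Stream for Naturals {
//!     type Item = u64;
//!
//!     fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
//!         let current = self.next;
//!         self.next += 1;
//!         Poll::Ready(Some(current))
//!     }
//! }
//!
//! // Hypothetical adapter that ends the stream after `remaining` items.
//! struct TakeSketch<S> {
//!     inner: S,
//!     remaining: usize,
//! }
//!
//! impl<S: Stream + Unpin> Stream for TakeSketch<S> {
//!     type Item = S::Item;
//!
//!     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
//!         if self.remaining == 0 {
//!             return Poll::Ready(None);
//!         }
//!         let this = &mut *self;
//!         let polled = Pin::new(&mut this.inner).poll_next(cx);
//!         // Only a delivered item counts against the budget.
//!         if let Poll::Ready(Some(_)) = polled {
//!             this.remaining -= 1;
//!         }
//!         polled
//!     }
//! }
//!
//! // Smallest of the first `n` items; terminates because the stream is bounded.
//! fn min_of_first(n: usize) -> Option<u64> {
//!     let mut stream = TakeSketch { inner: Naturals { next: 0 }, remaining: n };
//!     let waker = Waker::from(Arc::new(NoopWake));
//!     let mut cx = Context::from_waker(&waker);
//!     let mut least: Option<u64> = None;
//!     while let Poll::Ready(Some(x)) = Pin::new(&mut stream).poll_next(&mut cx) {
//!         least = Some(least.map_or(x, |m| m.min(x)));
//!     }
//!     least
//! }
//! ```
//!
//! `min_of_first(5)` returns `Some(0)` and `min_of_first(0)` returns `None`.
//! Running the same loop over `Naturals` directly would never exit.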
//!
//! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html
//! [`take`]: trait.Stream.html#method.take
//! [`min`]: trait.Stream.html#method.min

pub use empty::{empty, Empty};
pub use from_fn::{from_fn, FromFn};
pub use from_iter::{from_iter, FromIter};
pub use once::{once, Once};
pub use repeat::{repeat, Repeat};
pub use repeat_with::{repeat_with, RepeatWith};
pub use stream::*;

pub(crate) mod stream;

mod empty;
mod from_fn;
mod from_iter;
mod once;
mod repeat;
mod repeat_with;

cfg_unstable! {
    mod double_ended_stream;
    mod exact_size_stream;
    mod extend;
    mod from_stream;
    mod fused_stream;
    mod interval;
    mod into_stream;
    mod pending;
    mod product;
    mod successors;
    mod sum;

    pub use double_ended_stream::DoubleEndedStream;
    pub use exact_size_stream::ExactSizeStream;
    pub use extend::{extend, Extend};
    pub use from_stream::FromStream;
    pub use fused_stream::FusedStream;
    pub use interval::{interval, Interval};
    pub use into_stream::IntoStream;
    pub use pending::{pending, Pending};
    pub use product::Product;
    pub use stream::Merge;
    pub use successors::{successors, Successors};
    pub use sum::Sum;
}