futures_time/
lib.rs

//! # Async time operators.
//!
//! This crate provides ergonomic, async time-based operations. It serves as an
//! experimental playground for exploring how time-based operations could be
//! added to `async-std`, and subsequently the stdlib.
//!
//! The goal is to make working with time and other events feel natural. A major
//! source of inspiration for this has been RxJS, which uses events (including
//! time) to trigger operations. This crate takes that principle, inverts the
//! model to make it evaluate lazily, and wraps it in an ergonomic Rust
//! interface.
//!
//! # Examples
//!
//! __Delay a future's execution by 100ms__
//!
//! ```
//! use futures_time::prelude::*;
//! use futures_time::time::Duration;
//!
//! fn main() {
//!     async_io::block_on(async {
//!         let res = async { "meow" }
//!             .delay(Duration::from_millis(100))
//!             .await;
//!         assert_eq!(res, "meow");
//!     })
//! }
//! ```
//!
//! __Error if a future takes longer than 200ms__
//!
//! ```
//! use futures_time::prelude::*;
//! use futures_time::time::Duration;
//!
//! fn main() {
//!     async_io::block_on(async {
//!         let res = async { "meow" }
//!             .delay(Duration::from_millis(100))
//!             .timeout(Duration::from_millis(200))
//!             .await;
//!         assert_eq!(res.unwrap(), "meow");
//!     })
//! }
//! ```
//!
//! __Throttle a stream__
//!
//! This lets two items through in total: one `100ms` after the program has
//! started, and one `300ms` after the program has started.
//!
//! ```
//! use futures_lite::prelude::*;
//! use futures_time::prelude::*;
//! use futures_time::time::Duration;
//! use futures_time::stream;
//!
//! fn main() {
//!     async_io::block_on(async {
//!         let mut counter = 0;
//!         stream::interval(Duration::from_millis(100))  // Yield an item every 100ms
//!             .take(4)                                  // Stop after 4 items
//!             .throttle(Duration::from_millis(300))     // Only let an item through every 300ms
//!             .for_each(|_| counter += 1)               // Increment a counter for each item
//!             .await;
//!
//!         assert_eq!(counter, 2);
//!     })
//! }
//! ```
//!
//! # The `Timer` trait
//!
//! The future returned by [`task::sleep`] implements the [`future::Timer`]
//! trait. This represents a future whose deadline can be moved forward into the
//! future.
//!
//! For example, say we have a deadline of `Duration::from_secs(10)`. By calling
//! `Timer::reset_timer` the timer can be rescheduled to trigger at a later time.
//! This functionality is required for methods such as `debounce` and
//! `Stream::timeout`, which regularly need to reschedule their timers to
//! trigger later.
//!
//! Currently the only type implementing the `Timer` trait is
//! [`task::Sleep`], which is created from a `Duration`. This is in contrast
//! with [`task::sleep_until`], which takes an `Instant`, and cannot be reset.
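//!
//! As a rough illustration of what "resetting" means, the sketch below models
//! a duration-based deadline using only `std` types. `ResettableDeadline` and
//! its methods are hypothetical names made up for this example. It only models
//! the deadline arithmetic and is not how [`task::Sleep`] is implemented.
//!
//! ```rust
//! use std::time::{Duration, Instant};
//!
//! /// Hypothetical stand-in for a resettable timer; not part of this crate.
//! #[derive(Debug)]
//! struct ResettableDeadline {
//!     dur: Duration,
//!     deadline: Instant,
//! }
//!
//! impl ResettableDeadline {
//!     /// Create a deadline that elapses `dur` after `now`.
//!     fn new(now: Instant, dur: Duration) -> Self {
//!         Self { dur, deadline: now + dur }
//!     }
//!
//!     /// Move the deadline so that it elapses `dur` after `now`.
//!     fn reset(&mut self, now: Instant) {
//!         self.deadline = now + self.dur;
//!     }
//!
//!     /// Whether the deadline has been reached at `now`.
//!     fn is_elapsed(&self, now: Instant) -> bool {
//!         now >= self.deadline
//!     }
//! }
//!
//! fn main() {
//!     let start = Instant::now();
//!     let mut timer = ResettableDeadline::new(start, Duration::from_secs(10));
//!
//!     // Six seconds in, the timer is reset: the deadline moves from
//!     // `start + 10s` to `start + 16s`.
//!     timer.reset(start + Duration::from_secs(6));
//!
//!     assert!(!timer.is_elapsed(start + Duration::from_secs(10)));
//!     assert!(timer.is_elapsed(start + Duration::from_secs(16)));
//! }
//! ```
//!
//! Because the timer remembers its `Duration`, a reset needs no new arguments.
//! A deadline given only as an `Instant` carries no duration to re-apply.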
//!
//! # Cancellation
//!
//! You can use [`channel::bounded`] to create a [`channel::Sender`] and
//! [`channel::Receiver`] pair. When the "sender" sends a message, all
//! "receivers" will halt execution of the future the next time they are
//! `.await`ed. This will cause the future to stop executing, and all
//! destructors to be run.
//!
//! ```
//! use futures_lite::prelude::*;
//! use futures_time::prelude::*;
//! use futures_time::channel;
//! use futures_time::time::Duration;
//!
//! fn main() {
//!     async_io::block_on(async {
//!         let (send, mut recv) = channel::bounded::<()>(1); // create a new send/receive pair
//!         let value = async { "meow" }
//!             .delay(Duration::from_millis(100))
//!             .timeout(recv.next()) // time-out when the sender emits a message
//!             .await;
//!
//!         assert_eq!(value.unwrap(), "meow");
//!     })
//! }
//! ```
//!
//! # Futures
//!
//! - [`Future::delay`](`future::FutureExt::delay`) Delay execution for a specified time.
//! - [`Future::timeout`](`future::FutureExt::timeout`) Cancel the future if the execution takes longer than the specified time.
//! - [`Future::park`](`future::FutureExt::park`) Suspend or resume the execution of a future.
//!
//! # Tasks
//!
//! - [`task::sleep_until`] Sleeps until the specified deadline.
//! - [`task::sleep`] Sleeps for the specified amount of time.
//!
//! # Streams
//!
//! - [`Stream::buffer`](`stream::StreamExt::buffer`) Returns a stream which buffers items and flushes them at each interval.
//! - [`Stream::debounce`](`stream::StreamExt::debounce`) Returns a stream that debounces for the given duration.
//! - [`Stream::delay`](`stream::StreamExt::delay`) Delay execution for a specified time.
//! - [`Stream::park`](`stream::StreamExt::park`) Suspend or resume the execution of a stream.
//! - [`Stream::sample`](`stream::StreamExt::sample`) Yield the last value received, if any, at each interval.
//! - [`Stream::throttle`](`stream::StreamExt::throttle`) Filter out all items after the first for a specified time.
//! - [`Stream::timeout`](`stream::StreamExt::timeout`) Cancel the stream if the execution takes longer than the specified time.
//! - [`stream::interval`](`stream::interval`) Creates a new stream that yields at a set interval.
//!
//! # Re-exports
//!
//! - `channel` is a re-export of the [`async-channel`] crate, exposed for convenience
//!
//! [`async-channel`]: https://docs.rs/async-channel/latest/async_channel

#![forbid(unsafe_code)]
#![deny(missing_debug_implementations)]
#![warn(missing_docs, future_incompatible, unreachable_pub)]
#![forbid(rustdoc::missing_doc_code_examples)]

pub(crate) mod utils;

pub mod future;
pub mod stream;
pub mod task;
pub mod time;

/// An async multi-producer multi-consumer channel.
pub mod channel {
    /// Suspend or resume execution of a future.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    pub enum Parker {
        /// Put the future into a suspended state.
        Park,
        /// Put the future into an active state.
        Unpark,
    }
    #[doc(inline)]
    pub use async_channel::*;
}

/// The `futures-time` prelude.
pub mod prelude {
    pub use super::future::FutureExt as _;
    pub use super::future::IntoFuture as _;
    pub use super::future::Timer as _;
    pub use super::stream::IntoStream as _;
    pub use super::stream::StreamExt as _;
}