1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
//! # Async time operators.
//!
//! This crate provides ergonomic, async time-based operations. It serves as an
//! experimental playground to experiment with how we could potentially add
//! time-based operations to `async-std`, and subsequently the stdlib.
//!
//! The goal is to make working with time and other events feel natural. A major
//! source of inspiration for this has been RxJS, which uses events (including
//! time) to trigger operations. This crate takes that principle, inverts the
//! model to make it evaluate lazily, and wraps it in an ergnomic Rust
//! interface.
//!
//! # Examples
//!
//! __Delay a future's execution by 100ms__
//!
//! ```
//! use futures_time::prelude::*;
//! use futures_time::time::Duration;
//!
//! fn main() {
//! async_io::block_on(async {
//! let res = async { "meow" }
//! .delay(Duration::from_millis(100))
//! .await;
//! assert_eq!(res, "meow");
//! })
//! }
//! ```
//!
//! __Error if a future takes longer than 200ms__
//!
//! ```
//! use futures_time::prelude::*;
//! use futures_time::time::Duration;
//!
//! fn main() {
//! async_io::block_on(async {
//! let res = async { "meow" }
//! .delay(Duration::from_millis(100))
//! .timeout(Duration::from_millis(200))
//! .await;
//! assert_eq!(res.unwrap(), "meow");
//! })
//! }
//! ```
//!
//! __Throttle a stream__
//!
//! This lets two items through in total: one `100ms` after the program has
//! started, and one `300ms` after the program has started.
//!
//! ```
//! use futures_lite::prelude::*;
//! use futures_time::prelude::*;
//! use futures_time::time::Duration;
//! use futures_time::stream;
//!
//! fn main() {
//! async_io::block_on(async {
//! let mut counter = 0;
//! stream::interval(Duration::from_millis(100)) // Yield an item every 100ms
//! .take(4) // Stop after 4 items
//! .throttle(Duration::from_millis(300)) // Only let an item through every 300ms
//! .for_each(|_| counter += 1) // Increment a counter for each item
//! .await;
//!
//! assert_eq!(counter, 2);
//! })
//! }
//! ```
//!
//! # The `Timer` trait
//!
//! The future returned by [`task::sleep`] implements the [`future::Timer`]
//! trait. This represents a future whose deadline can be moved forward into the
//! future.
//!
//! For example, say we have a deadline of `Duration::from_secs(10)`. By calling
//! `Timer::reset_timer` the timer can be reschedule to trigger at a later time.
//! This functionality is required for methods such as `debounce` and
//! `Stream::timeout`, which will regularly want to reschedule their timers to trigger
//! the future.
//!
//! Currently the only type implementing the `Timer` trait is
//! [`task::Sleep`], which is created from a `Duration.` This is in contrast
//! with [`task::sleep_until`], which takes an `Instant`, and cannot be reset.
//!
//! # Cancellation
//!
//! You can use [`channel::bounded`] to create a [`channel::Sender`] and [`channel::Receiver`] pair.
//! When the "sender" sends a message, all "receivers" will halt execution of the future the next time they are
//! `.await`ed. This will cause the future to stop executing, and all
//! destructors to be run.
//!
//! ```
//! use futures_lite::prelude::*;
//! use futures_time::prelude::*;
//! use futures_time::channel;
//! use futures_time::time::Duration;
//!
//! fn main() {
//! async_io::block_on(async {
//! let (send, mut recv) = channel::bounded::<()>(1); // create a new send/receive pair
//! let mut counter = 0;
//! let value = async { "meow" }
//! .delay(Duration::from_millis(100))
//! .timeout(recv.next()) // time-out when the sender emits a message
//! .await;
//!
//! assert_eq!(value.unwrap(), "meow");
//! })
//! }
//! ```
//!
//! # Futures
//!
//! - [`Future::delay`](`future::FutureExt::delay`) Delay execution for a specified time.
//! - [`Future::timeout`](`future::FutureExt::timeout`) Cancel the future if the execution takes longer than the specified time.
//! - [`Future::park`](`future::FutureExt::park`) Suspend or resume the execution of a future.
//!
//! # Tasks
//!
//! - [`task::sleep_until`] Sleeps until the specified deadline.
//! - [`task::sleep`] Sleeps for the specified amount of time.
//!
//! # Streams
//!
//! - [`Stream::buffer`](`stream::StreamExt::buffer`) Returns a stream which buffers items and flushes them at each interval.
//! - [`Stream::debounce`](`stream::StreamExt::debounce`) Returns a stream that debounces for the given duration.
//! - [`Stream::delay`](`stream::StreamExt::delay`) Delay execution for a specified time.
//! - [`Future::park`](`future::StreamExt::park`) Suspend or resume the execution of a stream.
//! - [`Stream::sample`](`stream::StreamExt::sample`) Yield the last value received, if any, at each interval.
//! - [`Stream::throttle`](`stream::StreamExt::throttle`) Filter out all items after the first for a specified time.
//! - [`Stream::timeout`](`stream::StreamExt::timeout`) Cancel the stream if the execution takes longer than the specified time.
//! - [`stream::interval`](`stream::interval`) Creates a new stream that yields at a set interval.
//!
//! # Re-exports
//!
//! - `channel` is a re-export of the [`async-channel`] crate, exposed for convenience
//!
//! [`async-channel`]: https://docs.rs/async-channel/latest/async_channel
#![forbid(unsafe_code)]
#![deny(missing_debug_implementations)]
#![warn(missing_docs, future_incompatible, unreachable_pub)]
#![forbid(rustdoc::missing_doc_code_examples)]
pub(crate) mod utils;
pub mod future;
pub mod stream;
pub mod task;
pub mod time;
/// An async multi-producer multi-consumer channel.
pub mod channel {
/// Suspend or resume execution of a future.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Parker {
/// Put the future into a suspended state.
Park,
/// Put the future into an active state.
Unpark,
}
#[doc(inline)]
pub use async_channel::*;
}
/// The `futures-time` prelude.
pub mod prelude {
pub use super::future::FutureExt as _;
pub use super::future::IntoFuture as _;
pub use super::future::Timer as _;
pub use super::stream::IntoStream as _;
pub use super::stream::StreamExt as _;
}