gloo_timers/
future.rs

//! `Future`- and `Stream`-backed timer APIs.
2
3use crate::callback::{Interval, Timeout};
4
5use futures_channel::{mpsc, oneshot};
6use futures_core::stream::Stream;
7use std::convert::TryFrom;
8use std::future::Future;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use std::time::Duration;
12use wasm_bindgen::prelude::*;
13
14/// A scheduled timeout as a `Future`.
15///
16/// See `TimeoutFuture::new` for scheduling new timeouts.
17///
18/// Once scheduled, if you change your mind and don't want the timeout to fire,
19/// you can `drop` the future.
20///
21/// A timeout future will never resolve to `Err`. Its only failure mode is when
22/// the timeout is so long that it is effectively infinite and never fires.
23///
24/// # Example
25///
26/// ```no_run
27/// use gloo_timers::future::TimeoutFuture;
28/// use futures_util::future::{select, Either};
29/// use wasm_bindgen_futures::spawn_local;
30///
31/// spawn_local(async {
32///     match select(TimeoutFuture::new(1_000), TimeoutFuture::new(2_000)).await {
33///         Either::Left((val, b)) => {
34///             // Drop the `2_000` ms timeout to cancel its timeout.
35///             drop(b);
36///         }
37///         Either::Right((a, val)) => {
38///             panic!("the `1_000` ms timeout should have won this race");
39///         }
40///     }
41/// });
42/// ```
43#[derive(Debug)]
44#[must_use = "futures do nothing unless polled or spawned"]
45pub struct TimeoutFuture {
46    _inner: Timeout,
47    rx: oneshot::Receiver<()>,
48}
49
50impl TimeoutFuture {
51    /// Create a new timeout future.
52    ///
53    /// Remember that futures do nothing unless polled or spawned, so either
54    /// pass this future to `wasm_bindgen_futures::spawn_local` or use it inside
55    /// another future.
56    ///
57    /// # Example
58    ///
59    /// ```no_run
60    /// use gloo_timers::future::TimeoutFuture;
61    /// use wasm_bindgen_futures::spawn_local;
62    ///
63    /// spawn_local(async {
64    ///     TimeoutFuture::new(1_000).await;
65    ///     // Do stuff after one second...
66    /// });
67    /// ```
68    pub fn new(millis: u32) -> TimeoutFuture {
69        let (tx, rx) = oneshot::channel();
70        let inner = Timeout::new(millis, move || {
71            // if the receiver was dropped we do nothing.
72            tx.send(()).unwrap_throw();
73        });
74        TimeoutFuture { _inner: inner, rx }
75    }
76}
77
78/// Waits until the specified duration has elapsed.
79///
80/// # Panics
81///
82/// This function will panic if the specified [`Duration`] cannot be casted into a u32 in
83/// milliseconds.
84///
85/// # Example
86///
87/// ```compile_fail
88/// use std::time::Duration;
89/// use gloo_timers::future::sleep;
90///
91/// sleep(Duration::from_secs(1)).await;
92/// ```
93pub fn sleep(dur: Duration) -> TimeoutFuture {
94    let millis = u32::try_from(dur.as_millis())
95        .expect_throw("failed to cast the duration into a u32 with Duration::as_millis.");
96
97    TimeoutFuture::new(millis)
98}
99
100impl Future for TimeoutFuture {
101    type Output = ();
102
103    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
104        Future::poll(Pin::new(&mut self.rx), cx).map(|t| t.unwrap_throw())
105    }
106}
107/// A scheduled interval as a `Stream`.
108///
109/// See `IntervalStream::new` for scheduling new intervals.
110///
111/// Once scheduled, if you want to stop the interval from continuing to fire,
112/// you can `drop` the stream.
113///
114/// An interval stream will never resolve to `Err`.
115#[derive(Debug)]
116#[must_use = "streams do nothing unless polled or spawned"]
117pub struct IntervalStream {
118    receiver: mpsc::UnboundedReceiver<()>,
119    _inner: Interval,
120}
121
122impl IntervalStream {
123    /// Create a new interval stream.
124    ///
125    /// Remember that streams do nothing unless polled or spawned, so either
126    /// spawn this stream via `wasm_bindgen_futures::spawn_local` or use it inside
127    /// another stream or future.
128    ///
129    /// # Example
130    ///
131    /// ```compile_fail
132    /// use futures_util::stream::StreamExt;
133    /// use gloo_timers::future::IntervalStream;
134    /// use wasm_bindgen_futures::spawn_local;
135    ///
136    /// spawn_local(async {
137    ///     IntervalStream::new(1_000).for_each(|_| {
138    ///         // Do stuff every one second...
139    ///     }).await;
140    /// });
141    /// ```
142    pub fn new(millis: u32) -> IntervalStream {
143        let (sender, receiver) = mpsc::unbounded();
144        let inner = Interval::new(millis, move || {
145            // if the receiver was dropped we do nothing.
146            sender.unbounded_send(()).unwrap_throw();
147        });
148
149        IntervalStream {
150            receiver,
151            _inner: inner,
152        }
153    }
154}
155
156impl Stream for IntervalStream {
157    type Item = ();
158
159    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
160        Stream::poll_next(Pin::new(&mut self.receiver), cx)
161    }
162}