gloo_timers/future.rs
1//! `Future`- and `Stream`-backed timers APIs.
2
3use crate::callback::{Interval, Timeout};
4
5use futures_channel::{mpsc, oneshot};
6use futures_core::stream::Stream;
7use std::convert::TryFrom;
8use std::future::Future;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use std::time::Duration;
12use wasm_bindgen::prelude::*;
13
14/// A scheduled timeout as a `Future`.
15///
16/// See `TimeoutFuture::new` for scheduling new timeouts.
17///
18/// Once scheduled, if you change your mind and don't want the timeout to fire,
19/// you can `drop` the future.
20///
21/// A timeout future will never resolve to `Err`. Its only failure mode is when
22/// the timeout is so long that it is effectively infinite and never fires.
23///
24/// # Example
25///
26/// ```no_run
27/// use gloo_timers::future::TimeoutFuture;
28/// use futures_util::future::{select, Either};
29/// use wasm_bindgen_futures::spawn_local;
30///
31/// spawn_local(async {
32/// match select(TimeoutFuture::new(1_000), TimeoutFuture::new(2_000)).await {
33/// Either::Left((val, b)) => {
34/// // Drop the `2_000` ms timeout to cancel its timeout.
35/// drop(b);
36/// }
37/// Either::Right((a, val)) => {
38/// panic!("the `1_000` ms timeout should have won this race");
39/// }
40/// }
41/// });
42/// ```
43#[derive(Debug)]
44#[must_use = "futures do nothing unless polled or spawned"]
45pub struct TimeoutFuture {
46 _inner: Timeout,
47 rx: oneshot::Receiver<()>,
48}
49
50impl TimeoutFuture {
51 /// Create a new timeout future.
52 ///
53 /// Remember that futures do nothing unless polled or spawned, so either
54 /// pass this future to `wasm_bindgen_futures::spawn_local` or use it inside
55 /// another future.
56 ///
57 /// # Example
58 ///
59 /// ```no_run
60 /// use gloo_timers::future::TimeoutFuture;
61 /// use wasm_bindgen_futures::spawn_local;
62 ///
63 /// spawn_local(async {
64 /// TimeoutFuture::new(1_000).await;
65 /// // Do stuff after one second...
66 /// });
67 /// ```
68 pub fn new(millis: u32) -> TimeoutFuture {
69 let (tx, rx) = oneshot::channel();
70 let inner = Timeout::new(millis, move || {
71 // if the receiver was dropped we do nothing.
72 tx.send(()).unwrap_throw();
73 });
74 TimeoutFuture { _inner: inner, rx }
75 }
76}
77
78/// Waits until the specified duration has elapsed.
79///
80/// # Panics
81///
82/// This function will panic if the specified [`Duration`] cannot be casted into a u32 in
83/// milliseconds.
84///
85/// # Example
86///
87/// ```compile_fail
88/// use std::time::Duration;
89/// use gloo_timers::future::sleep;
90///
91/// sleep(Duration::from_secs(1)).await;
92/// ```
93pub fn sleep(dur: Duration) -> TimeoutFuture {
94 let millis = u32::try_from(dur.as_millis())
95 .expect_throw("failed to cast the duration into a u32 with Duration::as_millis.");
96
97 TimeoutFuture::new(millis)
98}
99
100impl Future for TimeoutFuture {
101 type Output = ();
102
103 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
104 Future::poll(Pin::new(&mut self.rx), cx).map(|t| t.unwrap_throw())
105 }
106}
107/// A scheduled interval as a `Stream`.
108///
109/// See `IntervalStream::new` for scheduling new intervals.
110///
111/// Once scheduled, if you want to stop the interval from continuing to fire,
112/// you can `drop` the stream.
113///
114/// An interval stream will never resolve to `Err`.
115#[derive(Debug)]
116#[must_use = "streams do nothing unless polled or spawned"]
117pub struct IntervalStream {
118 receiver: mpsc::UnboundedReceiver<()>,
119 _inner: Interval,
120}
121
122impl IntervalStream {
123 /// Create a new interval stream.
124 ///
125 /// Remember that streams do nothing unless polled or spawned, so either
126 /// spawn this stream via `wasm_bindgen_futures::spawn_local` or use it inside
127 /// another stream or future.
128 ///
129 /// # Example
130 ///
131 /// ```compile_fail
132 /// use futures_util::stream::StreamExt;
133 /// use gloo_timers::future::IntervalStream;
134 /// use wasm_bindgen_futures::spawn_local;
135 ///
136 /// spawn_local(async {
137 /// IntervalStream::new(1_000).for_each(|_| {
138 /// // Do stuff every one second...
139 /// }).await;
140 /// });
141 /// ```
142 pub fn new(millis: u32) -> IntervalStream {
143 let (sender, receiver) = mpsc::unbounded();
144 let inner = Interval::new(millis, move || {
145 // if the receiver was dropped we do nothing.
146 sender.unbounded_send(()).unwrap_throw();
147 });
148
149 IntervalStream {
150 receiver,
151 _inner: inner,
152 }
153 }
154}
155
156impl Stream for IntervalStream {
157 type Item = ();
158
159 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
160 Stream::poll_next(Pin::new(&mut self.receiver), cx)
161 }
162}