1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
//! `Future`- and `Stream`-backed timers APIs.
use crate::callback::{Interval, Timeout};
use futures_channel::{mpsc, oneshot};
use futures_core::stream::Stream;
use std::convert::TryFrom;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use wasm_bindgen::prelude::*;
/// A scheduled timeout as a `Future`.
///
/// See `TimeoutFuture::new` for scheduling new timeouts.
///
/// Once scheduled, if you change your mind and don't want the timeout to fire,
/// you can `drop` the future.
///
/// A timeout future will never resolve to `Err`. Its only failure mode is when
/// the timeout is so long that it is effectively infinite and never fires.
///
/// # Example
///
/// ```no_run
/// use gloo_timers::future::TimeoutFuture;
/// use futures_util::future::{select, Either};
/// use wasm_bindgen_futures::spawn_local;
///
/// spawn_local(async {
/// match select(TimeoutFuture::new(1_000), TimeoutFuture::new(2_000)).await {
/// Either::Left((val, b)) => {
/// // Drop the `2_000` ms timeout to cancel its timeout.
/// drop(b);
/// }
/// Either::Right((a, val)) => {
/// panic!("the `1_000` ms timeout should have won this race");
/// }
/// }
/// });
/// ```
#[derive(Debug)]
#[must_use = "futures do nothing unless polled or spawned"]
pub struct TimeoutFuture {
_inner: Timeout,
rx: oneshot::Receiver<()>,
}
impl TimeoutFuture {
/// Create a new timeout future.
///
/// Remember that futures do nothing unless polled or spawned, so either
/// pass this future to `wasm_bindgen_futures::spawn_local` or use it inside
/// another future.
///
/// # Example
///
/// ```no_run
/// use gloo_timers::future::TimeoutFuture;
/// use wasm_bindgen_futures::spawn_local;
///
/// spawn_local(async {
/// TimeoutFuture::new(1_000).await;
/// // Do stuff after one second...
/// });
/// ```
pub fn new(millis: u32) -> TimeoutFuture {
let (tx, rx) = oneshot::channel();
let inner = Timeout::new(millis, move || {
// if the receiver was dropped we do nothing.
tx.send(()).unwrap_throw();
});
TimeoutFuture { _inner: inner, rx }
}
}
/// Waits until the specified duration has elapsed.
///
/// # Panics
///
/// This function will panic if the specified [`Duration`] cannot be casted into a u32 in
/// milliseconds.
///
/// # Example
///
/// ```compile_fail
/// use std::time::Duration;
/// use gloo_timers::future::sleep;
///
/// sleep(Duration::from_secs(1)).await;
/// ```
pub fn sleep(dur: Duration) -> TimeoutFuture {
let millis = u32::try_from(dur.as_millis())
.expect_throw("failed to cast the duration into a u32 with Duration::as_millis.");
TimeoutFuture::new(millis)
}
impl Future for TimeoutFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Future::poll(Pin::new(&mut self.rx), cx).map(|t| t.unwrap_throw())
}
}
/// A scheduled interval as a `Stream`.
///
/// See `IntervalStream::new` for scheduling new intervals.
///
/// Once scheduled, if you want to stop the interval from continuing to fire,
/// you can `drop` the stream.
///
/// An interval stream will never resolve to `Err`.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled or spawned"]
pub struct IntervalStream {
receiver: mpsc::UnboundedReceiver<()>,
_inner: Interval,
}
impl IntervalStream {
/// Create a new interval stream.
///
/// Remember that streams do nothing unless polled or spawned, so either
/// spawn this stream via `wasm_bindgen_futures::spawn_local` or use it inside
/// another stream or future.
///
/// # Example
///
/// ```compile_fail
/// use futures_util::stream::StreamExt;
/// use gloo_timers::future::IntervalStream;
/// use wasm_bindgen_futures::spawn_local;
///
/// spawn_local(async {
/// IntervalStream::new(1_000).for_each(|_| {
/// // Do stuff every one second...
/// }).await;
/// });
/// ```
pub fn new(millis: u32) -> IntervalStream {
let (sender, receiver) = mpsc::unbounded();
let inner = Interval::new(millis, move || {
// if the receiver was dropped we do nothing.
sender.unbounded_send(()).unwrap_throw();
});
IntervalStream {
receiver,
_inner: inner,
}
}
}
impl Stream for IntervalStream {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
Stream::poll_next(Pin::new(&mut self.receiver), cx)
}
}