async_timer/
timed.rs

1//! Timed future
2
3use core::future::Future;
4use core::{fmt, task, time, mem};
5use core::pin::Pin;
6
7use crate::oneshot::Oneshot;
8use crate::oneshot::Timer as PlatformTimer;
9
10#[must_use = "Timed does nothing unless polled"]
11///Limiter on time to wait for underlying `Future`
12///
13///# Usage
14///
15///```rust, no_run
16///async fn job() {
17///}
18///
19///async fn do_job() {
20///    let work = unsafe {
21///        async_timer::Timed::platform_new_unchecked(job(), core::time::Duration::from_secs(1))
22///    };
23///
24///    match work.await {
25///        Ok(_) => println!("I'm done!"),
26///        //You can retry by polling `expired`
27///        Err(expired) => println!("Job expired: {}", expired),
28///    }
29///}
30///```
31pub enum Timed<F, T=PlatformTimer> {
32    #[doc(hidden)]
33    Ongoing(T, F, time::Duration),
34    #[doc(hidden)]
35    Stopped,
36}
37
38impl<F: Future + Unpin> Timed<F> {
39    #[inline]
40    ///Creates new instance using [Timer](../oneshot/type.Timer.html) alias.
41    pub fn platform_new(inner: F, timeout: time::Duration) -> Self {
42        Timed::<F, PlatformTimer>::new(inner, timeout)
43    }
44}
45
46impl<F: Future> Timed<F> {
47    #[inline]
48    ///Creates new instance using [Timer](../oneshot/type.Timer.html) alias.
49    ///
50    ///Unsafe version of `platform_new` that doesn't require `Unpin`.
51    pub unsafe fn platform_new_unchecked(inner: F, timeout: time::Duration) -> Self {
52        Timed::<F, PlatformTimer>::new_unchecked(inner, timeout)
53    }
54}
55
56impl<F: Future + Unpin, T: Oneshot> Timed<F, T> {
57    ///Creates new instance with specified timeout
58    ///
59    ///Requires to specify `Oneshot` type (e.g. `Timed::<oneshoot::Timer>::new()`)
60    pub fn new(inner: F, timeout: time::Duration) -> Self {
61        Timed::Ongoing(T::new(timeout), inner, timeout)
62    }
63}
64
65impl<F: Future, T: Oneshot> Timed<F, T> {
66    ///Creates new instance with specified timeout
67    ///
68    ///Unsafe version of `new` that doesn't require `Unpin`.
69    ///
70    ///Requires to specify `Oneshot` type (e.g. `Timed::<oneshoot::Timer>::new()`)
71    pub unsafe fn new_unchecked(inner: F, timeout: time::Duration) -> Self {
72        Timed::Ongoing(T::new(timeout), inner, timeout)
73    }
74}
75
76impl<F: Future, T: Oneshot> Future for Timed<F, T> {
77    type Output = Result<F::Output, Expired<F, T>>;
78
79    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> task::Poll<Self::Output> {
80        let mut state = Timed::Stopped;
81        let mut this = unsafe { self.get_unchecked_mut() };
82        mem::swap(&mut state, &mut this);
83
84        match state {
85            Timed::Ongoing(mut timer, mut future, timeout) => {
86                match Future::poll(unsafe { Pin::new_unchecked(&mut future) }, ctx) {
87                    task::Poll::Pending => (),
88                    task::Poll::Ready(result) => return task::Poll::Ready(Ok(result)),
89                }
90
91                match Future::poll(Pin::new(&mut timer), ctx) {
92                    task::Poll::Pending => (),
93                    task::Poll::Ready(_) => return task::Poll::Ready(Err(Expired {
94                        inner: Timed::Ongoing(timer, future, timeout),
95                    })),
96                }
97
98                *this = Timed::Ongoing(timer, future, timeout);
99                task::Poll::Pending
100            },
101            Timed::Stopped => task::Poll::Pending,
102        }
103    }
104}
105
106impl<F: Future + Unpin, T: Oneshot> Unpin for Timed<F, T> {}
107
108///Error when [Timed](struct.Timed.html) expires
109///
110///Implements `Future` that can be used to restart `Timed`
111///Note, that `Oneshot` starts execution immediately after resolving this Future
112pub struct Expired<F, T> {
113    inner: Timed<F, T>,
114}
115
116impl<F: Future, T: Oneshot> Expired<F, T> {
117    ///Returns underlying `Future`
118    pub fn into_inner(self) -> F {
119        match self.inner {
120            Timed::Ongoing(_, fut, _) => fut,
121            _ => unreach!(),
122        }
123    }
124}
125
126impl<F: Future, T: Oneshot> Future for Expired<F, T> {
127    type Output = Timed<F, T>;
128
129    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> task::Poll<Self::Output> {
130        let mut state = Timed::Stopped;
131        let this = unsafe { self.get_unchecked_mut() };
132        mem::swap(&mut this.inner, &mut state);
133
134        match state {
135            Timed::Ongoing(mut timer, future, timeout) => {
136                timer.restart(timeout, ctx.waker());
137
138                task::Poll::Ready(Timed::Ongoing(timer, future, timeout))
139            },
140            _ => task::Poll::Pending,
141        }
142    }
143}
144
145impl<F: Future + Unpin, T: Oneshot> Unpin for Expired<F, T> {}
146
147#[cfg(not(feature = "no_std"))]
148impl<F, T: Oneshot> crate::std::error::Error for Expired<F, T> {}
149impl<F, T: Oneshot> fmt::Debug for Expired<F, T> {
150    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
151        write!(f, "{}", self)
152    }
153}
154
155impl<F, T: Oneshot> fmt::Display for Expired<F, T> {
156    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157        match self.inner {
158            Timed::Stopped => write!(f, "Future is being re-tried."),
159            Timed::Ongoing(_, _, timeout) => match timeout.as_secs() {
160                0 => write!(f, "Future expired in {} ms", timeout.as_millis()),
161                secs => write!(f, "Future expired in {} seconds and {} ms", secs, timeout.subsec_millis()),
162            },
163        }
164    }
165}