futures_intrusive/timer/
timer.rs

1//! An asynchronously awaitable timer
2
3use super::clock::Clock;
4use crate::{
5    intrusive_pairing_heap::{HeapNode, PairingHeap},
6    utils::update_waker_ref,
7    NoopLock,
8};
9use core::{pin::Pin, time::Duration};
10use futures_core::{
11    future::{FusedFuture, Future},
12    task::{Context, Poll, Waker},
13};
14use lock_api::{Mutex, RawMutex};
15
16/// Tracks how the future had interacted with the timer
17#[derive(PartialEq)]
18enum PollState {
19    /// The task is not registered at the wait queue at the timer
20    Unregistered,
21    /// The task was added to the wait queue at the timer
22    Registered,
23    /// The timer has expired and was thereby removed from the wait queue at
24    /// the timer. Having this extra state avoids to query the clock for an
25    /// extra time.
26    Expired,
27}
28
29/// Tracks the timer futures waiting state.
30struct TimerQueueEntry {
31    /// Timestamp when the timer expires
32    expiry: u64,
33    /// The task handle of the waiting task
34    task: Option<Waker>,
35    /// Current polling state
36    state: PollState,
37}
38
39impl TimerQueueEntry {
40    /// Creates a new TimerQueueEntry
41    fn new(expiry: u64) -> TimerQueueEntry {
42        TimerQueueEntry {
43            expiry,
44            task: None,
45            state: PollState::Unregistered,
46        }
47    }
48}
49
50impl PartialEq for TimerQueueEntry {
51    fn eq(&self, other: &TimerQueueEntry) -> bool {
52        // This is technically not correct. However for the usage in this module
53        // we only need to compare timers by expiration.
54        self.expiry == other.expiry
55    }
56}
57
58impl Eq for TimerQueueEntry {}
59
60impl PartialOrd for TimerQueueEntry {
61    fn partial_cmp(
62        &self,
63        other: &TimerQueueEntry,
64    ) -> Option<core::cmp::Ordering> {
65        // Compare timer queue entries by expiration time
66        self.expiry.partial_cmp(&other.expiry)
67    }
68}
69
70impl Ord for TimerQueueEntry {
71    fn cmp(&self, other: &TimerQueueEntry) -> core::cmp::Ordering {
72        self.expiry.cmp(&other.expiry)
73    }
74}
75
76/// Internal state of the timer
77struct TimerState {
78    /// The clock which is utilized
79    clock: &'static dyn Clock,
80    /// The heap of waiters, which are waiting for their timer to expire
81    waiters: PairingHeap<TimerQueueEntry>,
82}
83
84impl TimerState {
85    fn new(clock: &'static dyn Clock) -> TimerState {
86        TimerState {
87            clock,
88            waiters: PairingHeap::new(),
89        }
90    }
91
92    /// Registers the timer future at the Timer.
93    /// This function is only safe as long as the `wait_node`s address is guaranteed
94    /// to be stable until it gets removed from the queue.
95    unsafe fn try_wait(
96        &mut self,
97        wait_node: &mut HeapNode<TimerQueueEntry>,
98        cx: &mut Context<'_>,
99    ) -> Poll<()> {
100        match wait_node.state {
101            PollState::Unregistered => {
102                let now = self.clock.now();
103                if now >= wait_node.expiry {
104                    // The timer is already expired
105                    wait_node.state = PollState::Expired;
106                    Poll::Ready(())
107                } else {
108                    // Added the task to the wait queue
109                    wait_node.task = Some(cx.waker().clone());
110                    wait_node.state = PollState::Registered;
111                    self.waiters.insert(wait_node);
112                    Poll::Pending
113                }
114            }
115            PollState::Registered => {
116                // Since the timer wakes up all waiters and moves their states to
117                // Expired when the timer expired, it can't be expired here yet.
118                // However the caller might have passed a different `Waker`.
119                // In this case we need to update it.
120                update_waker_ref(&mut wait_node.task, cx);
121                Poll::Pending
122            }
123            PollState::Expired => Poll::Ready(()),
124        }
125    }
126
127    fn remove_waiter(&mut self, wait_node: &mut HeapNode<TimerQueueEntry>) {
128        // TimerFuture only needs to get removed if it had been added to
129        // the wait queue of the timer. This has happened in the PollState::Registered case.
130        if let PollState::Registered = wait_node.state {
131            // Safety: Due to the state, we know that the node must be part
132            // of the waiter heap
133            unsafe { self.waiters.remove(wait_node) };
134            wait_node.state = PollState::Unregistered;
135        }
136    }
137
138    /// Returns a timestamp when the next timer expires.
139    ///
140    /// For thread-safe timers, the returned value is not precise and subject to
141    /// race-conditions, since other threads can add timer in the meantime.
142    fn next_expiration(&self) -> Option<u64> {
143        // Safety: We ensure that any node in the heap remains alive
144        unsafe { self.waiters.peek_min().map(|first| first.as_ref().expiry) }
145    }
146
147    /// Checks whether any of the attached Futures is expired
148    fn check_expirations(&mut self) {
149        let now = self.clock.now();
150        while let Some(mut first) = self.waiters.peek_min() {
151            // Safety: We ensure that any node in the heap remains alive
152            unsafe {
153                let entry = first.as_mut();
154                let first_expiry = entry.expiry;
155                if now >= first_expiry {
156                    // The timer is expired.
157                    entry.state = PollState::Expired;
158                    if let Some(task) = entry.task.take() {
159                        task.wake();
160                    }
161                } else {
162                    // Remaining timers are not expired
163                    break;
164                }
165
166                // Remove the expired timer
167                self.waiters.remove(entry);
168            }
169        }
170    }
171}
172
173/// Adapter trait that allows Futures to generically interact with timer
174/// implementations via dynamic dispatch.
175trait TimerAccess {
176    unsafe fn try_wait(
177        &self,
178        wait_node: &mut HeapNode<TimerQueueEntry>,
179        cx: &mut Context<'_>,
180    ) -> Poll<()>;
181
182    fn remove_waiter(&self, wait_node: &mut HeapNode<TimerQueueEntry>);
183}
184
185/// An asynchronously awaitable timer which is bound to a thread.
186///
187/// The timer operates on millisecond precision and makes use of a configurable
188/// clock source.
189///
190/// The timer allows to wait asynchronously either for a certain duration,
191/// or until the provided [`Clock`] reaches a certain timestamp.
192pub trait LocalTimer {
193    /// Returns a future that gets fulfilled after the given `Duration`
194    fn delay(&self, delay: Duration) -> LocalTimerFuture;
195
196    /// Returns a future that gets fulfilled when the utilized [`Clock`] reaches
197    /// the given timestamp.
198    fn deadline(&self, timestamp: u64) -> LocalTimerFuture;
199}
200
201/// An asynchronously awaitable thread-safe timer.
202///
203/// The timer operates on millisecond precision and makes use of a configurable
204/// clock source.
205///
206/// The timer allows to wait asynchronously either for a certain duration,
207/// or until the provided [`Clock`] reaches a certain timestamp.
208pub trait Timer {
209    /// Returns a future that gets fulfilled after the given `Duration`
210    fn delay(&self, delay: Duration) -> TimerFuture;
211
212    /// Returns a future that gets fulfilled when the utilized [`Clock`] reaches
213    /// the given timestamp.
214    fn deadline(&self, timestamp: u64) -> TimerFuture;
215}
216
217/// An asynchronously awaitable timer.
218///
219/// The timer operates on millisecond precision and makes use of a configurable
220/// clock source.
221///
222/// The timer allows to wait asynchronously either for a certain duration,
223/// or until the provided [`Clock`] reaches a certain timestamp.
224///
225/// In order to unblock tasks that are waiting on the timer,
226/// [`check_expirations`](GenericTimerService::check_expirations)
227/// must be called in regular intervals on this timer service.
228///
229/// The timer can either be running on a separate timer thread (in case a
230/// thread-safe timer type is utilize), or it can be integrated into an executor
231/// in order to minimize context switches.
232pub struct GenericTimerService<MutexType: RawMutex> {
233    inner: Mutex<MutexType, TimerState>,
234}
235
236// The timer can be sent to other threads as long as it's not borrowed
237unsafe impl<MutexType: RawMutex + Send> Send
238    for GenericTimerService<MutexType>
239{
240}
241// The timer is thread-safe as long as it uses a thread-safe mutex
242unsafe impl<MutexType: RawMutex + Sync> Sync
243    for GenericTimerService<MutexType>
244{
245}
246
247impl<MutexType: RawMutex> core::fmt::Debug for GenericTimerService<MutexType> {
248    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
249        f.debug_struct("TimerService").finish()
250    }
251}
252
253impl<MutexType: RawMutex> GenericTimerService<MutexType> {
254    /// Creates a new Timer in the given state.
255    ///
256    /// The Timer will query the provided [`Clock`] instance for the current
257    /// time whenever required.
258    ///
259    /// In order to create a create a clock which utilizes system time,
260    /// [`StdClock`](super::StdClock) can be utilized.
261    /// In order to simulate time for test purposes,
262    /// [`MockClock`](super::MockClock) can be utilized.
263    pub fn new(clock: &'static dyn Clock) -> GenericTimerService<MutexType> {
264        GenericTimerService::<MutexType> {
265            inner: Mutex::new(TimerState::new(clock)),
266        }
267    }
268
269    /// Returns a timestamp when the next timer expires.
270    ///
271    /// For thread-safe timers, the returned value is not precise and subject to
272    /// race-conditions, since other threads can add timer in the meantime.
273    ///
274    /// Therefore adding any timer to the [`GenericTimerService`] should  also
275    /// make sure to wake up the executor which polls for timeouts, in order to
276    /// let it capture the latest change.
277    pub fn next_expiration(&self) -> Option<u64> {
278        self.inner.lock().next_expiration()
279    }
280
281    /// Checks whether any of the attached [`TimerFuture`]s has expired.
282    /// In this case the associated task is woken up.
283    pub fn check_expirations(&self) {
284        self.inner.lock().check_expirations()
285    }
286
287    /// Returns a deadline based on the current timestamp plus the given Duration
288    fn deadline_from_now(&self, duration: Duration) -> u64 {
289        let now = self.inner.lock().clock.now();
290        let duration_ms =
291            core::cmp::min(duration.as_millis(), core::u64::MAX as u128) as u64;
292        now.saturating_add(duration_ms)
293    }
294}
295
296impl<MutexType: RawMutex> LocalTimer for GenericTimerService<MutexType> {
297    /// Returns a future that gets fulfilled after the given [`Duration`]
298    fn delay(&self, delay: Duration) -> LocalTimerFuture {
299        let deadline = self.deadline_from_now(delay);
300        LocalTimer::deadline(&*self, deadline)
301    }
302
303    /// Returns a future that gets fulfilled when the utilized [`Clock`] reaches
304    /// the given timestamp.
305    fn deadline(&self, timestamp: u64) -> LocalTimerFuture {
306        LocalTimerFuture {
307            timer: Some(self),
308            wait_node: HeapNode::new(TimerQueueEntry::new(timestamp)),
309        }
310    }
311}
312
313impl<MutexType: RawMutex> Timer for GenericTimerService<MutexType>
314where
315    MutexType: Sync,
316{
317    /// Returns a future that gets fulfilled after the given [`Duration`]
318    fn delay(&self, delay: Duration) -> TimerFuture {
319        let deadline = self.deadline_from_now(delay);
320        Timer::deadline(&*self, deadline)
321    }
322
323    /// Returns a future that gets fulfilled when the utilized [`Clock`] reaches
324    /// the given timestamp.
325    fn deadline(&self, timestamp: u64) -> TimerFuture {
326        TimerFuture {
327            timer_future: LocalTimerFuture {
328                timer: Some(self),
329                wait_node: HeapNode::new(TimerQueueEntry::new(timestamp)),
330            },
331        }
332    }
333}
334
335impl<MutexType: RawMutex> TimerAccess for GenericTimerService<MutexType> {
336    unsafe fn try_wait(
337        &self,
338        wait_node: &mut HeapNode<TimerQueueEntry>,
339        cx: &mut Context<'_>,
340    ) -> Poll<()> {
341        self.inner.lock().try_wait(wait_node, cx)
342    }
343
344    fn remove_waiter(&self, wait_node: &mut HeapNode<TimerQueueEntry>) {
345        self.inner.lock().remove_waiter(wait_node)
346    }
347}
348
349/// A Future that is resolved once the requested time has elapsed.
350#[must_use = "futures do nothing unless polled"]
351pub struct LocalTimerFuture<'a> {
352    /// The Timer that is associated with this TimerFuture
353    timer: Option<&'a dyn TimerAccess>,
354    /// Node for waiting on the timer
355    wait_node: HeapNode<TimerQueueEntry>,
356}
357
358impl<'a> core::fmt::Debug for LocalTimerFuture<'a> {
359    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
360        f.debug_struct("LocalTimerFuture").finish()
361    }
362}
363
364impl<'a> Future for LocalTimerFuture<'a> {
365    type Output = ();
366
367    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
368        // It might be possible to use Pin::map_unchecked here instead of the two unsafe APIs.
369        // However this didn't seem to work for some borrow checker reasons
370
371        // Safety: The next operations are safe, because Pin promises us that
372        // the address of the wait queue entry inside TimerFuture is stable,
373        // and we don't move any fields inside the future until it gets dropped.
374        let mut_self: &mut LocalTimerFuture =
375            unsafe { Pin::get_unchecked_mut(self) };
376
377        let timer =
378            mut_self.timer.expect("polled TimerFuture after completion");
379
380        let poll_res = unsafe { timer.try_wait(&mut mut_self.wait_node, cx) };
381
382        if poll_res.is_ready() {
383            // A value was available
384            mut_self.timer = None;
385        }
386
387        poll_res
388    }
389}
390
391impl<'a> FusedFuture for LocalTimerFuture<'a> {
392    fn is_terminated(&self) -> bool {
393        self.timer.is_none()
394    }
395}
396
397impl<'a> Drop for LocalTimerFuture<'a> {
398    fn drop(&mut self) {
399        // If this TimerFuture has been polled and it was added to the
400        // wait queue at the timer, it must be removed before dropping.
401        // Otherwise the timer would access invalid memory.
402        if let Some(timer) = self.timer {
403            timer.remove_waiter(&mut self.wait_node);
404        }
405    }
406}
407
408/// A Future that is resolved once the requested time has elapsed.
409#[must_use = "futures do nothing unless polled"]
410pub struct TimerFuture<'a> {
411    /// The Timer that is associated with this TimerFuture
412    timer_future: LocalTimerFuture<'a>,
413}
414
415// Safety: TimerFutures are only returned by GenericTimerService instances which
416// are thread-safe (RawMutex: Sync).
417unsafe impl<'a> Send for TimerFuture<'a> {}
418
419impl<'a> core::fmt::Debug for TimerFuture<'a> {
420    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
421        f.debug_struct("TimerFuture").finish()
422    }
423}
424
425impl<'a> Future for TimerFuture<'a> {
426    type Output = ();
427
428    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
429        // Safety: TimerFuture is a pure wrapper around LocalTimerFuture.
430        // The inner value is never moved
431        let inner_pin = unsafe {
432            Pin::map_unchecked_mut(self, |fut| &mut fut.timer_future)
433        };
434        inner_pin.poll(cx)
435    }
436}
437
438impl<'a> FusedFuture for TimerFuture<'a> {
439    fn is_terminated(&self) -> bool {
440        self.timer_future.is_terminated()
441    }
442}
443
444// Export a non thread-safe version using NoopLock
445
446/// A [`GenericTimerService`] implementation which is not thread-safe.
447pub type LocalTimerService = GenericTimerService<NoopLock>;
448
449#[cfg(feature = "std")]
450mod if_std {
451    use super::*;
452
453    // Export a thread-safe version using parking_lot::RawMutex
454
455    /// A [`GenericTimerService`] implementation backed by [`parking_lot`].
456    pub type TimerService = GenericTimerService<parking_lot::RawMutex>;
457}
458
459#[cfg(feature = "std")]
460pub use self::if_std::*;