futures_intrusive/timer/timer.rs
1//! An asynchronously awaitable timer
2
3use super::clock::Clock;
4use crate::{
5 intrusive_pairing_heap::{HeapNode, PairingHeap},
6 utils::update_waker_ref,
7 NoopLock,
8};
9use core::{pin::Pin, time::Duration};
10use futures_core::{
11 future::{FusedFuture, Future},
12 task::{Context, Poll, Waker},
13};
14use lock_api::{Mutex, RawMutex};
15
16/// Tracks how the future had interacted with the timer
17#[derive(PartialEq)]
18enum PollState {
19 /// The task is not registered at the wait queue at the timer
20 Unregistered,
21 /// The task was added to the wait queue at the timer
22 Registered,
23 /// The timer has expired and was thereby removed from the wait queue at
24 /// the timer. Having this extra state avoids to query the clock for an
25 /// extra time.
26 Expired,
27}
28
29/// Tracks the timer futures waiting state.
30struct TimerQueueEntry {
31 /// Timestamp when the timer expires
32 expiry: u64,
33 /// The task handle of the waiting task
34 task: Option<Waker>,
35 /// Current polling state
36 state: PollState,
37}
38
39impl TimerQueueEntry {
40 /// Creates a new TimerQueueEntry
41 fn new(expiry: u64) -> TimerQueueEntry {
42 TimerQueueEntry {
43 expiry,
44 task: None,
45 state: PollState::Unregistered,
46 }
47 }
48}
49
50impl PartialEq for TimerQueueEntry {
51 fn eq(&self, other: &TimerQueueEntry) -> bool {
52 // This is technically not correct. However for the usage in this module
53 // we only need to compare timers by expiration.
54 self.expiry == other.expiry
55 }
56}
57
58impl Eq for TimerQueueEntry {}
59
60impl PartialOrd for TimerQueueEntry {
61 fn partial_cmp(
62 &self,
63 other: &TimerQueueEntry,
64 ) -> Option<core::cmp::Ordering> {
65 // Compare timer queue entries by expiration time
66 self.expiry.partial_cmp(&other.expiry)
67 }
68}
69
70impl Ord for TimerQueueEntry {
71 fn cmp(&self, other: &TimerQueueEntry) -> core::cmp::Ordering {
72 self.expiry.cmp(&other.expiry)
73 }
74}
75
76/// Internal state of the timer
77struct TimerState {
78 /// The clock which is utilized
79 clock: &'static dyn Clock,
80 /// The heap of waiters, which are waiting for their timer to expire
81 waiters: PairingHeap<TimerQueueEntry>,
82}
83
84impl TimerState {
85 fn new(clock: &'static dyn Clock) -> TimerState {
86 TimerState {
87 clock,
88 waiters: PairingHeap::new(),
89 }
90 }
91
92 /// Registers the timer future at the Timer.
93 /// This function is only safe as long as the `wait_node`s address is guaranteed
94 /// to be stable until it gets removed from the queue.
95 unsafe fn try_wait(
96 &mut self,
97 wait_node: &mut HeapNode<TimerQueueEntry>,
98 cx: &mut Context<'_>,
99 ) -> Poll<()> {
100 match wait_node.state {
101 PollState::Unregistered => {
102 let now = self.clock.now();
103 if now >= wait_node.expiry {
104 // The timer is already expired
105 wait_node.state = PollState::Expired;
106 Poll::Ready(())
107 } else {
108 // Added the task to the wait queue
109 wait_node.task = Some(cx.waker().clone());
110 wait_node.state = PollState::Registered;
111 self.waiters.insert(wait_node);
112 Poll::Pending
113 }
114 }
115 PollState::Registered => {
116 // Since the timer wakes up all waiters and moves their states to
117 // Expired when the timer expired, it can't be expired here yet.
118 // However the caller might have passed a different `Waker`.
119 // In this case we need to update it.
120 update_waker_ref(&mut wait_node.task, cx);
121 Poll::Pending
122 }
123 PollState::Expired => Poll::Ready(()),
124 }
125 }
126
127 fn remove_waiter(&mut self, wait_node: &mut HeapNode<TimerQueueEntry>) {
128 // TimerFuture only needs to get removed if it had been added to
129 // the wait queue of the timer. This has happened in the PollState::Registered case.
130 if let PollState::Registered = wait_node.state {
131 // Safety: Due to the state, we know that the node must be part
132 // of the waiter heap
133 unsafe { self.waiters.remove(wait_node) };
134 wait_node.state = PollState::Unregistered;
135 }
136 }
137
138 /// Returns a timestamp when the next timer expires.
139 ///
140 /// For thread-safe timers, the returned value is not precise and subject to
141 /// race-conditions, since other threads can add timer in the meantime.
142 fn next_expiration(&self) -> Option<u64> {
143 // Safety: We ensure that any node in the heap remains alive
144 unsafe { self.waiters.peek_min().map(|first| first.as_ref().expiry) }
145 }
146
147 /// Checks whether any of the attached Futures is expired
148 fn check_expirations(&mut self) {
149 let now = self.clock.now();
150 while let Some(mut first) = self.waiters.peek_min() {
151 // Safety: We ensure that any node in the heap remains alive
152 unsafe {
153 let entry = first.as_mut();
154 let first_expiry = entry.expiry;
155 if now >= first_expiry {
156 // The timer is expired.
157 entry.state = PollState::Expired;
158 if let Some(task) = entry.task.take() {
159 task.wake();
160 }
161 } else {
162 // Remaining timers are not expired
163 break;
164 }
165
166 // Remove the expired timer
167 self.waiters.remove(entry);
168 }
169 }
170 }
171}
172
173/// Adapter trait that allows Futures to generically interact with timer
174/// implementations via dynamic dispatch.
175trait TimerAccess {
176 unsafe fn try_wait(
177 &self,
178 wait_node: &mut HeapNode<TimerQueueEntry>,
179 cx: &mut Context<'_>,
180 ) -> Poll<()>;
181
182 fn remove_waiter(&self, wait_node: &mut HeapNode<TimerQueueEntry>);
183}
184
185/// An asynchronously awaitable timer which is bound to a thread.
186///
187/// The timer operates on millisecond precision and makes use of a configurable
188/// clock source.
189///
190/// The timer allows to wait asynchronously either for a certain duration,
191/// or until the provided [`Clock`] reaches a certain timestamp.
192pub trait LocalTimer {
193 /// Returns a future that gets fulfilled after the given `Duration`
194 fn delay(&self, delay: Duration) -> LocalTimerFuture;
195
196 /// Returns a future that gets fulfilled when the utilized [`Clock`] reaches
197 /// the given timestamp.
198 fn deadline(&self, timestamp: u64) -> LocalTimerFuture;
199}
200
201/// An asynchronously awaitable thread-safe timer.
202///
203/// The timer operates on millisecond precision and makes use of a configurable
204/// clock source.
205///
206/// The timer allows to wait asynchronously either for a certain duration,
207/// or until the provided [`Clock`] reaches a certain timestamp.
208pub trait Timer {
209 /// Returns a future that gets fulfilled after the given `Duration`
210 fn delay(&self, delay: Duration) -> TimerFuture;
211
212 /// Returns a future that gets fulfilled when the utilized [`Clock`] reaches
213 /// the given timestamp.
214 fn deadline(&self, timestamp: u64) -> TimerFuture;
215}
216
217/// An asynchronously awaitable timer.
218///
219/// The timer operates on millisecond precision and makes use of a configurable
220/// clock source.
221///
222/// The timer allows to wait asynchronously either for a certain duration,
223/// or until the provided [`Clock`] reaches a certain timestamp.
224///
225/// In order to unblock tasks that are waiting on the timer,
226/// [`check_expirations`](GenericTimerService::check_expirations)
227/// must be called in regular intervals on this timer service.
228///
229/// The timer can either be running on a separate timer thread (in case a
230/// thread-safe timer type is utilize), or it can be integrated into an executor
231/// in order to minimize context switches.
232pub struct GenericTimerService<MutexType: RawMutex> {
233 inner: Mutex<MutexType, TimerState>,
234}
235
236// The timer can be sent to other threads as long as it's not borrowed
237unsafe impl<MutexType: RawMutex + Send> Send
238 for GenericTimerService<MutexType>
239{
240}
241// The timer is thread-safe as long as it uses a thread-safe mutex
242unsafe impl<MutexType: RawMutex + Sync> Sync
243 for GenericTimerService<MutexType>
244{
245}
246
247impl<MutexType: RawMutex> core::fmt::Debug for GenericTimerService<MutexType> {
248 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
249 f.debug_struct("TimerService").finish()
250 }
251}
252
253impl<MutexType: RawMutex> GenericTimerService<MutexType> {
254 /// Creates a new Timer in the given state.
255 ///
256 /// The Timer will query the provided [`Clock`] instance for the current
257 /// time whenever required.
258 ///
259 /// In order to create a create a clock which utilizes system time,
260 /// [`StdClock`](super::StdClock) can be utilized.
261 /// In order to simulate time for test purposes,
262 /// [`MockClock`](super::MockClock) can be utilized.
263 pub fn new(clock: &'static dyn Clock) -> GenericTimerService<MutexType> {
264 GenericTimerService::<MutexType> {
265 inner: Mutex::new(TimerState::new(clock)),
266 }
267 }
268
269 /// Returns a timestamp when the next timer expires.
270 ///
271 /// For thread-safe timers, the returned value is not precise and subject to
272 /// race-conditions, since other threads can add timer in the meantime.
273 ///
274 /// Therefore adding any timer to the [`GenericTimerService`] should also
275 /// make sure to wake up the executor which polls for timeouts, in order to
276 /// let it capture the latest change.
277 pub fn next_expiration(&self) -> Option<u64> {
278 self.inner.lock().next_expiration()
279 }
280
281 /// Checks whether any of the attached [`TimerFuture`]s has expired.
282 /// In this case the associated task is woken up.
283 pub fn check_expirations(&self) {
284 self.inner.lock().check_expirations()
285 }
286
287 /// Returns a deadline based on the current timestamp plus the given Duration
288 fn deadline_from_now(&self, duration: Duration) -> u64 {
289 let now = self.inner.lock().clock.now();
290 let duration_ms =
291 core::cmp::min(duration.as_millis(), core::u64::MAX as u128) as u64;
292 now.saturating_add(duration_ms)
293 }
294}
295
296impl<MutexType: RawMutex> LocalTimer for GenericTimerService<MutexType> {
297 /// Returns a future that gets fulfilled after the given [`Duration`]
298 fn delay(&self, delay: Duration) -> LocalTimerFuture {
299 let deadline = self.deadline_from_now(delay);
300 LocalTimer::deadline(&*self, deadline)
301 }
302
303 /// Returns a future that gets fulfilled when the utilized [`Clock`] reaches
304 /// the given timestamp.
305 fn deadline(&self, timestamp: u64) -> LocalTimerFuture {
306 LocalTimerFuture {
307 timer: Some(self),
308 wait_node: HeapNode::new(TimerQueueEntry::new(timestamp)),
309 }
310 }
311}
312
313impl<MutexType: RawMutex> Timer for GenericTimerService<MutexType>
314where
315 MutexType: Sync,
316{
317 /// Returns a future that gets fulfilled after the given [`Duration`]
318 fn delay(&self, delay: Duration) -> TimerFuture {
319 let deadline = self.deadline_from_now(delay);
320 Timer::deadline(&*self, deadline)
321 }
322
323 /// Returns a future that gets fulfilled when the utilized [`Clock`] reaches
324 /// the given timestamp.
325 fn deadline(&self, timestamp: u64) -> TimerFuture {
326 TimerFuture {
327 timer_future: LocalTimerFuture {
328 timer: Some(self),
329 wait_node: HeapNode::new(TimerQueueEntry::new(timestamp)),
330 },
331 }
332 }
333}
334
335impl<MutexType: RawMutex> TimerAccess for GenericTimerService<MutexType> {
336 unsafe fn try_wait(
337 &self,
338 wait_node: &mut HeapNode<TimerQueueEntry>,
339 cx: &mut Context<'_>,
340 ) -> Poll<()> {
341 self.inner.lock().try_wait(wait_node, cx)
342 }
343
344 fn remove_waiter(&self, wait_node: &mut HeapNode<TimerQueueEntry>) {
345 self.inner.lock().remove_waiter(wait_node)
346 }
347}
348
349/// A Future that is resolved once the requested time has elapsed.
350#[must_use = "futures do nothing unless polled"]
351pub struct LocalTimerFuture<'a> {
352 /// The Timer that is associated with this TimerFuture
353 timer: Option<&'a dyn TimerAccess>,
354 /// Node for waiting on the timer
355 wait_node: HeapNode<TimerQueueEntry>,
356}
357
358impl<'a> core::fmt::Debug for LocalTimerFuture<'a> {
359 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
360 f.debug_struct("LocalTimerFuture").finish()
361 }
362}
363
364impl<'a> Future for LocalTimerFuture<'a> {
365 type Output = ();
366
367 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
368 // It might be possible to use Pin::map_unchecked here instead of the two unsafe APIs.
369 // However this didn't seem to work for some borrow checker reasons
370
371 // Safety: The next operations are safe, because Pin promises us that
372 // the address of the wait queue entry inside TimerFuture is stable,
373 // and we don't move any fields inside the future until it gets dropped.
374 let mut_self: &mut LocalTimerFuture =
375 unsafe { Pin::get_unchecked_mut(self) };
376
377 let timer =
378 mut_self.timer.expect("polled TimerFuture after completion");
379
380 let poll_res = unsafe { timer.try_wait(&mut mut_self.wait_node, cx) };
381
382 if poll_res.is_ready() {
383 // A value was available
384 mut_self.timer = None;
385 }
386
387 poll_res
388 }
389}
390
391impl<'a> FusedFuture for LocalTimerFuture<'a> {
392 fn is_terminated(&self) -> bool {
393 self.timer.is_none()
394 }
395}
396
397impl<'a> Drop for LocalTimerFuture<'a> {
398 fn drop(&mut self) {
399 // If this TimerFuture has been polled and it was added to the
400 // wait queue at the timer, it must be removed before dropping.
401 // Otherwise the timer would access invalid memory.
402 if let Some(timer) = self.timer {
403 timer.remove_waiter(&mut self.wait_node);
404 }
405 }
406}
407
408/// A Future that is resolved once the requested time has elapsed.
409#[must_use = "futures do nothing unless polled"]
410pub struct TimerFuture<'a> {
411 /// The Timer that is associated with this TimerFuture
412 timer_future: LocalTimerFuture<'a>,
413}
414
415// Safety: TimerFutures are only returned by GenericTimerService instances which
416// are thread-safe (RawMutex: Sync).
417unsafe impl<'a> Send for TimerFuture<'a> {}
418
419impl<'a> core::fmt::Debug for TimerFuture<'a> {
420 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
421 f.debug_struct("TimerFuture").finish()
422 }
423}
424
425impl<'a> Future for TimerFuture<'a> {
426 type Output = ();
427
428 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
429 // Safety: TimerFuture is a pure wrapper around LocalTimerFuture.
430 // The inner value is never moved
431 let inner_pin = unsafe {
432 Pin::map_unchecked_mut(self, |fut| &mut fut.timer_future)
433 };
434 inner_pin.poll(cx)
435 }
436}
437
438impl<'a> FusedFuture for TimerFuture<'a> {
439 fn is_terminated(&self) -> bool {
440 self.timer_future.is_terminated()
441 }
442}
443
444// Export a non thread-safe version using NoopLock
445
446/// A [`GenericTimerService`] implementation which is not thread-safe.
447pub type LocalTimerService = GenericTimerService<NoopLock>;
448
449#[cfg(feature = "std")]
450mod if_std {
451 use super::*;
452
453 // Export a thread-safe version using parking_lot::RawMutex
454
455 /// A [`GenericTimerService`] implementation backed by [`parking_lot`].
456 pub type TimerService = GenericTimerService<parking_lot::RawMutex>;
457}
458
459#[cfg(feature = "std")]
460pub use self::if_std::*;