mio_extras/
timer.rs

1//! Timer optimized for I/O related operations
2use crate::convert;
3use lazycell::LazyCell;
4use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
5use slab::Slab;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use std::{cmp, fmt, io, iter, thread, u64, usize};
10
11/// A timer.
12///
13/// Typical usage goes like this:
14///
15/// * register the timer with a `mio::Poll`.
16/// * set a timeout, by calling `Timer::set_timeout`.  Here you provide some
17///   state to be associated with this timeout.
18/// * poll the `Poll`, to learn when a timeout has occurred.
19/// * retrieve state associated with the timeout by calling `Timer::poll`.
20///
21/// You can omit use of the `Poll` altogether, if you like, and just poll the
22/// `Timer` directly.
23pub struct Timer<T> {
24    // Size of each tick in milliseconds
25    tick_ms: u64,
26    // Slab of timeout entries
27    entries: Slab<Entry<T>>,
28    // Timeout wheel. Each tick, the timer will look at the next slot for
29    // timeouts that match the current tick.
30    wheel: Vec<WheelEntry>,
31    // Tick 0's time instant
32    start: Instant,
33    // The current tick
34    tick: Tick,
35    // The next entry to possibly timeout
36    next: Token,
37    // Masks the target tick to get the slot
38    mask: u64,
39    // Set on registration with Poll
40    inner: LazyCell<Inner>,
41}
42
43/// Used to create a `Timer`.
44pub struct Builder {
45    // Approximate duration of each tick
46    tick: Duration,
47    // Number of slots in the timer wheel
48    num_slots: usize,
49    // Max number of timeouts that can be in flight at a given time.
50    capacity: usize,
51}
52
53/// A timeout, as returned by `Timer::set_timeout`.
54///
55/// Use this as the argument to `Timer::cancel_timeout`, to cancel this timeout.
56#[derive(Clone, Debug)]
57pub struct Timeout {
58    // Reference into the timer entry slab
59    token: Token,
60    // Tick that it should match up with
61    tick: u64,
62}
63
64struct Inner {
65    registration: Registration,
66    set_readiness: SetReadiness,
67    wakeup_state: WakeupState,
68    wakeup_thread: thread::JoinHandle<()>,
69}
70
71impl Drop for Inner {
72    fn drop(&mut self) {
73        // 1. Set wakeup state to TERMINATE_THREAD
74        self.wakeup_state.store(TERMINATE_THREAD, Ordering::Release);
75        // 2. Wake him up
76        self.wakeup_thread.thread().unpark();
77    }
78}
79
80#[derive(Copy, Clone, Debug)]
81struct WheelEntry {
82    next_tick: Tick,
83    head: Token,
84}
85
86// Doubly linked list of timer entries. Allows for efficient insertion /
87// removal of timeouts.
88struct Entry<T> {
89    state: T,
90    links: EntryLinks,
91}
92
93#[derive(Copy, Clone)]
94struct EntryLinks {
95    tick: Tick,
96    prev: Token,
97    next: Token,
98}
99
100type Tick = u64;
101
102const TICK_MAX: Tick = u64::MAX;
103
104// Manages communication with wakeup thread
105type WakeupState = Arc<AtomicUsize>;
106
107const TERMINATE_THREAD: usize = 0;
108const EMPTY: Token = Token(usize::MAX);
109
110impl Builder {
111    /// Set the tick duration.  Default is 100ms.
112    pub fn tick_duration(mut self, duration: Duration) -> Builder {
113        self.tick = duration;
114        self
115    }
116
117    /// Set the number of slots.  Default is 256.
118    pub fn num_slots(mut self, num_slots: usize) -> Builder {
119        self.num_slots = num_slots;
120        self
121    }
122
123    /// Set the capacity.  Default is 65536.
124    pub fn capacity(mut self, capacity: usize) -> Builder {
125        self.capacity = capacity;
126        self
127    }
128
129    /// Build a `Timer` with the parameters set on this `Builder`.
130    pub fn build<T>(self) -> Timer<T> {
131        Timer::new(
132            convert::millis(self.tick),
133            self.num_slots,
134            self.capacity,
135            Instant::now(),
136        )
137    }
138}
139
140impl Default for Builder {
141    fn default() -> Builder {
142        Builder {
143            tick: Duration::from_millis(100),
144            num_slots: 1 << 8,
145            capacity: 1 << 16,
146        }
147    }
148}
149
150impl<T> Timer<T> {
151    fn new(tick_ms: u64, num_slots: usize, capacity: usize, start: Instant) -> Timer<T> {
152        let num_slots = num_slots.next_power_of_two();
153        let capacity = capacity.next_power_of_two();
154        let mask = (num_slots as u64) - 1;
155        let wheel = iter::repeat(WheelEntry {
156            next_tick: TICK_MAX,
157            head: EMPTY,
158        })
159        .take(num_slots)
160        .collect();
161
162        Timer {
163            tick_ms,
164            entries: Slab::with_capacity(capacity),
165            wheel,
166            start,
167            tick: 0,
168            next: EMPTY,
169            mask,
170            inner: LazyCell::new(),
171        }
172    }
173
174    /// Set a timeout.
175    ///
176    /// When the timeout occurs, the given state becomes available via `poll`.
177    pub fn set_timeout(&mut self, delay_from_now: Duration, state: T) -> Timeout {
178        let delay_from_start = self.start.elapsed() + delay_from_now;
179        self.set_timeout_at(delay_from_start, state)
180    }
181
182    fn set_timeout_at(&mut self, delay_from_start: Duration, state: T) -> Timeout {
183        let mut tick = duration_to_tick(delay_from_start, self.tick_ms);
184        trace!(
185            "setting timeout; delay={:?}; tick={:?}; current-tick={:?}",
186            delay_from_start,
187            tick,
188            self.tick
189        );
190
191        // Always target at least 1 tick in the future
192        if tick <= self.tick {
193            tick = self.tick + 1;
194        }
195
196        self.insert(tick, state)
197    }
198
199    fn insert(&mut self, tick: Tick, state: T) -> Timeout {
200        // Get the slot for the requested tick
201        let slot = (tick & self.mask) as usize;
202        let curr = self.wheel[slot];
203
204        // Insert the new entry
205        let entry = Entry::new(state, tick, curr.head);
206        let token = Token(self.entries.insert(entry));
207
208        if curr.head != EMPTY {
209            // If there was a previous entry, set its prev pointer to the new
210            // entry
211            self.entries[curr.head.into()].links.prev = token;
212        }
213
214        // Update the head slot
215        self.wheel[slot] = WheelEntry {
216            next_tick: cmp::min(tick, curr.next_tick),
217            head: token,
218        };
219
220        self.schedule_readiness(tick);
221
222        trace!("inserted timout; slot={}; token={:?}", slot, token);
223
224        // Return the new timeout
225        Timeout { token, tick }
226    }
227
228    /// Cancel a timeout.
229    ///
230    /// If the timeout has not yet occurred, the return value holds the
231    /// associated state.
232    pub fn cancel_timeout(&mut self, timeout: &Timeout) -> Option<T> {
233        let links = match self.entries.get(timeout.token.into()) {
234            Some(e) => e.links,
235            None => return None,
236        };
237
238        // Sanity check
239        if links.tick != timeout.tick {
240            return None;
241        }
242
243        self.unlink(&links, timeout.token);
244        Some(self.entries.remove(timeout.token.into()).state)
245    }
246
247    /// Poll for an expired timer.
248    ///
249    /// The return value holds the state associated with the first expired
250    /// timer, if any.
251    pub fn poll(&mut self) -> Option<T> {
252        let target_tick = current_tick(self.start, self.tick_ms);
253        self.poll_to(target_tick)
254    }
255
256    fn poll_to(&mut self, mut target_tick: Tick) -> Option<T> {
257        trace!(
258            "tick_to; target_tick={}; current_tick={}",
259            target_tick,
260            self.tick
261        );
262
263        if target_tick < self.tick {
264            target_tick = self.tick;
265        }
266
267        while self.tick <= target_tick {
268            let curr = self.next;
269
270            trace!("ticking; curr={:?}", curr);
271
272            if curr == EMPTY {
273                self.tick += 1;
274
275                let slot = self.slot_for(self.tick);
276                self.next = self.wheel[slot].head;
277
278                // Handle the case when a slot has a single timeout which gets
279                // canceled before the timeout expires. In this case, the
280                // slot's head is EMPTY but there is a value for next_tick. Not
281                // resetting next_tick here causes the timer to get stuck in a
282                // loop.
283                if self.next == EMPTY {
284                    self.wheel[slot].next_tick = TICK_MAX;
285                }
286            } else {
287                let slot = self.slot_for(self.tick);
288
289                if curr == self.wheel[slot].head {
290                    self.wheel[slot].next_tick = TICK_MAX;
291                }
292
293                let links = self.entries[curr.into()].links;
294
295                if links.tick <= self.tick {
296                    trace!("triggering; token={:?}", curr);
297
298                    // Unlink will also advance self.next
299                    self.unlink(&links, curr);
300
301                    // Remove and return the token
302                    return Some(self.entries.remove(curr.into()).state);
303                } else {
304                    let next_tick = self.wheel[slot].next_tick;
305                    self.wheel[slot].next_tick = cmp::min(next_tick, links.tick);
306                    self.next = links.next;
307                }
308            }
309        }
310
311        // No more timeouts to poll
312        if let Some(inner) = self.inner.borrow() {
313            trace!("unsetting readiness");
314            let _ = inner.set_readiness.set_readiness(Ready::empty());
315
316            if let Some(tick) = self.next_tick() {
317                self.schedule_readiness(tick);
318            }
319        }
320
321        None
322    }
323
324    fn unlink(&mut self, links: &EntryLinks, token: Token) {
325        trace!(
326            "unlinking timeout; slot={}; token={:?}",
327            self.slot_for(links.tick),
328            token
329        );
330
331        if links.prev == EMPTY {
332            let slot = self.slot_for(links.tick);
333            self.wheel[slot].head = links.next;
334        } else {
335            self.entries[links.prev.into()].links.next = links.next;
336        }
337
338        if links.next != EMPTY {
339            self.entries[links.next.into()].links.prev = links.prev;
340
341            if token == self.next {
342                self.next = links.next;
343            }
344        } else if token == self.next {
345            self.next = EMPTY;
346        }
347    }
348
349    fn schedule_readiness(&self, tick: Tick) {
350        if let Some(inner) = self.inner.borrow() {
351            // Coordinate setting readiness w/ the wakeup thread
352            let mut curr = inner.wakeup_state.load(Ordering::Acquire);
353
354            loop {
355                if curr as Tick <= tick {
356                    // Nothing to do, wakeup is already scheduled
357                    return;
358                }
359
360                // Attempt to move the wakeup time forward
361                trace!("advancing the wakeup time; target={}; curr={}", tick, curr);
362                let actual =
363                    inner
364                        .wakeup_state
365                        .compare_and_swap(curr, tick as usize, Ordering::Release);
366
367                if actual == curr {
368                    // Signal to the wakeup thread that the wakeup time has
369                    // been changed.
370                    trace!("unparking wakeup thread");
371                    inner.wakeup_thread.thread().unpark();
372                    return;
373                }
374
375                curr = actual;
376            }
377        }
378    }
379
380    // Next tick containing a timeout
381    fn next_tick(&self) -> Option<Tick> {
382        if self.next != EMPTY {
383            let slot = self.slot_for(self.entries[self.next.into()].links.tick);
384
385            if self.wheel[slot].next_tick == self.tick {
386                // There is data ready right now
387                return Some(self.tick);
388            }
389        }
390
391        self.wheel.iter().map(|e| e.next_tick).min()
392    }
393
394    fn slot_for(&self, tick: Tick) -> usize {
395        (self.mask & tick) as usize
396    }
397}
398
399impl<T> Default for Timer<T> {
400    fn default() -> Timer<T> {
401        Builder::default().build()
402    }
403}
404
405impl<T> Evented for Timer<T> {
406    fn register(
407        &self,
408        poll: &Poll,
409        token: Token,
410        interest: Ready,
411        opts: PollOpt,
412    ) -> io::Result<()> {
413        if self.inner.borrow().is_some() {
414            return Err(io::Error::new(
415                io::ErrorKind::Other,
416                "timer already registered",
417            ));
418        }
419
420        let (registration, set_readiness) = Registration::new2();
421        poll.register(&registration, token, interest, opts)?;
422        let wakeup_state = Arc::new(AtomicUsize::new(usize::MAX));
423        let thread_handle = spawn_wakeup_thread(
424            Arc::clone(&wakeup_state),
425            set_readiness.clone(),
426            self.start,
427            self.tick_ms,
428        );
429
430        self.inner
431            .fill(Inner {
432                registration,
433                set_readiness,
434                wakeup_state,
435                wakeup_thread: thread_handle,
436            })
437            .expect("timer already registered");
438
439        if let Some(next_tick) = self.next_tick() {
440            self.schedule_readiness(next_tick);
441        }
442
443        Ok(())
444    }
445
446    fn reregister(
447        &self,
448        poll: &Poll,
449        token: Token,
450        interest: Ready,
451        opts: PollOpt,
452    ) -> io::Result<()> {
453        match self.inner.borrow() {
454            Some(inner) => poll.reregister(&inner.registration, token, interest, opts),
455            None => Err(io::Error::new(
456                io::ErrorKind::Other,
457                "receiver not registered",
458            )),
459        }
460    }
461
462    fn deregister(&self, poll: &Poll) -> io::Result<()> {
463        match self.inner.borrow() {
464            Some(inner) => poll.deregister(&inner.registration),
465            None => Err(io::Error::new(
466                io::ErrorKind::Other,
467                "receiver not registered",
468            )),
469        }
470    }
471}
472
473impl fmt::Debug for Inner {
474    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
475        fmt.debug_struct("Inner")
476            .field("registration", &self.registration)
477            .field("wakeup_state", &self.wakeup_state.load(Ordering::Relaxed))
478            .finish()
479    }
480}
481
482fn spawn_wakeup_thread(
483    state: WakeupState,
484    set_readiness: SetReadiness,
485    start: Instant,
486    tick_ms: u64,
487) -> thread::JoinHandle<()> {
488    thread::spawn(move || {
489        let mut sleep_until_tick = state.load(Ordering::Acquire) as Tick;
490
491        loop {
492            if sleep_until_tick == TERMINATE_THREAD as Tick {
493                return;
494            }
495
496            let now_tick = current_tick(start, tick_ms);
497
498            trace!(
499                "wakeup thread: sleep_until_tick={:?}; now_tick={:?}",
500                sleep_until_tick,
501                now_tick
502            );
503
504            if now_tick < sleep_until_tick {
505                // Calling park_timeout with u64::MAX leads to undefined
506                // behavior in pthread, causing the park to return immediately
507                // and causing the thread to tightly spin. Instead of u64::MAX
508                // on large values, simply use a blocking park.
509                match tick_ms.checked_mul(sleep_until_tick - now_tick) {
510                    Some(sleep_duration) => {
511                        trace!(
512                            "sleeping; tick_ms={}; now_tick={}; sleep_until_tick={}; duration={:?}",
513                            tick_ms,
514                            now_tick,
515                            sleep_until_tick,
516                            sleep_duration
517                        );
518                        thread::park_timeout(Duration::from_millis(sleep_duration));
519                    }
520                    None => {
521                        trace!(
522                            "sleeping; tick_ms={}; now_tick={}; blocking sleep",
523                            tick_ms,
524                            now_tick
525                        );
526                        thread::park();
527                    }
528                }
529                sleep_until_tick = state.load(Ordering::Acquire) as Tick;
530            } else {
531                let actual =
532                    state.compare_and_swap(sleep_until_tick as usize, usize::MAX, Ordering::AcqRel)
533                        as Tick;
534
535                if actual == sleep_until_tick {
536                    trace!("setting readiness from wakeup thread");
537                    let _ = set_readiness.set_readiness(Ready::readable());
538                    sleep_until_tick = usize::MAX as Tick;
539                } else {
540                    sleep_until_tick = actual as Tick;
541                }
542            }
543        }
544    })
545}
546
547fn duration_to_tick(elapsed: Duration, tick_ms: u64) -> Tick {
548    // Calculate tick rounding up to the closest one
549    let elapsed_ms = convert::millis(elapsed);
550    elapsed_ms.saturating_add(tick_ms / 2) / tick_ms
551}
552
553fn current_tick(start: Instant, tick_ms: u64) -> Tick {
554    duration_to_tick(start.elapsed(), tick_ms)
555}
556
557impl<T> Entry<T> {
558    fn new(state: T, tick: u64, next: Token) -> Entry<T> {
559        Entry {
560            state,
561            links: EntryLinks {
562                tick,
563                prev: EMPTY,
564                next,
565            },
566        }
567    }
568}
569
570#[cfg(test)]
571mod test {
572    use super::*;
573    use std::time::{Duration, Instant};
574
575    #[test]
576    pub fn test_timeout_next_tick() {
577        let mut t = timer();
578        let mut tick;
579
580        t.set_timeout_at(Duration::from_millis(100), "a");
581
582        tick = ms_to_tick(&t, 50);
583        assert_eq!(None, t.poll_to(tick));
584
585        tick = ms_to_tick(&t, 100);
586        assert_eq!(Some("a"), t.poll_to(tick));
587        assert_eq!(None, t.poll_to(tick));
588
589        tick = ms_to_tick(&t, 150);
590        assert_eq!(None, t.poll_to(tick));
591
592        tick = ms_to_tick(&t, 200);
593        assert_eq!(None, t.poll_to(tick));
594
595        assert_eq!(count(&t), 0);
596    }
597
598    #[test]
599    pub fn test_clearing_timeout() {
600        let mut t = timer();
601        let mut tick;
602
603        let to = t.set_timeout_at(Duration::from_millis(100), "a");
604        assert_eq!("a", t.cancel_timeout(&to).unwrap());
605
606        tick = ms_to_tick(&t, 100);
607        assert_eq!(None, t.poll_to(tick));
608
609        tick = ms_to_tick(&t, 200);
610        assert_eq!(None, t.poll_to(tick));
611
612        assert_eq!(count(&t), 0);
613    }
614
615    #[test]
616    pub fn test_multiple_timeouts_same_tick() {
617        let mut t = timer();
618        let mut tick;
619
620        t.set_timeout_at(Duration::from_millis(100), "a");
621        t.set_timeout_at(Duration::from_millis(100), "b");
622
623        let mut rcv = vec![];
624
625        tick = ms_to_tick(&t, 100);
626        rcv.push(t.poll_to(tick).unwrap());
627        rcv.push(t.poll_to(tick).unwrap());
628
629        assert_eq!(None, t.poll_to(tick));
630
631        rcv.sort();
632        assert!(rcv == ["a", "b"], "actual={:?}", rcv);
633
634        tick = ms_to_tick(&t, 200);
635        assert_eq!(None, t.poll_to(tick));
636
637        assert_eq!(count(&t), 0);
638    }
639
640    #[test]
641    pub fn test_multiple_timeouts_diff_tick() {
642        let mut t = timer();
643        let mut tick;
644
645        t.set_timeout_at(Duration::from_millis(110), "a");
646        t.set_timeout_at(Duration::from_millis(220), "b");
647        t.set_timeout_at(Duration::from_millis(230), "c");
648        t.set_timeout_at(Duration::from_millis(440), "d");
649        t.set_timeout_at(Duration::from_millis(560), "e");
650
651        tick = ms_to_tick(&t, 100);
652        assert_eq!(Some("a"), t.poll_to(tick));
653        assert_eq!(None, t.poll_to(tick));
654
655        tick = ms_to_tick(&t, 200);
656        assert_eq!(Some("c"), t.poll_to(tick));
657        assert_eq!(Some("b"), t.poll_to(tick));
658        assert_eq!(None, t.poll_to(tick));
659
660        tick = ms_to_tick(&t, 300);
661        assert_eq!(None, t.poll_to(tick));
662
663        tick = ms_to_tick(&t, 400);
664        assert_eq!(Some("d"), t.poll_to(tick));
665        assert_eq!(None, t.poll_to(tick));
666
667        tick = ms_to_tick(&t, 500);
668        assert_eq!(None, t.poll_to(tick));
669
670        tick = ms_to_tick(&t, 600);
671        assert_eq!(Some("e"), t.poll_to(tick));
672        assert_eq!(None, t.poll_to(tick));
673    }
674
675    #[test]
676    pub fn test_catching_up() {
677        let mut t = timer();
678
679        t.set_timeout_at(Duration::from_millis(110), "a");
680        t.set_timeout_at(Duration::from_millis(220), "b");
681        t.set_timeout_at(Duration::from_millis(230), "c");
682        t.set_timeout_at(Duration::from_millis(440), "d");
683
684        let tick = ms_to_tick(&t, 600);
685        assert_eq!(Some("a"), t.poll_to(tick));
686        assert_eq!(Some("c"), t.poll_to(tick));
687        assert_eq!(Some("b"), t.poll_to(tick));
688        assert_eq!(Some("d"), t.poll_to(tick));
689        assert_eq!(None, t.poll_to(tick));
690    }
691
692    #[test]
693    pub fn test_timeout_hash_collision() {
694        let mut t = timer();
695        let mut tick;
696
697        t.set_timeout_at(Duration::from_millis(100), "a");
698        t.set_timeout_at(Duration::from_millis(100 + TICK * SLOTS as u64), "b");
699
700        tick = ms_to_tick(&t, 100);
701        assert_eq!(Some("a"), t.poll_to(tick));
702        assert_eq!(1, count(&t));
703
704        tick = ms_to_tick(&t, 200);
705        assert_eq!(None, t.poll_to(tick));
706        assert_eq!(1, count(&t));
707
708        tick = ms_to_tick(&t, 100 + TICK * SLOTS as u64);
709        assert_eq!(Some("b"), t.poll_to(tick));
710        assert_eq!(0, count(&t));
711    }
712
713    #[test]
714    pub fn test_clearing_timeout_between_triggers() {
715        let mut t = timer();
716        let mut tick;
717
718        let a = t.set_timeout_at(Duration::from_millis(100), "a");
719        let _ = t.set_timeout_at(Duration::from_millis(100), "b");
720        let _ = t.set_timeout_at(Duration::from_millis(200), "c");
721
722        tick = ms_to_tick(&t, 100);
723        assert_eq!(Some("b"), t.poll_to(tick));
724        assert_eq!(2, count(&t));
725
726        t.cancel_timeout(&a);
727        assert_eq!(1, count(&t));
728
729        assert_eq!(None, t.poll_to(tick));
730
731        tick = ms_to_tick(&t, 200);
732        assert_eq!(Some("c"), t.poll_to(tick));
733        assert_eq!(0, count(&t));
734    }
735
736    const TICK: u64 = 100;
737    const SLOTS: usize = 16;
738    const CAPACITY: usize = 32;
739
740    fn count<T>(timer: &Timer<T>) -> usize {
741        timer.entries.len()
742    }
743
744    fn timer() -> Timer<&'static str> {
745        Timer::new(TICK, SLOTS, CAPACITY, Instant::now())
746    }
747
748    fn ms_to_tick<T>(timer: &Timer<T>, ms: u64) -> u64 {
749        ms / timer.tick_ms
750    }
751}