1use crate::convert;
3use lazycell::LazyCell;
4use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
5use slab::Slab;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use std::{cmp, fmt, io, iter, thread, u64, usize};
10
11pub struct Timer<T> {
24 tick_ms: u64,
26 entries: Slab<Entry<T>>,
28 wheel: Vec<WheelEntry>,
31 start: Instant,
33 tick: Tick,
35 next: Token,
37 mask: u64,
39 inner: LazyCell<Inner>,
41}
42
/// Configuration used to construct a `Timer`.
pub struct Builder {
    /// Duration of one tick; converted to milliseconds when the timer is built.
    tick: Duration,
    /// Requested number of wheel slots (the timer rounds this up to a power of two).
    num_slots: usize,
    /// Requested initial capacity of the timeout slab (also rounded up to a power of two).
    capacity: usize,
}
52
53#[derive(Clone, Debug)]
57pub struct Timeout {
58 token: Token,
60 tick: u64,
62}
63
64struct Inner {
65 registration: Registration,
66 set_readiness: SetReadiness,
67 wakeup_state: WakeupState,
68 wakeup_thread: thread::JoinHandle<()>,
69}
70
71impl Drop for Inner {
72 fn drop(&mut self) {
73 self.wakeup_state.store(TERMINATE_THREAD, Ordering::Release);
75 self.wakeup_thread.thread().unpark();
77 }
78}
79
80#[derive(Copy, Clone, Debug)]
81struct WheelEntry {
82 next_tick: Tick,
83 head: Token,
84}
85
86struct Entry<T> {
89 state: T,
90 links: EntryLinks,
91}
92
93#[derive(Copy, Clone)]
94struct EntryLinks {
95 tick: Tick,
96 prev: Token,
97 next: Token,
98}
99
100type Tick = u64;
101
102const TICK_MAX: Tick = u64::MAX;
103
104type WakeupState = Arc<AtomicUsize>;
106
107const TERMINATE_THREAD: usize = 0;
108const EMPTY: Token = Token(usize::MAX);
109
110impl Builder {
111 pub fn tick_duration(mut self, duration: Duration) -> Builder {
113 self.tick = duration;
114 self
115 }
116
117 pub fn num_slots(mut self, num_slots: usize) -> Builder {
119 self.num_slots = num_slots;
120 self
121 }
122
123 pub fn capacity(mut self, capacity: usize) -> Builder {
125 self.capacity = capacity;
126 self
127 }
128
129 pub fn build<T>(self) -> Timer<T> {
131 Timer::new(
132 convert::millis(self.tick),
133 self.num_slots,
134 self.capacity,
135 Instant::now(),
136 )
137 }
138}
139
140impl Default for Builder {
141 fn default() -> Builder {
142 Builder {
143 tick: Duration::from_millis(100),
144 num_slots: 1 << 8,
145 capacity: 1 << 16,
146 }
147 }
148}
149
150impl<T> Timer<T> {
151 fn new(tick_ms: u64, num_slots: usize, capacity: usize, start: Instant) -> Timer<T> {
152 let num_slots = num_slots.next_power_of_two();
153 let capacity = capacity.next_power_of_two();
154 let mask = (num_slots as u64) - 1;
155 let wheel = iter::repeat(WheelEntry {
156 next_tick: TICK_MAX,
157 head: EMPTY,
158 })
159 .take(num_slots)
160 .collect();
161
162 Timer {
163 tick_ms,
164 entries: Slab::with_capacity(capacity),
165 wheel,
166 start,
167 tick: 0,
168 next: EMPTY,
169 mask,
170 inner: LazyCell::new(),
171 }
172 }
173
174 pub fn set_timeout(&mut self, delay_from_now: Duration, state: T) -> Timeout {
178 let delay_from_start = self.start.elapsed() + delay_from_now;
179 self.set_timeout_at(delay_from_start, state)
180 }
181
182 fn set_timeout_at(&mut self, delay_from_start: Duration, state: T) -> Timeout {
183 let mut tick = duration_to_tick(delay_from_start, self.tick_ms);
184 trace!(
185 "setting timeout; delay={:?}; tick={:?}; current-tick={:?}",
186 delay_from_start,
187 tick,
188 self.tick
189 );
190
191 if tick <= self.tick {
193 tick = self.tick + 1;
194 }
195
196 self.insert(tick, state)
197 }
198
199 fn insert(&mut self, tick: Tick, state: T) -> Timeout {
200 let slot = (tick & self.mask) as usize;
202 let curr = self.wheel[slot];
203
204 let entry = Entry::new(state, tick, curr.head);
206 let token = Token(self.entries.insert(entry));
207
208 if curr.head != EMPTY {
209 self.entries[curr.head.into()].links.prev = token;
212 }
213
214 self.wheel[slot] = WheelEntry {
216 next_tick: cmp::min(tick, curr.next_tick),
217 head: token,
218 };
219
220 self.schedule_readiness(tick);
221
222 trace!("inserted timout; slot={}; token={:?}", slot, token);
223
224 Timeout { token, tick }
226 }
227
228 pub fn cancel_timeout(&mut self, timeout: &Timeout) -> Option<T> {
233 let links = match self.entries.get(timeout.token.into()) {
234 Some(e) => e.links,
235 None => return None,
236 };
237
238 if links.tick != timeout.tick {
240 return None;
241 }
242
243 self.unlink(&links, timeout.token);
244 Some(self.entries.remove(timeout.token.into()).state)
245 }
246
247 pub fn poll(&mut self) -> Option<T> {
252 let target_tick = current_tick(self.start, self.tick_ms);
253 self.poll_to(target_tick)
254 }
255
256 fn poll_to(&mut self, mut target_tick: Tick) -> Option<T> {
257 trace!(
258 "tick_to; target_tick={}; current_tick={}",
259 target_tick,
260 self.tick
261 );
262
263 if target_tick < self.tick {
264 target_tick = self.tick;
265 }
266
267 while self.tick <= target_tick {
268 let curr = self.next;
269
270 trace!("ticking; curr={:?}", curr);
271
272 if curr == EMPTY {
273 self.tick += 1;
274
275 let slot = self.slot_for(self.tick);
276 self.next = self.wheel[slot].head;
277
278 if self.next == EMPTY {
284 self.wheel[slot].next_tick = TICK_MAX;
285 }
286 } else {
287 let slot = self.slot_for(self.tick);
288
289 if curr == self.wheel[slot].head {
290 self.wheel[slot].next_tick = TICK_MAX;
291 }
292
293 let links = self.entries[curr.into()].links;
294
295 if links.tick <= self.tick {
296 trace!("triggering; token={:?}", curr);
297
298 self.unlink(&links, curr);
300
301 return Some(self.entries.remove(curr.into()).state);
303 } else {
304 let next_tick = self.wheel[slot].next_tick;
305 self.wheel[slot].next_tick = cmp::min(next_tick, links.tick);
306 self.next = links.next;
307 }
308 }
309 }
310
311 if let Some(inner) = self.inner.borrow() {
313 trace!("unsetting readiness");
314 let _ = inner.set_readiness.set_readiness(Ready::empty());
315
316 if let Some(tick) = self.next_tick() {
317 self.schedule_readiness(tick);
318 }
319 }
320
321 None
322 }
323
324 fn unlink(&mut self, links: &EntryLinks, token: Token) {
325 trace!(
326 "unlinking timeout; slot={}; token={:?}",
327 self.slot_for(links.tick),
328 token
329 );
330
331 if links.prev == EMPTY {
332 let slot = self.slot_for(links.tick);
333 self.wheel[slot].head = links.next;
334 } else {
335 self.entries[links.prev.into()].links.next = links.next;
336 }
337
338 if links.next != EMPTY {
339 self.entries[links.next.into()].links.prev = links.prev;
340
341 if token == self.next {
342 self.next = links.next;
343 }
344 } else if token == self.next {
345 self.next = EMPTY;
346 }
347 }
348
349 fn schedule_readiness(&self, tick: Tick) {
350 if let Some(inner) = self.inner.borrow() {
351 let mut curr = inner.wakeup_state.load(Ordering::Acquire);
353
354 loop {
355 if curr as Tick <= tick {
356 return;
358 }
359
360 trace!("advancing the wakeup time; target={}; curr={}", tick, curr);
362 let actual =
363 inner
364 .wakeup_state
365 .compare_and_swap(curr, tick as usize, Ordering::Release);
366
367 if actual == curr {
368 trace!("unparking wakeup thread");
371 inner.wakeup_thread.thread().unpark();
372 return;
373 }
374
375 curr = actual;
376 }
377 }
378 }
379
380 fn next_tick(&self) -> Option<Tick> {
382 if self.next != EMPTY {
383 let slot = self.slot_for(self.entries[self.next.into()].links.tick);
384
385 if self.wheel[slot].next_tick == self.tick {
386 return Some(self.tick);
388 }
389 }
390
391 self.wheel.iter().map(|e| e.next_tick).min()
392 }
393
394 fn slot_for(&self, tick: Tick) -> usize {
395 (self.mask & tick) as usize
396 }
397}
398
399impl<T> Default for Timer<T> {
400 fn default() -> Timer<T> {
401 Builder::default().build()
402 }
403}
404
405impl<T> Evented for Timer<T> {
406 fn register(
407 &self,
408 poll: &Poll,
409 token: Token,
410 interest: Ready,
411 opts: PollOpt,
412 ) -> io::Result<()> {
413 if self.inner.borrow().is_some() {
414 return Err(io::Error::new(
415 io::ErrorKind::Other,
416 "timer already registered",
417 ));
418 }
419
420 let (registration, set_readiness) = Registration::new2();
421 poll.register(®istration, token, interest, opts)?;
422 let wakeup_state = Arc::new(AtomicUsize::new(usize::MAX));
423 let thread_handle = spawn_wakeup_thread(
424 Arc::clone(&wakeup_state),
425 set_readiness.clone(),
426 self.start,
427 self.tick_ms,
428 );
429
430 self.inner
431 .fill(Inner {
432 registration,
433 set_readiness,
434 wakeup_state,
435 wakeup_thread: thread_handle,
436 })
437 .expect("timer already registered");
438
439 if let Some(next_tick) = self.next_tick() {
440 self.schedule_readiness(next_tick);
441 }
442
443 Ok(())
444 }
445
446 fn reregister(
447 &self,
448 poll: &Poll,
449 token: Token,
450 interest: Ready,
451 opts: PollOpt,
452 ) -> io::Result<()> {
453 match self.inner.borrow() {
454 Some(inner) => poll.reregister(&inner.registration, token, interest, opts),
455 None => Err(io::Error::new(
456 io::ErrorKind::Other,
457 "receiver not registered",
458 )),
459 }
460 }
461
462 fn deregister(&self, poll: &Poll) -> io::Result<()> {
463 match self.inner.borrow() {
464 Some(inner) => poll.deregister(&inner.registration),
465 None => Err(io::Error::new(
466 io::ErrorKind::Other,
467 "receiver not registered",
468 )),
469 }
470 }
471}
472
473impl fmt::Debug for Inner {
474 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
475 fmt.debug_struct("Inner")
476 .field("registration", &self.registration)
477 .field("wakeup_state", &self.wakeup_state.load(Ordering::Relaxed))
478 .finish()
479 }
480}
481
482fn spawn_wakeup_thread(
483 state: WakeupState,
484 set_readiness: SetReadiness,
485 start: Instant,
486 tick_ms: u64,
487) -> thread::JoinHandle<()> {
488 thread::spawn(move || {
489 let mut sleep_until_tick = state.load(Ordering::Acquire) as Tick;
490
491 loop {
492 if sleep_until_tick == TERMINATE_THREAD as Tick {
493 return;
494 }
495
496 let now_tick = current_tick(start, tick_ms);
497
498 trace!(
499 "wakeup thread: sleep_until_tick={:?}; now_tick={:?}",
500 sleep_until_tick,
501 now_tick
502 );
503
504 if now_tick < sleep_until_tick {
505 match tick_ms.checked_mul(sleep_until_tick - now_tick) {
510 Some(sleep_duration) => {
511 trace!(
512 "sleeping; tick_ms={}; now_tick={}; sleep_until_tick={}; duration={:?}",
513 tick_ms,
514 now_tick,
515 sleep_until_tick,
516 sleep_duration
517 );
518 thread::park_timeout(Duration::from_millis(sleep_duration));
519 }
520 None => {
521 trace!(
522 "sleeping; tick_ms={}; now_tick={}; blocking sleep",
523 tick_ms,
524 now_tick
525 );
526 thread::park();
527 }
528 }
529 sleep_until_tick = state.load(Ordering::Acquire) as Tick;
530 } else {
531 let actual =
532 state.compare_and_swap(sleep_until_tick as usize, usize::MAX, Ordering::AcqRel)
533 as Tick;
534
535 if actual == sleep_until_tick {
536 trace!("setting readiness from wakeup thread");
537 let _ = set_readiness.set_readiness(Ready::readable());
538 sleep_until_tick = usize::MAX as Tick;
539 } else {
540 sleep_until_tick = actual as Tick;
541 }
542 }
543 }
544 })
545}
546
547fn duration_to_tick(elapsed: Duration, tick_ms: u64) -> Tick {
548 let elapsed_ms = convert::millis(elapsed);
550 elapsed_ms.saturating_add(tick_ms / 2) / tick_ms
551}
552
553fn current_tick(start: Instant, tick_ms: u64) -> Tick {
554 duration_to_tick(start.elapsed(), tick_ms)
555}
556
557impl<T> Entry<T> {
558 fn new(state: T, tick: u64, next: Token) -> Entry<T> {
559 Entry {
560 state,
561 links: EntryLinks {
562 tick,
563 prev: EMPTY,
564 next,
565 },
566 }
567 }
568}
569
#[cfg(test)]
mod test {
    use super::*;
    use std::time::{Duration, Instant};

    // Wheel geometry shared by every test: 100 ms ticks, 16 slots, room for 32 timeouts.
    const TICK: u64 = 100;
    const SLOTS: usize = 16;
    const CAPACITY: usize = 32;

    /// Builds an unregistered timer with the test geometry.
    fn timer() -> Timer<&'static str> {
        Timer::new(TICK, SLOTS, CAPACITY, Instant::now())
    }

    /// Number of timeouts still stored in the timer.
    fn count<T>(timer: &Timer<T>) -> usize {
        timer.entries.len()
    }

    /// Converts a millisecond offset into a tick (truncating division).
    fn ms_to_tick<T>(timer: &Timer<T>, ms: u64) -> u64 {
        ms / timer.tick_ms
    }

    /// Polls up to the tick for `ms`, expecting exactly `expected` (in order) followed by
    /// `None`.
    fn assert_fires(timer: &mut Timer<&'static str>, ms: u64, expected: &[&'static str]) {
        let tick = ms_to_tick(&*timer, ms);

        for want in expected {
            assert_eq!(Some(*want), timer.poll_to(tick));
        }

        assert_eq!(None, timer.poll_to(tick));
    }

    #[test]
    pub fn test_timeout_next_tick() {
        let mut wheel = timer();

        wheel.set_timeout_at(Duration::from_millis(100), "a");

        assert_fires(&mut wheel, 50, &[]);
        assert_fires(&mut wheel, 100, &["a"]);
        assert_fires(&mut wheel, 150, &[]);
        assert_fires(&mut wheel, 200, &[]);

        assert_eq!(count(&wheel), 0);
    }

    #[test]
    pub fn test_clearing_timeout() {
        let mut wheel = timer();

        let handle = wheel.set_timeout_at(Duration::from_millis(100), "a");
        assert_eq!("a", wheel.cancel_timeout(&handle).unwrap());

        assert_fires(&mut wheel, 100, &[]);
        assert_fires(&mut wheel, 200, &[]);

        assert_eq!(count(&wheel), 0);
    }

    #[test]
    pub fn test_multiple_timeouts_same_tick() {
        let mut wheel = timer();

        wheel.set_timeout_at(Duration::from_millis(100), "a");
        wheel.set_timeout_at(Duration::from_millis(100), "b");

        // Both fire on the same tick; the order between them is not asserted.
        let tick = ms_to_tick(&wheel, 100);
        let mut fired: Vec<_> = (0..2).map(|_| wheel.poll_to(tick).unwrap()).collect();

        assert_eq!(None, wheel.poll_to(tick));

        fired.sort();
        assert!(fired == ["a", "b"], "actual={:?}", fired);

        assert_fires(&mut wheel, 200, &[]);

        assert_eq!(count(&wheel), 0);
    }

    #[test]
    pub fn test_multiple_timeouts_diff_tick() {
        let mut wheel = timer();

        for &(ms, name) in &[(110, "a"), (220, "b"), (230, "c"), (440, "d"), (560, "e")] {
            wheel.set_timeout_at(Duration::from_millis(ms), name);
        }

        assert_fires(&mut wheel, 100, &["a"]);
        // "b" and "c" share a tick; the later insertion sits at the head of the slot.
        assert_fires(&mut wheel, 200, &["c", "b"]);
        assert_fires(&mut wheel, 300, &[]);
        assert_fires(&mut wheel, 400, &["d"]);
        assert_fires(&mut wheel, 500, &[]);
        assert_fires(&mut wheel, 600, &["e"]);
    }

    #[test]
    pub fn test_catching_up() {
        let mut wheel = timer();

        for &(ms, name) in &[(110, "a"), (220, "b"), (230, "c"), (440, "d")] {
            wheel.set_timeout_at(Duration::from_millis(ms), name);
        }

        // A single late poll target drains everything that expired in the meantime.
        assert_fires(&mut wheel, 600, &["a", "c", "b", "d"]);
    }

    #[test]
    pub fn test_timeout_hash_collision() {
        let mut wheel = timer();
        // One full rotation later: lands in the same slot as the 100 ms timeout.
        let colliding_ms = 100 + TICK * SLOTS as u64;

        wheel.set_timeout_at(Duration::from_millis(100), "a");
        wheel.set_timeout_at(Duration::from_millis(colliding_ms), "b");

        let first = ms_to_tick(&wheel, 100);
        assert_eq!(Some("a"), wheel.poll_to(first));
        assert_eq!(1, count(&wheel));

        let second = ms_to_tick(&wheel, 200);
        assert_eq!(None, wheel.poll_to(second));
        assert_eq!(1, count(&wheel));

        let third = ms_to_tick(&wheel, colliding_ms);
        assert_eq!(Some("b"), wheel.poll_to(third));
        assert_eq!(0, count(&wheel));
    }

    #[test]
    pub fn test_clearing_timeout_between_triggers() {
        let mut wheel = timer();

        let a = wheel.set_timeout_at(Duration::from_millis(100), "a");
        let _ = wheel.set_timeout_at(Duration::from_millis(100), "b");
        let _ = wheel.set_timeout_at(Duration::from_millis(200), "c");

        let first = ms_to_tick(&wheel, 100);
        assert_eq!(Some("b"), wheel.poll_to(first));
        assert_eq!(2, count(&wheel));

        // Cancel "a" while the poll cursor is parked on it.
        wheel.cancel_timeout(&a);
        assert_eq!(1, count(&wheel));

        assert_eq!(None, wheel.poll_to(first));

        let second = ms_to_tick(&wheel, 200);
        assert_eq!(Some("c"), wheel.poll_to(second));
        assert_eq!(0, count(&wheel));
    }
}