broker_tokio/time/clock.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
//! Source of time abstraction.
//!
//! By default, `std::time::Instant::now()` is used. However, when the
//! `test-util` feature flag is enabled, the values returned for `now()` are
//! configurable.
cfg_not_test_util! {
use crate::time::{Duration, Instant};
#[derive(Debug, Clone)]
pub(crate) struct Clock {}
pub(crate) fn now() -> Instant {
Instant::from_std(std::time::Instant::now())
}
impl Clock {
pub(crate) fn new() -> Clock {
Clock {}
}
pub(crate) fn now(&self) -> Instant {
now()
}
pub(crate) fn is_frozen(&self) -> bool {
false
}
pub(crate) fn advance(&self, _dur: Duration) {
unreachable!();
}
}
}
cfg_test_util! {
use crate::time::{Duration, Instant};
use std::sync::{Arc, Mutex};
use crate::runtime::context;
/// A handle to a source of time.
#[derive(Debug, Clone)]
pub(crate) struct Clock {
inner: Arc<Inner>,
}
#[derive(Debug)]
struct Inner {
/// Instant at which the clock was created
start: std::time::Instant,
/// Current, "frozen" time as an offset from `start`.
frozen: Mutex<Option<Duration>>,
}
/// Pause time
///
/// The current value of `Instant::now()` is saved and all subsequent calls
/// to `Instant::now()` will return the saved value. This is useful for
/// running tests that are dependent on time.
///
/// # Panics
///
/// Panics if time is already frozen or if called from outside of the Tokio
/// runtime.
pub fn pause() {
let clock = context::clock().expect("time cannot be frozen from outside the Tokio runtime");
let mut frozen = clock.inner.frozen.lock().unwrap();
if frozen.is_some() {
panic!("time is already frozen");
}
*frozen = Some(clock.inner.start.elapsed());
}
/// Resume time
///
/// Clears the saved `Instant::now()` value. Subsequent calls to
/// `Instant::now()` will return the value returned by the system call.
///
/// # Panics
///
/// Panics if time is not frozen or if called from outside of the Tokio
/// runtime.
pub fn resume() {
let clock = context::clock().expect("time cannot be frozen from outside the Tokio runtime");
let mut frozen = clock.inner.frozen.lock().unwrap();
if frozen.is_none() {
panic!("time is not frozen");
}
*frozen = None;
}
/// Advance time
///
/// Increments the saved `Instant::now()` value by `duration`. Subsequent
/// calls to `Instant::now()` will return the result of the increment.
///
/// # Panics
///
/// Panics if time is not frozen or if called from outside of the Tokio
/// runtime.
pub async fn advance(duration: Duration) {
let clock = context::clock().expect("time cannot be frozen from outside the Tokio runtime");
clock.advance(duration);
crate::task::yield_now().await;
}
/// Return the current instant, factoring in frozen time.
pub(crate) fn now() -> Instant {
if let Some(clock) = context::clock() {
if let Some(frozen) = *clock.inner.frozen.lock().unwrap() {
Instant::from_std(clock.inner.start + frozen)
} else {
Instant::from_std(std::time::Instant::now())
}
} else {
Instant::from_std(std::time::Instant::now())
}
}
impl Clock {
/// Return a new `Clock` instance that uses the current execution context's
/// source of time.
pub(crate) fn new() -> Clock {
Clock {
inner: Arc::new(Inner {
start: std::time::Instant::now(),
frozen: Mutex::new(None),
}),
}
}
// TODO: delete this. Some tests rely on this
#[cfg(all(test, not(loom)))]
/// Return a new `Clock` instance that uses the current execution context's
/// source of time.
pub(crate) fn new_frozen() -> Clock {
Clock {
inner: Arc::new(Inner {
start: std::time::Instant::now(),
frozen: Mutex::new(Some(Duration::from_millis(0))),
}),
}
}
pub(crate) fn is_frozen(&self) -> bool {
self.inner.frozen.lock().unwrap().is_some()
}
pub(crate) fn advance(&self, duration: Duration) {
let mut frozen = self.inner.frozen.lock().unwrap();
if let Some(ref mut elapsed) = *frozen {
*elapsed += duration;
} else {
panic!("time is not frozen");
}
}
// TODO: delete this as well
#[cfg(all(test, not(loom)))]
pub(crate) fn advanced(&self) -> Duration {
self.inner.frozen.lock().unwrap().unwrap()
}
pub(crate) fn now(&self) -> Instant {
Instant::from_std(if let Some(frozen) = *self.inner.frozen.lock().unwrap() {
self.inner.start + frozen
} else {
std::time::Instant::now()
})
}
}
}