// pingora_timeout/timer.rs
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Lightweight timer for systems with high rate of operations with timeout
//! associated with them
//!
//! Users don't need to interact with this module.
//!
//! The idea is to bucket timers into fixed-size time slots so that operations that
//! start and end quickly can share existing timers instead of creating their own every time.
//!
//! Benchmark (this timer):
//! - create: 7.809622ms total, 78ns avg per iteration
//! - drop: 1.348552ms total, 13ns avg per iteration
//!
//! tokio timer:
//! - create: 34.317439ms total, 343ns avg per iteration
//! - drop: 10.694154ms total, 106ns avg per iteration
use parking_lot::RwLock;
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use thread_local::ThreadLocal;
use tokio::sync::Notify;
/// Granularity, in milliseconds, at which timer deadlines are bucketed.
const RESOLUTION_MS: u64 = 10;
/// [`RESOLUTION_MS`] expressed as a [`Duration`].
const RESOLUTION_DURATION: Duration = Duration::from_millis(RESOLUTION_MS);

/// Round `raw` UP to the next multiple of `resolution`.
///
/// Values that are already a multiple of `resolution` (including 0) are returned unchanged.
///
/// # Panics
///
/// Panics if `resolution` is 0 and `raw` is non-zero (remainder by zero).
#[inline]
fn round_to(raw: u128, resolution: u128) -> u128 {
    // 0 is already aligned. Handling it up front avoids the `raw - 1` underflow, which
    // panics in debug builds and produces a wrapped, non-aligned value in release builds.
    if raw == 0 {
        return 0;
    }
    raw - 1 + resolution - (raw - 1) % resolution
}
/// A point in time in milliseconds (millisecond resolution at most), always rounded UP to a
/// multiple of `RESOLUTION_MS` so that nearby deadlines collapse into one bucket.
#[derive(PartialEq, PartialOrd, Eq, Ord, Clone, Copy, Debug)]
struct Time(u128);

impl From<u128> for Time {
    fn from(raw_ms: u128) -> Self {
        let bucket = round_to(raw_ms, u128::from(RESOLUTION_MS));
        Self(bucket)
    }
}

impl From<Duration> for Time {
    fn from(d: Duration) -> Self {
        // Go through the millisecond conversion so both constructors round identically.
        Self::from(d.as_millis())
    }
}

impl Time {
    /// Whether this time is at or before `ts` (milliseconds), i.e. whether it is due at `ts`.
    pub fn not_after(&self, ts: u128) -> bool {
        ts >= self.0
    }
}
/// The stub for waiting on a timer to expire.
///
/// Several stubs can share one underlying timer; all of them are released when it fires.
pub struct TimerStub(Arc<Notify>, Arc<AtomicBool>);

impl TimerStub {
    /// Wait for the timer to expire.
    ///
    /// Returns immediately if the timer has already fired.
    pub async fn poll(self) {
        // Create the `Notified` future BEFORE checking the flag. `notify_waiters()` only wakes
        // `Notified` futures that already exist. If the flag were checked first, a fire landing
        // between the check and the subscription would be missed and this call would never
        // return.
        let notified = self.0.notified();
        if self.1.load(Ordering::SeqCst) {
            return;
        }
        notified.await;
    }
}
/// One timer bucket: a notifier plus a "has fired" flag, shared with every [`TimerStub`]
/// handed out for it.
struct Timer(Arc<Notify>, Arc<AtomicBool>);

impl Timer {
    /// Create a timer that has not fired yet.
    pub fn new() -> Self {
        let notify = Arc::new(Notify::new());
        let fired = Arc::new(AtomicBool::new(false));
        Self(notify, fired)
    }

    /// Mark the timer as fired, then wake every stub currently waiting on it.
    pub fn fire(&self) {
        // The flag is published before waking, so stubs that look at it afterwards return
        // without waiting.
        self.1.store(true, Ordering::SeqCst);
        self.0.notify_waiters();
    }

    /// Hand out a new stub that shares this timer's notifier and fired flag.
    pub fn subscribe(&self) -> TimerStub {
        TimerStub(Arc::clone(&self.0), Arc::clone(&self.1))
    }
}
/// The object that holds all the timers registered to it.
pub struct TimerManager {
    /// Per-thread timer trees keyed by rounded deadline. Each thread inserts only into its own
    /// tree to avoid lock contention; the clock thread only removes entries.
    timers: ThreadLocal<RwLock<BTreeMap<Time, Timer>>>,
    /// Reference zero point that deadlines and watchdog seconds are measured from.
    zero: Instant,
    /// Seconds since `zero` at which the clock thread last checked in. A new clock thread
    /// should be started when this is `DELAYS_SEC` or more behind. It starts at
    /// `-DELAYS_SEC`, i.e. already stale.
    clock_watchdog: AtomicI64,
    /// Set while the manager is paused around `fork()`.
    paused: AtomicBool,
}

/// The clock thread is considered dead once it has not updated the watchdog for this many
/// seconds.
// TODO: make sure this value is larger than RESOLUTION_DURATION
const DELAYS_SEC: i64 = 2;

impl Default for TimerManager {
    fn default() -> Self {
        let zero = Instant::now();
        Self {
            timers: ThreadLocal::new(),
            zero,
            // Begin stale so that the first `should_i_start_clock` caller is told to start it.
            clock_watchdog: AtomicI64::new(-DELAYS_SEC),
            paused: AtomicBool::new(false),
        }
    }
}
impl TimerManager {
/// Create a new [TimerManager]
pub fn new() -> Self {
Self::default()
}
// This thread sleeps for a resolution time and then fires all the timers that are due to fire
pub(crate) fn clock_thread(&self) {
loop {
std::thread::sleep(RESOLUTION_DURATION);
let now = Instant::now() - self.zero;
self.clock_watchdog
.store(now.as_secs() as i64, Ordering::Relaxed);
if self.is_paused_for_fork() {
// just stop acquiring the locks, waiting for fork to happen
continue;
}
let now = now.as_millis();
// iterate through the timer tree for all threads
for thread_timer in self.timers.iter() {
let mut timers = thread_timer.write();
// Fire all timers until now
loop {
let key_to_remove = timers.iter().next().and_then(|(k, _)| {
if k.not_after(now) {
Some(*k)
} else {
None
}
});
if let Some(k) = key_to_remove {
let timer = timers.remove(&k);
// safe to unwrap, the key is from iter().next()
timer.unwrap().fire();
} else {
break;
}
}
// write lock drops here
}
}
}
// False if the clock is already started
// If true, the caller must start the clock thread next
pub(crate) fn should_i_start_clock(&self) -> bool {
let Err(prev) = self.is_clock_running() else {
return false;
};
let now = Instant::now().duration_since(self.zero).as_secs() as i64;
let res =
self.clock_watchdog
.compare_exchange(prev, now, Ordering::SeqCst, Ordering::SeqCst);
res.is_ok()
}
// Ok(()) if clock is running (watch dog is within DELAYS_SEC of now)
// Err(time) if watch do stopped at `time`
pub(crate) fn is_clock_running(&self) -> Result<(), i64> {
let now = Instant::now().duration_since(self.zero).as_secs() as i64;
let prev = self.clock_watchdog.load(Ordering::SeqCst);
if now < prev + DELAYS_SEC {
Ok(())
} else {
Err(prev)
}
}
/// Register a timer.
///
/// When the timer expires, the [TimerStub] will be notified.
pub fn register_timer(&self, duration: Duration) -> TimerStub {
if self.is_paused_for_fork() {
// Return a dummy TimerStub that will trigger right away.
// This is fine assuming pause_for_fork() is called right before fork().
// The only possible register_timer() is from another thread which will
// be entirely lost after fork()
// TODO: buffer these register calls instead (without a lock)
let timer = Timer::new();
timer.fire();
return timer.subscribe();
}
let now: Time = (Instant::now() + duration - self.zero).into();
{
let timers = self.timers.get_or(|| RwLock::new(BTreeMap::new())).read();
if let Some(t) = timers.get(&now) {
return t.subscribe();
}
} // drop read lock
let timer = Timer::new();
let mut timers = self.timers.get_or(|| RwLock::new(BTreeMap::new())).write();
// Usually we check if another thread has insert the same node before we get the write lock,
// but because only this thread will insert anything to its local timers tree, there
// is no possible race that can happen. The only other thread is the clock thread who
// only removes timer from the tree
let stub = timer.subscribe();
timers.insert(now, timer);
stub
}
fn is_paused_for_fork(&self) -> bool {
self.paused.load(Ordering::SeqCst)
}
/// Pause the timer for fork()
///
/// Because RwLock across fork() is undefined behavior, this function makes sure that no one
/// holds any locks.
///
/// This function should be called right before fork().
pub fn pause_for_fork(&self) {
self.paused.store(true, Ordering::SeqCst);
// wait for everything to get out of their locks
std::thread::sleep(RESOLUTION_DURATION * 2);
}
/// Unpause the timer after fork()
///
/// This function should be called right after fork().
pub fn unpause(&self) {
self.paused.store(false, Ordering::SeqCst)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_round() {
        // Exact multiples stay put; everything else rounds up to the next multiple.
        for (raw, rounded) in [(30, 30), (31, 40), (29, 30)] {
            assert_eq!(round_to(raw, 10), rounded);
        }
    }

    #[test]
    fn test_time() {
        let t = Time::from(128u128); // t will round to 130
        let expected: Time = Duration::from_millis(130).into();
        assert_eq!(t, expected);
        for before in [128, 129] {
            assert!(!t.not_after(before));
        }
        for at_or_after in [130, 131] {
            assert!(t.not_after(at_or_after));
        }
    }

    #[tokio::test]
    async fn test_timer_manager() {
        let manager = Arc::new(TimerManager::new());
        let clock_handle = Arc::clone(&manager);
        std::thread::spawn(move || clock_handle.clock_thread());

        let start = Instant::now();
        let first = manager.register_timer(Duration::from_secs(1));
        let second = manager.register_timer(Duration::from_secs(1));
        first.poll().await;
        assert_eq!(start.elapsed().as_secs(), 1);

        let start = Instant::now();
        second.poll().await;
        // `second` fired together with `first`, so there is no extra wait
        assert_eq!(start.elapsed().as_secs(), 0);
    }

    #[test]
    fn test_timer_manager_start_check() {
        let manager = Arc::new(TimerManager::new());
        assert!(manager.should_i_start_clock());
        assert!(!manager.should_i_start_clock());
        assert!(manager.is_clock_running().is_ok());
    }

    #[test]
    fn test_timer_manager_watchdog() {
        let manager = Arc::new(TimerManager::new());
        assert!(manager.should_i_start_clock());
        assert!(!manager.should_i_start_clock());
        // No clock thread is actually started; wait long enough for the watchdog to go stale.
        let stale_after = Duration::from_secs(DELAYS_SEC as u64 + 1);
        std::thread::sleep(stale_after);
        assert!(manager.is_clock_running().is_err());
        assert!(manager.should_i_start_clock());
    }

    #[tokio::test]
    async fn test_timer_manager_pause() {
        let manager = Arc::new(TimerManager::new());
        let clock_handle = Arc::clone(&manager);
        std::thread::spawn(move || clock_handle.clock_thread());

        let start = Instant::now();
        let before_pause = manager.register_timer(Duration::from_secs(2));
        manager.pause_for_fork();
        // No real fork happens here; this only checks that pause/unpause behave.
        // Any timer registered inside the paused window fires immediately.
        let during_pause = manager.register_timer(Duration::from_secs(2));
        during_pause.poll().await;
        assert_eq!(start.elapsed().as_secs(), 0);

        std::thread::sleep(Duration::from_secs(1));
        manager.unpause();
        before_pause.poll().await;
        assert_eq!(start.elapsed().as_secs(), 2);
    }
}