pingora_timeout/fast_timeout.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! The fast and more complicated version of pingora-timeout
//!
//! The following optimizations are applied:
//! - The timeouts lazily initialize their timer when the Future is pending for the first time.
//! - There is no global lock for creating and cancelling timeouts.
//! - Timeout timers are rounded to the next 10ms tick and timers are shared across all timeouts with the same deadline.
//!
//! In order for this to work, a standalone thread is created to arm the timers, which has some
//! overhead. As a general rule, the benefits of this don't outweigh the overhead unless
//! there are more than about 100 timeout() calls/sec in the system. Use regular tokio timeout or
//! [super::tokio_timeout] in the low usage case.
use super::timer::*;
use super::*;
use once_cell::sync::Lazy;
use std::sync::Arc;
static TIMER_MANAGER: Lazy<Arc<TimerManager>> = Lazy::new(|| {
let tm = Arc::new(TimerManager::new());
check_clock_thread(&tm);
tm
});
fn check_clock_thread(tm: &Arc<TimerManager>) {
if tm.should_i_start_clock() {
std::thread::Builder::new()
.name("Timer thread".into())
.spawn(|| TIMER_MANAGER.clock_thread())
.unwrap();
}
}
/// The timeout generated by [fast_timeout()].
///
/// Users don't need to interact with this object.
pub struct FastTimeout(Duration);
impl ToTimeout for FastTimeout {
fn timeout(&self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
Box::pin(TIMER_MANAGER.register_timer(self.0).poll())
}
fn create(d: Duration) -> Self {
FastTimeout(d)
}
}
/// Similar to [tokio::time::timeout] but more efficient.
pub fn fast_timeout<T>(duration: Duration, future: T) -> Timeout<T, FastTimeout>
where
T: Future,
{
check_clock_thread(&TIMER_MANAGER);
Timeout::new_with_delay(future, duration)
}
/// Similar to [tokio::time::sleep] but more efficient.
pub async fn fast_sleep(duration: Duration) {
check_clock_thread(&TIMER_MANAGER);
TIMER_MANAGER.register_timer(duration).poll().await
}
/// Pause the timer for fork()
///
/// Because RwLock across fork() is undefined behavior, this function makes sure that no one
/// holds any locks.
///
/// This function should be called right before fork().
pub fn pause_for_fork() {
TIMER_MANAGER.pause_for_fork();
}
/// Unpause the timer after fork()
///
/// This function should be called right after fork().
pub fn unpause() {
TIMER_MANAGER.unpause();
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_timeout() {
let fut = tokio_sleep(Duration::from_secs(1000));
let to = fast_timeout(Duration::from_secs(1), fut);
assert!(to.await.is_err())
}
#[tokio::test]
async fn test_instantly_return() {
let fut = async { 1 };
let to = fast_timeout(Duration::from_secs(1), fut);
assert_eq!(to.await.unwrap(), 1)
}
#[tokio::test]
async fn test_delayed_return() {
let fut = async {
tokio_sleep(Duration::from_secs(1)).await;
1
};
let to = fast_timeout(Duration::from_secs(1000), fut);
assert_eq!(to.await.unwrap(), 1)
}
#[tokio::test]
async fn test_sleep() {
let fut = async {
fast_sleep(Duration::from_secs(1)).await;
1
};
let to = fast_timeout(Duration::from_secs(1000), fut);
assert_eq!(to.await.unwrap(), 1)
}
}