pingora_timeout/
fast_timeout.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The fast and more complicated version of pingora-timeout
//!
//! The following optimizations are applied:
//! - The timeouts lazily initialize their timer when the Future is pending for the first time.
//! - There is no global lock for creating and cancelling timeouts.
//! - Timeout timers are rounded to the next 10ms tick and timers are shared across all timeouts with the same deadline.
//!
//! In order for this to work, a standalone thread is created to arm the timers, which has some
//! overhead. As a general rule, the benefits of this don't outweigh the overhead unless
//! there are more than about 100 timeout() calls/sec in the system. Use regular tokio timeout or
//! [super::tokio_timeout] in the low usage case.

use super::timer::*;
use super::*;
use once_cell::sync::Lazy;
use std::sync::Arc;

static TIMER_MANAGER: Lazy<Arc<TimerManager>> = Lazy::new(|| {
    let tm = Arc::new(TimerManager::new());
    check_clock_thread(&tm);
    tm
});

fn check_clock_thread(tm: &Arc<TimerManager>) {
    if tm.should_i_start_clock() {
        std::thread::Builder::new()
            .name("Timer thread".into())
            .spawn(|| TIMER_MANAGER.clock_thread())
            .unwrap();
    }
}

/// The timeout generated by [fast_timeout()].
///
/// Users don't need to interact with this object.
pub struct FastTimeout(Duration);

impl ToTimeout for FastTimeout {
    fn timeout(&self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(TIMER_MANAGER.register_timer(self.0).poll())
    }

    fn create(d: Duration) -> Self {
        FastTimeout(d)
    }
}

/// Similar to [tokio::time::timeout] but more efficient.
pub fn fast_timeout<T>(duration: Duration, future: T) -> Timeout<T, FastTimeout>
where
    T: Future,
{
    check_clock_thread(&TIMER_MANAGER);
    Timeout::new_with_delay(future, duration)
}

/// Similar to [tokio::time::sleep] but more efficient.
pub async fn fast_sleep(duration: Duration) {
    check_clock_thread(&TIMER_MANAGER);
    TIMER_MANAGER.register_timer(duration).poll().await
}

/// Pause the timer for fork()
///
/// Because RwLock across fork() is undefined behavior, this function makes sure that no one
/// holds any locks.
///
/// This function should be called right before fork().
pub fn pause_for_fork() {
    TIMER_MANAGER.pause_for_fork();
}

/// Unpause the timer after fork()
///
/// This function should be called right after fork().
pub fn unpause() {
    TIMER_MANAGER.unpause();
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_timeout() {
        let fut = tokio_sleep(Duration::from_secs(1000));
        let to = fast_timeout(Duration::from_secs(1), fut);
        assert!(to.await.is_err())
    }

    #[tokio::test]
    async fn test_instantly_return() {
        let fut = async { 1 };
        let to = fast_timeout(Duration::from_secs(1), fut);
        assert_eq!(to.await.unwrap(), 1)
    }

    #[tokio::test]
    async fn test_delayed_return() {
        let fut = async {
            tokio_sleep(Duration::from_secs(1)).await;
            1
        };
        let to = fast_timeout(Duration::from_secs(1000), fut);
        assert_eq!(to.await.unwrap(), 1)
    }

    #[tokio::test]
    async fn test_sleep() {
        let fut = async {
            fast_sleep(Duration::from_secs(1)).await;
            1
        };
        let to = fast_timeout(Duration::from_secs(1000), fut);
        assert_eq!(to.await.unwrap(), 1)
    }
}