// kube_runtime/utils/backoff_reset_timer.rs

1use std::time::{Duration, Instant};
2
/// An iterator of [`Duration`] values whose internal state can be rewound.
///
/// Every implementor must also be `Send`, `Sync` and `Unpin`.
pub trait Backoff
where
    Self: Iterator<Item = Duration> + Send + Sync + Unpin,
{
    /// Resets the internal state to the initial value.
    fn reset(&mut self);
}
7
8impl<B: Backoff + ?Sized> Backoff for Box<B> {
9    fn reset(&mut self) {
10        let this: &mut B = self;
11        this.reset()
12    }
13}
14
15/// A [`Backoff`] wrapper that resets after a fixed duration has elapsed.
16pub struct ResetTimerBackoff<B: Backoff> {
17    backoff: B,
18    last_backoff: Option<Instant>,
19    reset_duration: Duration,
20}
21
22impl<B: Backoff> ResetTimerBackoff<B> {
23    pub fn new(backoff: B, reset_duration: Duration) -> Self {
24        Self {
25            backoff,
26            last_backoff: None,
27            reset_duration,
28        }
29    }
30}
31
32impl<B: Backoff> Iterator for ResetTimerBackoff<B> {
33    type Item = Duration;
34
35    fn next(&mut self) -> Option<Duration> {
36        if let Some(last_backoff) = self.last_backoff {
37            if tokio::time::Instant::now().into_std() > last_backoff + self.reset_duration {
38                tracing::debug!(
39                    ?last_backoff,
40                    reset_duration = ?self.reset_duration,
41                    "Resetting backoff, since reset duration has expired"
42                );
43                self.backoff.reset();
44            }
45        }
46        self.last_backoff = Some(tokio::time::Instant::now().into_std());
47        self.backoff.next()
48    }
49}
50
51impl<B: Backoff> Backoff for ResetTimerBackoff<B> {
52    fn reset(&mut self) {
53        self.backoff.reset();
54    }
55}
56
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::time::advance;

    use super::ResetTimerBackoff;
    use crate::utils::stream_backoff::tests::LinearBackoff;

    /// Gaps shorter than the 60s reset duration let the delays keep growing;
    /// a gap longer than it sends the next delay back to the initial value.
    #[tokio::test]
    async fn should_reset_when_timer_expires() {
        // Freeze the Tokio clock so time only moves through `advance`.
        tokio::time::pause();

        let inner = LinearBackoff::new(Duration::from_secs(2));
        let mut backoff = ResetTimerBackoff::new(inner, Duration::from_secs(60));

        assert_eq!(backoff.next(), Some(Duration::from_secs(2)));

        // Each step is (seconds to advance the clock, expected delay in seconds).
        let steps = [(40, 4), (40, 6), (80, 2), (80, 2)];
        for (elapsed_secs, expected_secs) in steps {
            advance(Duration::from_secs(elapsed_secs)).await;
            assert_eq!(backoff.next(), Some(Duration::from_secs(expected_secs)));
        }
    }
}