// kube_runtime/utils/backoff_reset_timer.rs
use std::time::{Duration, Instant};
2
/// A source of retry delays that can be restarted.
///
/// Implementors are iterators yielding the [`Duration`] to wait before each
/// successive attempt; [`Backoff::reset`] rewinds that sequence.
pub trait Backoff: Iterator<Item = Duration> + Send + Sync + Unpin {
    /// Resets the internal state so the next delay starts from the beginning
    /// of the sequence again.
    fn reset(&mut self);
}
7
8impl<B: Backoff + ?Sized> Backoff for Box<B> {
9 fn reset(&mut self) {
10 let this: &mut B = self;
11 this.reset()
12 }
13}
14
15pub struct ResetTimerBackoff<B: Backoff> {
17 backoff: B,
18 last_backoff: Option<Instant>,
19 reset_duration: Duration,
20}
21
22impl<B: Backoff> ResetTimerBackoff<B> {
23 pub fn new(backoff: B, reset_duration: Duration) -> Self {
24 Self {
25 backoff,
26 last_backoff: None,
27 reset_duration,
28 }
29 }
30}
31
32impl<B: Backoff> Iterator for ResetTimerBackoff<B> {
33 type Item = Duration;
34
35 fn next(&mut self) -> Option<Duration> {
36 if let Some(last_backoff) = self.last_backoff {
37 if tokio::time::Instant::now().into_std() > last_backoff + self.reset_duration {
38 tracing::debug!(
39 ?last_backoff,
40 reset_duration = ?self.reset_duration,
41 "Resetting backoff, since reset duration has expired"
42 );
43 self.backoff.reset();
44 }
45 }
46 self.last_backoff = Some(tokio::time::Instant::now().into_std());
47 self.backoff.next()
48 }
49}
50
51impl<B: Backoff> Backoff for ResetTimerBackoff<B> {
52 fn reset(&mut self) {
53 self.backoff.reset();
54 }
55}
56
#[cfg(test)]
mod tests {
    use tokio::time::advance;

    use super::ResetTimerBackoff;
    use crate::utils::stream_backoff::tests::LinearBackoff;
    use std::time::Duration;

    #[tokio::test]
    async fn should_reset_when_timer_expires() {
        // Freeze tokio's clock so time only moves through `advance`.
        tokio::time::pause();
        let mut backoff = ResetTimerBackoff::new(
            LinearBackoff::new(Duration::from_secs(2)),
            Duration::from_secs(60),
        );

        // Gaps of 40s stay under the 60s threshold: the delay keeps growing.
        assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
        advance(Duration::from_secs(40)).await;
        assert_eq!(backoff.next(), Some(Duration::from_secs(4)));
        advance(Duration::from_secs(40)).await;
        assert_eq!(backoff.next(), Some(Duration::from_secs(6)));

        // Gaps of 80s exceed the threshold: each request starts over.
        advance(Duration::from_secs(80)).await;
        assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
        advance(Duration::from_secs(80)).await;
        assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
    }
}