slack_morphism/ratectl/
throttling_counter.rs

1use std::ops::Add;
2use std::time::{Duration, Instant};
3
/// Immutable counter used to throttle calls against a refillable capacity.
///
/// Every call to [`ThrottlingCounter::update`] consumes one slot and returns a
/// new counter. One slot is refilled for each whole `rate_limit_in_millis`
/// interval that has elapsed, up to `max_capacity`. When no slot is available
/// the returned counter carries a non-zero [`ThrottlingCounter::delay`].
#[derive(Clone, Debug)]
pub struct ThrottlingCounter {
    /// Slots left after the most recent update. Goes negative while calls keep
    /// arriving with nothing available; the deficit feeds the delay penalty.
    capacity: i64,
    /// Upper bound for `capacity` after a refill.
    max_capacity: usize,
    /// Instant up to which refills have been accounted for. It only advances
    /// in whole multiples of `rate_limit_in_millis`.
    last_arrived: Instant,
    /// The `now` passed to the most recent `update` (creation time initially).
    last_updated: Instant,
    /// Milliseconds it takes for a single slot to be refilled.
    rate_limit_in_millis: u64,
    /// Delay computed by the most recent `update`.
    delay: Duration,
}

impl ThrottlingCounter {
    /// Creates a counter that starts full (`max_capacity` slots) with no delay.
    ///
    /// * `max_capacity` - maximum number of slots the counter can hold.
    /// * `rate_limit_in_millis` - milliseconds needed to refill one slot. A
    ///   value of `0` is accepted and means slots are refilled without waiting.
    pub fn new(max_capacity: usize, rate_limit_in_millis: u64) -> Self {
        let created_at = Instant::now();
        Self {
            capacity: max_capacity as i64,
            max_capacity,
            last_arrived: created_at,
            last_updated: created_at,
            rate_limit_in_millis,
            delay: Duration::from_millis(0),
        }
    }

    /// Registers one call at `now` and returns the resulting counter.
    ///
    /// Slots for every whole interval elapsed since `last_arrived` are added
    /// back (capped at `max_capacity`), then one slot is consumed. If a slot
    /// was available the resulting delay is zero. Otherwise the delay is the
    /// remainder of the current interval (measured from the previous update)
    /// plus a penalty proportional to how far the counter was already
    /// overdrawn before this call.
    ///
    /// A `now` earlier than the stored instants is treated as zero elapsed
    /// time. This method does not panic for a zero interval or zero capacity.
    pub fn update(&self, now: Instant) -> Self {
        let elapsed_millis = now
            .checked_duration_since(self.last_arrived)
            .map_or(0u64, |d| d.as_millis() as u64);

        let (refilled, last_arrived) =
            match elapsed_millis.checked_div(self.rate_limit_in_millis) {
                // Zero-length interval: dividing would panic. Nothing has to be
                // waited for, so the whole capacity counts as refilled.
                None => (self.max_capacity as u64, self.last_arrived),
                // Less than one full interval elapsed: nothing to refill yet.
                Some(0) => (0, self.last_arrived),
                // Advance only by whole intervals so the remainder keeps
                // counting towards the next slot. The product cannot overflow
                // because it never exceeds `elapsed_millis`.
                Some(intervals) => (
                    intervals,
                    self.last_arrived
                        .add(Duration::from_millis(intervals * self.rate_limit_in_millis)),
                ),
            };

        // Clamp before converting so a huge elapsed time cannot wrap negative.
        let refilled = refilled.min(i64::MAX as u64) as i64;
        let available = self
            .capacity
            .saturating_add(refilled)
            .min(self.max_capacity as i64);

        let delay = if available > 0 {
            Duration::from_millis(0)
        } else {
            let since_last_update_millis = now
                .checked_duration_since(self.last_updated)
                .map_or(0u64, |d| d.as_millis() as u64);

            // Time still missing until one full interval has passed since the
            // previous update.
            let wait_millis = self
                .rate_limit_in_millis
                .saturating_sub(since_last_update_millis);

            // The denominator is clamped to 1 so a zero `max_capacity` yields a
            // finite penalty instead of an infinite one.
            let penalty_millis = (self.rate_limit_in_millis as f64
                * (self.capacity as f64).abs()
                / self.max_capacity.max(1) as f64) as u64;

            Duration::from_millis(wait_millis.saturating_add(penalty_millis))
        };

        Self {
            capacity: available.saturating_sub(1),
            max_capacity: self.max_capacity,
            last_arrived,
            last_updated: now,
            rate_limit_in_millis: self.rate_limit_in_millis,
            delay,
        }
    }

    /// Returns the delay computed by the most recent [`ThrottlingCounter::update`]
    /// (zero for a freshly created counter).
    pub fn delay(&self) -> &Duration {
        &self.delay
    }
}
86
/// An update arriving one millisecond before a full interval has elapsed must
/// consume one slot, leave `last_arrived` untouched and report no delay.
#[test]
fn check_decreased() {
    use crate::ratectl::*;

    let limit = SlackApiRateControlLimit::new(15, Duration::from_secs(60));
    let interval_ms = limit.to_rate_limit_in_ms();
    let slots = limit.to_rate_limit_capacity();

    let started_at = Instant::now();
    let initial = ThrottlingCounter::new(slots, interval_ms);

    // One millisecond short of the first refill.
    let just_before_refill = started_at + Duration::from_millis(interval_ms - 1);
    let after = initial.update(just_before_refill);

    assert_eq!(after.last_arrived, initial.last_arrived);
    assert_eq!(after.delay, Duration::from_millis(0));
    assert_eq!(after.capacity, initial.capacity - 1);
}
102
/// A full counter must not grow past `max_capacity`: an update shortly after
/// one interval leaves `max_capacity - 1` slots and reports no delay.
#[test]
fn check_max_available() {
    use crate::ratectl::*;

    let limit = SlackApiRateControlLimit::new(15, Duration::from_secs(60));
    let interval_ms = limit.to_rate_limit_in_ms();
    let slots = limit.to_rate_limit_capacity();

    let started_at = Instant::now();
    let initial = ThrottlingCounter::new(slots, interval_ms);

    // One millisecond past a whole interval, measured from `started_at`.
    let just_after_refill = started_at + Duration::from_millis(interval_ms + 1);
    let after = initial.update(just_after_refill);

    assert_eq!(after.delay, Duration::from_millis(0));
    assert_eq!(after.capacity, (initial.max_capacity - 1) as i64);
}
117
/// Issuing one more call than the initial capacity, all at the same instant,
/// must yield a delay of exactly one refill interval.
#[test]
fn check_delay() {
    use crate::ratectl::*;

    let counter =
        SlackApiRateControlLimit::new(15, Duration::from_secs(60)).to_throttling_counter();

    let now = Instant::now();

    // Apply `capacity + 1` updates in sequence; the last one finds no slot left.
    let mut exhausted = counter.clone();
    for _ in 0..counter.capacity + 1 {
        exhausted = exhausted.update(now);
    }

    assert_eq!(
        exhausted.delay,
        Duration::from_millis(counter.rate_limit_in_millis)
    );
}
133}