slack_morphism/ratectl/
throttling_counter.rs1use std::ops::Add;
2use std::time::{Duration, Instant};
3
4#[derive(Clone, Debug)]
5pub struct ThrottlingCounter {
6 capacity: i64,
7 max_capacity: usize,
8 last_arrived: Instant,
9 last_updated: Instant,
10 rate_limit_in_millis: u64,
11 delay: Duration,
12}
13
14impl ThrottlingCounter {
15 pub fn new(max_capacity: usize, rate_limit_in_millis: u64) -> Self {
16 Self {
17 capacity: max_capacity as i64,
18 max_capacity,
19 last_arrived: Instant::now(),
20 last_updated: Instant::now(),
21 rate_limit_in_millis,
22 delay: Duration::from_millis(0),
23 }
24 }
25
26 pub fn update(&self, now: Instant) -> Self {
27 let time_elapsed_millis = now
28 .checked_duration_since(self.last_arrived)
29 .unwrap_or_else(|| Duration::from_millis(0))
30 .as_millis() as u64;
31
32 let (arrived, new_last_arrived) = {
33 if time_elapsed_millis >= self.rate_limit_in_millis {
34 let arrived_in_time = time_elapsed_millis / self.rate_limit_in_millis;
35 let new_last_updated = self.last_arrived.add(Duration::from_millis(
36 arrived_in_time * self.rate_limit_in_millis,
37 ));
38 (arrived_in_time as usize, new_last_updated)
39 } else {
40 (0usize, self.last_arrived)
41 }
42 };
43
44 let new_available_capacity =
45 std::cmp::min(self.capacity + arrived as i64, self.max_capacity as i64);
46
47 if new_available_capacity > 0 {
48 Self {
49 capacity: new_available_capacity - 1,
50 last_arrived: new_last_arrived,
51 last_updated: now,
52 delay: Duration::from_millis(0),
53 ..self.clone()
54 }
55 } else {
56 let updated_capacity = new_available_capacity - 1;
57
58 let base_delay_in_millis = now
59 .checked_duration_since(self.last_updated)
60 .map_or(0u64, |d| d.as_millis() as u64);
61
62 let delay_penalty = (self.rate_limit_in_millis as f64 * self.capacity.abs() as f64
63 / self.max_capacity as f64) as u64;
64
65 let delay_in_millis = if base_delay_in_millis < self.rate_limit_in_millis {
66 self.rate_limit_in_millis - base_delay_in_millis
67 } else {
68 0
69 };
70 let delay_with_penalty = Duration::from_millis(delay_in_millis + delay_penalty);
71
72 Self {
73 capacity: updated_capacity,
74 last_arrived: new_last_arrived,
75 last_updated: now,
76 delay: delay_with_penalty,
77 ..self.clone()
78 }
79 }
80 }
81
82 pub fn delay(&self) -> &Duration {
83 &self.delay
84 }
85}
86
87#[test]
88fn check_decreased() {
89 use crate::ratectl::*;
90 let rate_limit = SlackApiRateControlLimit::new(15, Duration::from_secs(60));
91 let rate_limit_in_ms = rate_limit.to_rate_limit_in_ms();
92 let rate_limit_capacity = rate_limit.to_rate_limit_capacity();
93
94 let now = Instant::now();
95 let counter = ThrottlingCounter::new(rate_limit_capacity, rate_limit_in_ms);
96 let updated_counter = counter.update(now.add(Duration::from_millis(rate_limit_in_ms - 1)));
97
98 assert_eq!(updated_counter.last_arrived, counter.last_arrived);
99 assert_eq!(updated_counter.delay, Duration::from_millis(0));
100 assert_eq!(updated_counter.capacity, counter.capacity - 1);
101}
102
103#[test]
104fn check_max_available() {
105 use crate::ratectl::*;
106 let rate_limit = SlackApiRateControlLimit::new(15, Duration::from_secs(60));
107 let rate_limit_in_ms = rate_limit.to_rate_limit_in_ms();
108 let rate_limit_capacity = rate_limit.to_rate_limit_capacity();
109
110 let now = Instant::now();
111 let counter = ThrottlingCounter::new(rate_limit_capacity, rate_limit_in_ms);
112 let updated_counter = counter.update(now.add(Duration::from_millis(rate_limit_in_ms + 1)));
113
114 assert_eq!(updated_counter.delay, Duration::from_millis(0));
115 assert_eq!(updated_counter.capacity, (counter.max_capacity - 1) as i64);
116}
117
118#[test]
119fn check_delay() {
120 use crate::ratectl::*;
121 let counter =
122 SlackApiRateControlLimit::new(15, Duration::from_secs(60)).to_throttling_counter();
123
124 let now = Instant::now();
125
126 let updated_counter =
127 (0..counter.capacity + 1).fold(counter.clone(), |result, _| result.update(now));
128
129 assert_eq!(
130 updated_counter.delay,
131 Duration::from_millis(counter.rate_limit_in_millis)
132 );
133}