// slack_morphism/ratectl/throttling_counter.rs

use std::ops::Add;
use std::time::{Duration, Instant};

/// Capacity counter that refills one unit per rate-limit interval and reports
/// how long a request should wait once the capacity is used up.
///
/// The counter is never mutated in place: [`ThrottlingCounter::update`]
/// returns a new counter describing the state after one more request.
#[derive(Clone, Debug)]
pub struct ThrottlingCounter {
    /// Units currently available. Drops below zero while requests keep
    /// arriving after the capacity has been used up.
    capacity: i64,
    /// Upper bound that `capacity` is refilled to.
    max_capacity: usize,
    /// Start of the refill interval in progress; only ever advanced by whole
    /// multiples of `rate_limit_in_millis`.
    last_arrived: Instant,
    /// The `now` passed to the most recent `update` (creation time initially).
    last_updated: Instant,
    /// Milliseconds it takes for one unit of capacity to be restored.
    rate_limit_in_millis: u64,
    /// Delay computed by the most recent `update`.
    delay: Duration,
}

impl ThrottlingCounter {
    /// Creates a counter that starts with `max_capacity` units available and
    /// restores one unit every `rate_limit_in_millis` milliseconds.
    pub fn new(max_capacity: usize, rate_limit_in_millis: u64) -> Self {
        // A single timestamp keeps both anchors identical at creation.
        let created_at = Instant::now();
        Self {
            capacity: Self::capacity_as_i64(max_capacity),
            max_capacity,
            last_arrived: created_at,
            last_updated: created_at,
            rate_limit_in_millis,
            delay: Duration::from_millis(0),
        }
    }

    /// Registers one request arriving at `now` and returns the resulting
    /// counter.
    ///
    /// Capacity restored since `last_arrived` is added first (capped at
    /// `max_capacity`), then one unit is consumed. If no capacity was
    /// available, the returned counter carries a non-zero [`delay`]: the
    /// remainder of the current interval measured from the previous update,
    /// plus a penalty proportional to how far the counter was already
    /// overdrawn before this call.
    ///
    /// A `now` earlier than the stored timestamps is treated as "no time
    /// elapsed". A zero `rate_limit_in_millis` is treated as "no rate limit"
    /// (capacity is fully restored on every call) instead of dividing by
    /// zero, and a zero `max_capacity` yields a finite penalty.
    ///
    /// [`delay`]: ThrottlingCounter::delay
    pub fn update(&self, now: Instant) -> Self {
        let max_capacity = Self::capacity_as_i64(self.max_capacity);
        let elapsed_millis = Self::millis_between(now, self.last_arrived);

        let (arrived, new_last_arrived) = if self.rate_limit_in_millis == 0 {
            // Zero-length interval: nothing to divide by, everything is back.
            (max_capacity, now.max(self.last_arrived))
        } else if elapsed_millis >= self.rate_limit_in_millis {
            let intervals = elapsed_millis / self.rate_limit_in_millis;
            // `intervals * rate <= elapsed_millis`, so neither the product nor
            // the resulting instant (which is <= `now`) can overflow.
            let advanced = self
                .last_arrived
                .add(Duration::from_millis(intervals * self.rate_limit_in_millis));
            (intervals.min(i64::MAX as u64) as i64, advanced)
        } else {
            (0, self.last_arrived)
        };

        let available = self.capacity.saturating_add(arrived).min(max_capacity);

        let delay = if available > 0 {
            Duration::from_millis(0)
        } else {
            let since_last_update = Self::millis_between(now, self.last_updated);
            let remaining = self.rate_limit_in_millis.saturating_sub(since_last_update);
            // The penalty is based on the capacity *before* this request, so
            // the first overdrawn request waits exactly one interval.
            // The divisor is clamped to 1 so a zero `max_capacity` cannot
            // produce an infinite/NaN penalty.
            let penalty = (self.rate_limit_in_millis as f64 * (self.capacity as f64).abs()
                / self.max_capacity.max(1) as f64) as u64;
            Duration::from_millis(remaining.saturating_add(penalty))
        };

        Self {
            capacity: available.saturating_sub(1),
            max_capacity: self.max_capacity,
            last_arrived: new_last_arrived,
            last_updated: now,
            rate_limit_in_millis: self.rate_limit_in_millis,
            delay,
        }
    }

    /// Delay computed by the most recent [`update`](ThrottlingCounter::update);
    /// zero for a freshly created counter.
    pub fn delay(&self) -> &Duration {
        &self.delay
    }

    /// Converts a capacity to `i64`, clamping instead of wrapping.
    fn capacity_as_i64(capacity: usize) -> i64 {
        (capacity as u64).min(i64::MAX as u64) as i64
    }

    /// Whole milliseconds from `earlier` to `later`; zero when `later` is not
    /// after `earlier`, clamped to `u64::MAX`.
    fn millis_between(later: Instant, earlier: Instant) -> u64 {
        later
            .saturating_duration_since(earlier)
            .as_millis()
            .min(u128::from(u64::MAX)) as u64
    }
}

/// A request arriving less than one interval after creation consumes one unit
/// of capacity, incurs no delay and leaves `last_arrived` untouched.
#[test]
fn check_decreased() {
    use crate::ratectl::*;

    let limit = SlackApiRateControlLimit::new(15, Duration::from_secs(60));
    let interval_ms = limit.to_rate_limit_in_ms();
    let capacity = limit.to_rate_limit_capacity();

    // `start` is taken before the counter exists, so the update below is
    // strictly less than one interval after the counter's own timestamps.
    let start = Instant::now();
    let initial = ThrottlingCounter::new(capacity, interval_ms);
    let just_before_refill = start + Duration::from_millis(interval_ms - 1);
    let after = initial.update(just_before_refill);

    assert_eq!(after.last_arrived, initial.last_arrived);
    assert_eq!(after.delay, Duration::from_millis(0));
    assert_eq!(after.capacity, initial.capacity - 1);
}

/// Capacity never exceeds `max_capacity`: updating a full counter roughly one
/// interval after creation still leaves `max_capacity - 1` units and no delay.
#[test]
fn check_max_available() {
    use crate::ratectl::*;

    let limit = SlackApiRateControlLimit::new(15, Duration::from_secs(60));
    let interval_ms = limit.to_rate_limit_in_ms();
    let capacity = limit.to_rate_limit_capacity();

    let start = Instant::now();
    let initial = ThrottlingCounter::new(capacity, interval_ms);
    let just_after_refill = start + Duration::from_millis(interval_ms + 1);
    let after = initial.update(just_after_refill);

    assert_eq!(after.delay, Duration::from_millis(0));
    assert_eq!(after.capacity, (initial.max_capacity - 1) as i64);
}

/// Issuing one request more than the initial capacity, all at the same
/// instant, makes the last request wait exactly one rate-limit interval.
#[test]
fn check_delay() {
    use crate::ratectl::*;

    let initial =
        SlackApiRateControlLimit::new(15, Duration::from_secs(60)).to_throttling_counter();
    let now = Instant::now();

    // capacity + 1 updates: the final one is the first without capacity.
    let mut current = initial.clone();
    for _ in 0..initial.capacity + 1 {
        current = current.update(now);
    }

    let one_interval = Duration::from_millis(initial.rate_limit_in_millis);
    assert_eq!(current.delay, one_interval);
}