slack_morphism/ratectl/
throttler.rs

1use crate::models::SlackTeamId;
2use crate::ratectl::*;
3use std::collections::{BinaryHeap, HashMap};
4use std::time::{Duration, Instant};
5
6#[derive(Debug)]
7pub struct SlackRateThrottler {
8    pub config: SlackApiRateControlConfig,
9    global_max_rate_limit_counter: Option<ThrottlingCounter>,
10    global_all_team_special_limits: HashMap<SlackApiRateControlSpecialLimitKey, ThrottlingCounter>,
11    rate_limit_per_team: HashMap<SlackTeamId, SlackTeamLimits>,
12}
13
14impl SlackRateThrottler {
15    pub fn new(rate_control_config: SlackApiRateControlConfig) -> Self {
16        let global_max_rate_limit_counter = rate_control_config
17            .global_max_rate_limit
18            .clone()
19            .map(|rl| rl.to_throttling_counter());
20        Self {
21            config: rate_control_config,
22            global_max_rate_limit_counter,
23            global_all_team_special_limits: HashMap::new(),
24            rate_limit_per_team: HashMap::new(),
25        }
26    }
27
28    pub fn calc_throttle_delay(
29        &mut self,
30        method_rate_ctl: &SlackApiMethodRateControlConfig,
31        team_id: Option<SlackTeamId>,
32        min_delayed: Option<Duration>,
33    ) -> Option<Duration> {
34        let mut delays_heap: BinaryHeap<Duration> = BinaryHeap::new();
35        let now = Instant::now();
36
37        min_delayed
38            .iter()
39            .filter(|d| !d.is_zero())
40            .for_each(|d| delays_heap.push(*d));
41
42        self.global_max_rate_limit_counter
43            .as_ref()
44            .map(|c| c.update(now))
45            .into_iter()
46            .for_each(|updated_counter| {
47                if !updated_counter.delay().is_zero() {
48                    delays_heap.push(*updated_counter.delay())
49                }
50                self.global_max_rate_limit_counter = Some(updated_counter);
51            });
52
53        match team_id {
54            Some(team_id) => {
55                let team_limits = self
56                    .rate_limit_per_team
57                    .entry(team_id)
58                    .or_insert_with(|| SlackTeamLimits::new(&self.config));
59                team_limits.updated = now;
60
61                team_limits
62                    .team_limit_counter
63                    .as_ref()
64                    .map(|c| c.update(now))
65                    .into_iter()
66                    .for_each(|updated_counter| {
67                        if !updated_counter.delay().is_zero() {
68                            delays_heap.push(*updated_counter.delay())
69                        }
70                        team_limits.team_limit_counter = Some(updated_counter);
71                    });
72
73                if let Some(ref special_rate_limit) = method_rate_ctl.special_rate_limit {
74                    let special_team_limit = team_limits
75                        .special_limits
76                        .entry(special_rate_limit.key.clone())
77                        .or_insert_with(|| special_rate_limit.limit.to_throttling_counter());
78
79                    *special_team_limit = special_team_limit.update(now);
80
81                    if !special_team_limit.delay().is_zero() {
82                        delays_heap.push(*special_team_limit.delay())
83                    }
84                }
85
86                if let Some(ref tier) = method_rate_ctl.tier {
87                    if let Some(tier_limit) = self.config.tiers_limits.get(tier) {
88                        let tier_team_limit = team_limits
89                            .tier_limits
90                            .entry(tier.clone())
91                            .or_insert_with(|| tier_limit.to_throttling_counter());
92
93                        *tier_team_limit = tier_team_limit.update(now);
94
95                        if !tier_team_limit.delay().is_zero() {
96                            delays_heap.push(*tier_team_limit.delay())
97                        }
98                    }
99                }
100
101                // Clean up old teams limits
102                self.rate_limit_per_team
103                    .retain(|_, v| v.updated.duration_since(now).as_secs() < 3600);
104            }
105            None => {
106                if let Some(ref special_method_limits) = method_rate_ctl.special_rate_limit {
107                    let special_team_limit = self
108                        .global_all_team_special_limits
109                        .entry(special_method_limits.key.clone())
110                        .or_insert_with(|| special_method_limits.limit.to_throttling_counter());
111
112                    *special_team_limit = special_team_limit.update(now);
113
114                    if !special_team_limit.delay().is_zero() {
115                        delays_heap.push(*special_team_limit.delay())
116                    }
117                }
118            }
119        }
120
121        delays_heap.pop()
122    }
123}