slack_morphism/ratectl/
throttler.rs1use crate::models::SlackTeamId;
2use crate::ratectl::*;
3use std::collections::{BinaryHeap, HashMap};
4use std::time::{Duration, Instant};
5
6#[derive(Debug)]
7pub struct SlackRateThrottler {
8 pub config: SlackApiRateControlConfig,
9 global_max_rate_limit_counter: Option<ThrottlingCounter>,
10 global_all_team_special_limits: HashMap<SlackApiRateControlSpecialLimitKey, ThrottlingCounter>,
11 rate_limit_per_team: HashMap<SlackTeamId, SlackTeamLimits>,
12}
13
14impl SlackRateThrottler {
15 pub fn new(rate_control_config: SlackApiRateControlConfig) -> Self {
16 let global_max_rate_limit_counter = rate_control_config
17 .global_max_rate_limit
18 .clone()
19 .map(|rl| rl.to_throttling_counter());
20 Self {
21 config: rate_control_config,
22 global_max_rate_limit_counter,
23 global_all_team_special_limits: HashMap::new(),
24 rate_limit_per_team: HashMap::new(),
25 }
26 }
27
28 pub fn calc_throttle_delay(
29 &mut self,
30 method_rate_ctl: &SlackApiMethodRateControlConfig,
31 team_id: Option<SlackTeamId>,
32 min_delayed: Option<Duration>,
33 ) -> Option<Duration> {
34 let mut delays_heap: BinaryHeap<Duration> = BinaryHeap::new();
35 let now = Instant::now();
36
37 min_delayed
38 .iter()
39 .filter(|d| !d.is_zero())
40 .for_each(|d| delays_heap.push(*d));
41
42 self.global_max_rate_limit_counter
43 .as_ref()
44 .map(|c| c.update(now))
45 .into_iter()
46 .for_each(|updated_counter| {
47 if !updated_counter.delay().is_zero() {
48 delays_heap.push(*updated_counter.delay())
49 }
50 self.global_max_rate_limit_counter = Some(updated_counter);
51 });
52
53 match team_id {
54 Some(team_id) => {
55 let team_limits = self
56 .rate_limit_per_team
57 .entry(team_id)
58 .or_insert_with(|| SlackTeamLimits::new(&self.config));
59 team_limits.updated = now;
60
61 team_limits
62 .team_limit_counter
63 .as_ref()
64 .map(|c| c.update(now))
65 .into_iter()
66 .for_each(|updated_counter| {
67 if !updated_counter.delay().is_zero() {
68 delays_heap.push(*updated_counter.delay())
69 }
70 team_limits.team_limit_counter = Some(updated_counter);
71 });
72
73 if let Some(ref special_rate_limit) = method_rate_ctl.special_rate_limit {
74 let special_team_limit = team_limits
75 .special_limits
76 .entry(special_rate_limit.key.clone())
77 .or_insert_with(|| special_rate_limit.limit.to_throttling_counter());
78
79 *special_team_limit = special_team_limit.update(now);
80
81 if !special_team_limit.delay().is_zero() {
82 delays_heap.push(*special_team_limit.delay())
83 }
84 }
85
86 if let Some(ref tier) = method_rate_ctl.tier {
87 if let Some(tier_limit) = self.config.tiers_limits.get(tier) {
88 let tier_team_limit = team_limits
89 .tier_limits
90 .entry(tier.clone())
91 .or_insert_with(|| tier_limit.to_throttling_counter());
92
93 *tier_team_limit = tier_team_limit.update(now);
94
95 if !tier_team_limit.delay().is_zero() {
96 delays_heap.push(*tier_team_limit.delay())
97 }
98 }
99 }
100
101 self.rate_limit_per_team
103 .retain(|_, v| v.updated.duration_since(now).as_secs() < 3600);
104 }
105 None => {
106 if let Some(ref special_method_limits) = method_rate_ctl.special_rate_limit {
107 let special_team_limit = self
108 .global_all_team_special_limits
109 .entry(special_method_limits.key.clone())
110 .or_insert_with(|| special_method_limits.limit.to_throttling_counter());
111
112 *special_team_limit = special_team_limit.update(now);
113
114 if !special_team_limit.delay().is_zero() {
115 delays_heap.push(*special_team_limit.delay())
116 }
117 }
118 }
119 }
120
121 delays_heap.pop()
122 }
123}