tower_retry/
budget.rs

1//! A retry "budget" for allowing only a certain amount of retries over time.
2
3use std::{
4    fmt,
5    sync::{
6        atomic::{AtomicIsize, Ordering},
7        Mutex,
8    },
9    time::Duration,
10};
11use tokio::time::Instant;
12
13/// Represents a "budget" for retrying requests.
14///
15/// This is useful for limiting the amount of retries a service can perform
16/// over a period of time, or per a certain number of requests attempted.
17pub struct Budget {
18    bucket: Bucket,
19    deposit_amount: isize,
20    withdraw_amount: isize,
21}
22
23/// Indicates that it is not currently allowed to "withdraw" another retry
24/// from the [`Budget`](Budget).
25#[derive(Debug)]
26pub struct Overdrawn {
27    _inner: (),
28}
29
30#[derive(Debug)]
31struct Bucket {
32    generation: Mutex<Generation>,
33    /// Initial budget allowed for every second.
34    reserve: isize,
35    /// Slots of a the TTL divided evenly.
36    slots: Box<[AtomicIsize]>,
37    /// The amount of time represented by each slot.
38    window: Duration,
39    /// The changers for the current slot to be commited
40    /// after the slot expires.
41    writer: AtomicIsize,
42}
43
44#[derive(Debug)]
45struct Generation {
46    /// Slot index of the last generation.
47    index: usize,
48    /// The timestamp since the last generation expired.
49    time: Instant,
50}
51
52// ===== impl Budget =====
53
54impl Budget {
55    /// Create a `Budget` that allows for a certain percent of the total
56    /// requests to be retried.
57    ///
58    /// - The `ttl` is the duration of how long a single `deposit` should be
59    ///   considered. Must be between 1 and 60 seconds.
60    /// - The `min_per_sec` is the minimum rate of retries allowed to accomodate
61    ///   clients that have just started issuing requests, or clients that do
62    ///   not issue many requests per window.
63    /// - The `retry_percent` is the percentage of calls to `deposit` that can
64    ///   be retried. This is in addition to any retries allowed for via
65    ///   `min_per_sec`. Must be between 0 and 1000.
66    ///
67    ///   As an example, if `0.1` is used, then for every 10 calls to `deposit`,
68    ///   1 retry will be allowed. If `2.0` is used, then every `deposit`
69    ///   allows for 2 retries.
70    pub fn new(ttl: Duration, min_per_sec: u32, retry_percent: f32) -> Self {
71        // assertions taken from finagle
72        assert!(ttl >= Duration::from_secs(1));
73        assert!(ttl <= Duration::from_secs(60));
74        assert!(retry_percent >= 0.0);
75        assert!(retry_percent <= 1000.0);
76        assert!(min_per_sec < ::std::i32::MAX as u32);
77
78        let (deposit_amount, withdraw_amount) = if retry_percent == 0.0 {
79            // If there is no percent, then you gain nothing from deposits.
80            // Withdrawals can only be made against the reserve, over time.
81            (0, 1)
82        } else if retry_percent <= 1.0 {
83            (1, (1.0 / retry_percent) as isize)
84        } else {
85            // Support for when retry_percent is between 1.0 and 1000.0,
86            // meaning for every deposit D, D*retry_percent withdrawals
87            // can be made.
88            (1000, (1000.0 / retry_percent) as isize)
89        };
90        let reserve = (min_per_sec as isize)
91            .saturating_mul(ttl.as_secs() as isize) // ttl is between 1 and 60 seconds
92            .saturating_mul(withdraw_amount);
93
94        // AtomicIsize isn't clone, so the slots need to be built in a loop...
95        let windows = 10u32;
96        let mut slots = Vec::with_capacity(windows as usize);
97        for _ in 0..windows {
98            slots.push(AtomicIsize::new(0));
99        }
100
101        Budget {
102            bucket: Bucket {
103                generation: Mutex::new(Generation {
104                    index: 0,
105                    time: Instant::now(),
106                }),
107                reserve,
108                slots: slots.into_boxed_slice(),
109                window: ttl / windows,
110                writer: AtomicIsize::new(0),
111            },
112            deposit_amount,
113            withdraw_amount,
114        }
115    }
116
117    /// Store a "deposit" in the budget, which will be used to permit future
118    /// withdrawals.
119    pub fn deposit(&self) {
120        self.bucket.put(self.deposit_amount);
121    }
122
123    /// Check whether there is enough "balance" in the budget to issue a new
124    /// retry.
125    ///
126    /// If there is not enough, an `Err(Overdrawn)` is returned.
127    pub fn withdraw(&self) -> Result<(), Overdrawn> {
128        if self.bucket.try_get(self.withdraw_amount) {
129            Ok(())
130        } else {
131            Err(Overdrawn { _inner: () })
132        }
133    }
134}
135
136impl Default for Budget {
137    fn default() -> Budget {
138        Budget::new(Duration::from_secs(10), 10, 0.2)
139    }
140}
141
142impl fmt::Debug for Budget {
143    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
144        f.debug_struct("Budget")
145            .field("deposit", &self.deposit_amount)
146            .field("withdraw", &self.withdraw_amount)
147            .field("balance", &self.bucket.sum())
148            .finish()
149    }
150}
151
152// ===== impl Bucket =====
153
154impl Bucket {
155    fn put(&self, amt: isize) {
156        self.expire();
157        self.writer.fetch_add(amt, Ordering::SeqCst);
158    }
159
160    fn try_get(&self, amt: isize) -> bool {
161        debug_assert!(amt >= 0);
162
163        self.expire();
164
165        let sum = self.sum();
166        if sum >= amt {
167            self.writer.fetch_add(-amt, Ordering::SeqCst);
168            true
169        } else {
170            false
171        }
172    }
173
174    fn expire(&self) {
175        let mut gen = self.generation.lock().expect("generation lock");
176
177        let now = Instant::now();
178        let diff = now - gen.time;
179        if diff < self.window {
180            // not expired yet
181            return;
182        }
183
184        let to_commit = self.writer.swap(0, Ordering::SeqCst);
185        self.slots[gen.index].store(to_commit, Ordering::SeqCst);
186
187        let mut diff = diff;
188        let mut idx = (gen.index + 1) % self.slots.len();
189        while diff > self.window {
190            self.slots[idx].store(0, Ordering::SeqCst);
191            diff -= self.window;
192            idx = (idx + 1) % self.slots.len();
193        }
194
195        gen.index = idx;
196        gen.time = now;
197    }
198
199    fn sum(&self) -> isize {
200        let current = self.writer.load(Ordering::SeqCst);
201        let windowed_sum: isize = self
202            .slots
203            .iter()
204            .map(|slot| slot.load(Ordering::SeqCst))
205            // fold() is used instead of sum() to determine overflow behavior
206            .fold(0, isize::saturating_add);
207
208        current
209            .saturating_add(windowed_sum)
210            .saturating_add(self.reserve)
211    }
212}
213
214#[cfg(test)]
215mod tests {
216    use super::*;
217    use tokio::time;
218
219    #[test]
220    fn empty() {
221        let bgt = Budget::new(Duration::from_secs(1), 0, 1.0);
222        bgt.withdraw().unwrap_err();
223    }
224
225    #[tokio::test]
226    async fn leaky() {
227        time::pause();
228
229        let bgt = Budget::new(Duration::from_secs(1), 0, 1.0);
230        bgt.deposit();
231
232        time::advance(Duration::from_secs(3)).await;
233
234        bgt.withdraw().unwrap_err();
235    }
236
237    #[tokio::test]
238    async fn slots() {
239        time::pause();
240
241        let bgt = Budget::new(Duration::from_secs(1), 0, 0.5);
242        bgt.deposit();
243        bgt.deposit();
244        time::advance(Duration::from_millis(901)).await;
245        // 900ms later, the deposit should still be valid
246        bgt.withdraw().unwrap();
247
248        // blank slate
249        time::advance(Duration::from_millis(2001)).await;
250
251        bgt.deposit();
252        time::advance(Duration::from_millis(301)).await;
253        bgt.deposit();
254        time::advance(Duration::from_millis(801)).await;
255        bgt.deposit();
256
257        // the first deposit is expired, but the 2nd should still be valid,
258        // combining with the 3rd
259        bgt.withdraw().unwrap();
260    }
261
262    #[tokio::test]
263    async fn reserve() {
264        let bgt = Budget::new(Duration::from_secs(1), 5, 1.0);
265        bgt.withdraw().unwrap();
266        bgt.withdraw().unwrap();
267        bgt.withdraw().unwrap();
268        bgt.withdraw().unwrap();
269        bgt.withdraw().unwrap();
270
271        bgt.withdraw().unwrap_err();
272    }
273}