solana_perf/
data_budget.rs

1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
2
3#[derive(Default)]
4pub struct DataBudget {
5    // Amount of bytes we have in the budget to send.
6    bytes: AtomicUsize,
7    // Last time that we upped the bytes count, used
8    // to detect when to up the bytes budget again
9    asof: AtomicU64,
10}
11
12impl DataBudget {
13    /// Create a data budget with max bytes, used for tests
14    pub fn restricted() -> Self {
15        Self {
16            bytes: AtomicUsize::default(),
17            asof: AtomicU64::new(u64::MAX),
18        }
19    }
20
21    // If there are enough bytes in the budget, consumes from
22    // the budget and returns true. Otherwise returns false.
23    #[must_use]
24    pub fn take(&self, size: usize) -> bool {
25        let mut bytes = self.bytes.load(Ordering::Acquire);
26        loop {
27            bytes = match self.bytes.compare_exchange_weak(
28                bytes,
29                match bytes.checked_sub(size) {
30                    None => return false,
31                    Some(bytes) => bytes,
32                },
33                Ordering::AcqRel,
34                Ordering::Acquire,
35            ) {
36                Ok(_) => return true,
37                Err(bytes) => bytes,
38            }
39        }
40    }
41
42    // Updates timestamp and returns true, if at least given milliseconds
43    // has passed since last update. Otherwise returns false.
44    fn can_update(&self, duration_millis: u64) -> bool {
45        let now = solana_time_utils::timestamp();
46        let mut asof = self.asof.load(Ordering::Acquire);
47        while asof.saturating_add(duration_millis) <= now {
48            asof = match self.asof.compare_exchange_weak(
49                asof,
50                now,
51                Ordering::AcqRel,
52                Ordering::Acquire,
53            ) {
54                Ok(_) => return true,
55                Err(asof) => asof,
56            }
57        }
58        false
59    }
60
61    /// Updates the budget if at least given milliseconds has passed since last
62    /// update. Updater function maps current value of bytes to the new one.
63    /// Returns current data-budget after the update.
64    pub fn update<F>(&self, duration_millis: u64, updater: F) -> usize
65    where
66        F: Fn(usize) -> usize,
67    {
68        if self.can_update(duration_millis) {
69            let mut bytes = self.bytes.load(Ordering::Acquire);
70            loop {
71                bytes = match self.bytes.compare_exchange_weak(
72                    bytes,
73                    updater(bytes),
74                    Ordering::AcqRel,
75                    Ordering::Acquire,
76                ) {
77                    Ok(_) => break,
78                    Err(bytes) => bytes,
79                }
80            }
81        }
82        self.bytes.load(Ordering::Acquire)
83    }
84
85    #[must_use]
86    pub fn check(&self, size: usize) -> bool {
87        size <= self.bytes.load(Ordering::Acquire)
88    }
89}
90
91#[cfg(test)]
92mod tests {
93    use {super::*, std::time::Duration};
94
95    #[test]
96    fn test_data_budget() {
97        let budget = DataBudget::default();
98        assert!(!budget.take(1)); // budget = 0.
99
100        assert_eq!(budget.update(1000, |bytes| bytes + 5), 5); // budget updates to 5.
101        assert!(budget.take(1));
102        assert!(budget.take(2));
103        assert!(!budget.take(3)); // budget = 2, out of budget.
104
105        assert_eq!(budget.update(30, |_| 10), 2); // no update, budget = 2.
106        assert!(!budget.take(3)); // budget = 2, out of budget.
107
108        std::thread::sleep(Duration::from_millis(50));
109        assert_eq!(budget.update(30, |bytes| bytes * 2), 4); // budget updates to 4.
110
111        assert!(budget.take(3));
112        assert!(budget.take(1));
113        assert!(!budget.take(1)); // budget = 0.
114    }
115}