solana_perf/
data_budget.rs1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
2
/// A byte budget that can be consumed concurrently and adjusted at most once
/// per caller-supplied time window.
///
/// Both fields are atomics, which is why every method on this type only needs
/// `&self`.
#[derive(Debug, Default)]
pub struct DataBudget {
    // Number of bytes currently available to take from the budget.
    bytes: AtomicUsize,
    // Timestamp recorded by the most recent accepted update; compared against
    // the current timestamp to decide whether the next update is due.
    asof: AtomicU64,
}
11
12impl DataBudget {
13 pub fn restricted() -> Self {
15 Self {
16 bytes: AtomicUsize::default(),
17 asof: AtomicU64::new(u64::MAX),
18 }
19 }
20
21 #[must_use]
24 pub fn take(&self, size: usize) -> bool {
25 let mut bytes = self.bytes.load(Ordering::Acquire);
26 loop {
27 bytes = match self.bytes.compare_exchange_weak(
28 bytes,
29 match bytes.checked_sub(size) {
30 None => return false,
31 Some(bytes) => bytes,
32 },
33 Ordering::AcqRel,
34 Ordering::Acquire,
35 ) {
36 Ok(_) => return true,
37 Err(bytes) => bytes,
38 }
39 }
40 }
41
42 fn can_update(&self, duration_millis: u64) -> bool {
45 let now = solana_sdk::timing::timestamp();
46 let mut asof = self.asof.load(Ordering::Acquire);
47 while asof.saturating_add(duration_millis) <= now {
48 asof = match self.asof.compare_exchange_weak(
49 asof,
50 now,
51 Ordering::AcqRel,
52 Ordering::Acquire,
53 ) {
54 Ok(_) => return true,
55 Err(asof) => asof,
56 }
57 }
58 false
59 }
60
61 pub fn update<F>(&self, duration_millis: u64, updater: F) -> usize
65 where
66 F: Fn(usize) -> usize,
67 {
68 if self.can_update(duration_millis) {
69 let mut bytes = self.bytes.load(Ordering::Acquire);
70 loop {
71 bytes = match self.bytes.compare_exchange_weak(
72 bytes,
73 updater(bytes),
74 Ordering::AcqRel,
75 Ordering::Acquire,
76 ) {
77 Ok(_) => break,
78 Err(bytes) => bytes,
79 }
80 }
81 }
82 self.bytes.load(Ordering::Acquire)
83 }
84
85 #[must_use]
86 pub fn check(&self, size: usize) -> bool {
87 size <= self.bytes.load(Ordering::Acquire)
88 }
89}
90
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Walks a default budget through draining, an update rejected because the
    /// window has not elapsed, and an update accepted after sleeping past it.
    #[test]
    fn test_data_budget() {
        let budget = DataBudget::default();
        // A default budget holds no bytes.
        assert!(!budget.take(1));

        // First update is expected to go through: 0 + 5.
        let after_first_update = budget.update(1000, |bytes| bytes + 5);
        assert_eq!(after_first_update, 5);
        assert!(budget.take(1));
        assert!(budget.take(2));
        // Only 2 bytes remain, so 3 cannot be taken.
        assert!(!budget.take(3));

        // Still inside the 30 ms window: the updater is expected to be ignored.
        let after_rejected_update = budget.update(30, |_| 10);
        assert_eq!(after_rejected_update, 2);
        assert!(!budget.take(3));

        // Wait past the 30 ms window so the next update is expected to apply.
        let pause = Duration::from_millis(50);
        std::thread::sleep(pause);
        let after_second_update = budget.update(30, |bytes| bytes * 2);
        assert_eq!(after_second_update, 4);

        assert!(budget.take(3));
        assert!(budget.take(1));
        // Budget is exhausted again.
        assert!(!budget.take(1));
    }
}