// metrics_util/storage/reservoir.rs

use std::{
    cell::UnsafeCell,
    sync::{
        atomic::{
            AtomicBool, AtomicU64, AtomicUsize,
            Ordering::{Acquire, Relaxed, Release},
        },
        Mutex,
    },
};

use rand::{rngs::OsRng, Rng as _, SeedableRng as _};
use rand_xoshiro::Xoshiro256StarStar;
16
thread_local! {
    // Per-thread PRNG backing `fastrand`. Xoshiro256** is a fast,
    // non-cryptographic generator, seeded once per thread from the OS
    // entropy source (`OsRng`); seeding failure aborts via `unwrap`.
    //
    // Wrapped in `UnsafeCell` rather than `Cell`/`RefCell` — presumably to
    // skip borrow-flag bookkeeping on the hot path; the only access is the
    // non-reentrant closure in `fastrand` (see its SAFETY comment).
    static FAST_RNG: UnsafeCell<Xoshiro256StarStar> = {
        UnsafeCell::new(Xoshiro256StarStar::from_rng(OsRng).unwrap())
    };
}
22
23fn fastrand(upper: usize) -> usize {
24 FAST_RNG.with(|rng| {
25 let rng = unsafe { &mut *rng.get() };
28 rng.gen_range(0..upper)
29 })
30}
31
/// A fixed-capacity sample buffer written to with atomic operations only.
///
/// Values are stored as the raw bit patterns of `f64` (`to_bits`/`from_bits`
/// in `push`/`drain`'s iterator) so they fit in `AtomicU64` slots and can be
/// written concurrently without a lock.
struct Reservoir {
    // Storage slots; length is fixed at construction time.
    values: Box<[AtomicU64]>,
    // Total number of pushes since the last drain; may exceed `values.len()`
    // once the reservoir starts randomly replacing entries.
    count: AtomicUsize,
}
36
37impl Reservoir {
38 fn with_capacity(capacity: usize) -> Self {
39 let mut values = Vec::with_capacity(capacity);
40 for _ in 0..capacity {
41 values.push(AtomicU64::new(0));
42 }
43
44 Self { values: values.into_boxed_slice(), count: AtomicUsize::new(0) }
45 }
46
47 fn push(&self, value: f64) {
48 let idx = self.count.fetch_add(1, Relaxed);
49 if idx < self.values.len() {
50 self.values[idx].store(value.to_bits(), Relaxed);
51 } else {
52 let maybe_idx = fastrand(idx);
53 if maybe_idx < self.values.len() {
54 self.values[maybe_idx].store(value.to_bits(), Relaxed);
55 }
56 }
57 }
58
59 fn drain(&self) -> Drain<'_> {
60 let unsampled_len = self.count.load(Relaxed);
61 let len = if unsampled_len > self.values.len() { self.values.len() } else { unsampled_len };
62 Drain { reservoir: self, unsampled_len, len, idx: 0 }
63 }
64}
65
/// Iterator over the values sampled by a `Reservoir`.
///
/// Yields at most `len` values; dropping it resets the owning reservoir's
/// push counter (see the `Drop` impl) so a new sampling round can begin.
pub struct Drain<'a> {
    reservoir: &'a Reservoir,
    // Number of pushes observed in this window (may exceed capacity).
    unsampled_len: usize,
    // Number of values actually retained: min(unsampled_len, capacity).
    len: usize,
    // Cursor of the next slot to yield.
    idx: usize,
}
73
74impl<'a> Drain<'a> {
75 pub fn sample_rate(&self) -> f64 {
85 if self.unsampled_len == self.len {
86 1.0
87 } else {
88 self.len as f64 / self.unsampled_len as f64
89 }
90 }
91}
92
93impl<'a> Iterator for Drain<'a> {
94 type Item = f64;
95
96 fn next(&mut self) -> Option<Self::Item> {
97 if self.idx < self.len {
98 let value = f64::from_bits(self.reservoir.values[self.idx].load(Relaxed));
99 self.idx += 1;
100 Some(value)
101 } else {
102 None
103 }
104 }
105}
106
impl ExactSizeIterator for Drain<'_> {
    /// Number of sampled values remaining to be yielded.
    fn len(&self) -> usize {
        self.len - self.idx
    }
}
112
impl<'a> Drop for Drain<'a> {
    fn drop(&mut self) {
        // Dropping the drain resets the reservoir's push counter so the next
        // sampling window starts empty; stale slot values are simply
        // overwritten by subsequent pushes.
        //
        // NOTE(review): the store is Release, presumably to publish the reset
        // before later operations on the swap flag — confirm the intended
        // Acquire pairing, since the reads of `count` elsewhere are Relaxed.
        self.reservoir.count.store(0, Release);
    }
}
118
/// A double-buffered sampling reservoir.
///
/// Writers push into whichever buffer the `use_primary` flag selects, while
/// `consume` flips the flag and drains the previously active buffer, so
/// collection does not block concurrent pushes.
pub struct AtomicSamplingReservoir {
    primary: Reservoir,
    secondary: Reservoir,
    // Which buffer currently receives pushes: `true` selects `primary`.
    use_primary: AtomicBool,
    // Serializes `consume` callers so only one swap/drain happens at a time.
    swap: Mutex<()>,
}
137
138impl AtomicSamplingReservoir {
139 pub fn new(size: usize) -> Self {
141 Self {
142 primary: Reservoir::with_capacity(size),
143 secondary: Reservoir::with_capacity(size),
144 use_primary: AtomicBool::new(true),
145 swap: Mutex::new(()),
146 }
147 }
148
149 pub fn is_empty(&self) -> bool {
151 let use_primary = self.use_primary.load(Acquire);
152 if use_primary {
153 self.primary.count.load(Relaxed) == 0
154 } else {
155 self.secondary.count.load(Relaxed) == 0
156 }
157 }
158
159 pub fn push(&self, value: f64) {
161 let use_primary = self.use_primary.load(Relaxed);
162 if use_primary {
163 self.primary.push(value);
164 } else {
165 self.secondary.push(value);
166 };
167 }
168
169 pub fn consume<F>(&self, mut f: F)
173 where
174 F: FnMut(Drain<'_>),
175 {
176 let _guard = self.swap.lock().unwrap();
177
178 let use_primary = self.use_primary.load(Acquire);
180 self.use_primary.store(!use_primary, Release);
181
182 let drain = if use_primary { self.primary.drain() } else { self.secondary.drain() };
184
185 f(drain);
186 }
187}