metrics_util/storage/
reservoir.rs

1//! An atomic sampling reservoir.
2
3use std::{
4    cell::UnsafeCell,
5    sync::{
6        atomic::{
7            AtomicBool, AtomicU64, AtomicUsize,
8            Ordering::{Acquire, Relaxed, Release},
9        },
10        Mutex,
11    },
12};
13
14use rand::{rngs::OsRng, Rng as _, SeedableRng as _};
15use rand_xoshiro::Xoshiro256StarStar;
16
17thread_local! {
18    static FAST_RNG: UnsafeCell<Xoshiro256StarStar> = {
19        UnsafeCell::new(Xoshiro256StarStar::from_rng(OsRng).unwrap())
20    };
21}
22
23fn fastrand(upper: usize) -> usize {
24    FAST_RNG.with(|rng| {
25        // SAFETY: We know it's safe to take a mutable reference since we're getting a pointer to a thread-local value,
26        // and the reference never outlives the closure executing on this thread.
27        let rng = unsafe { &mut *rng.get() };
28        rng.gen_range(0..upper)
29    })
30}
31
32struct Reservoir {
33    values: Box<[AtomicU64]>,
34    count: AtomicUsize,
35}
36
37impl Reservoir {
38    fn with_capacity(capacity: usize) -> Self {
39        let mut values = Vec::with_capacity(capacity);
40        for _ in 0..capacity {
41            values.push(AtomicU64::new(0));
42        }
43
44        Self { values: values.into_boxed_slice(), count: AtomicUsize::new(0) }
45    }
46
47    fn push(&self, value: f64) {
48        let idx = self.count.fetch_add(1, Relaxed);
49        if idx < self.values.len() {
50            self.values[idx].store(value.to_bits(), Relaxed);
51        } else {
52            let maybe_idx = fastrand(idx);
53            if maybe_idx < self.values.len() {
54                self.values[maybe_idx].store(value.to_bits(), Relaxed);
55            }
56        }
57    }
58
59    fn drain(&self) -> Drain<'_> {
60        let unsampled_len = self.count.load(Relaxed);
61        let len = if unsampled_len > self.values.len() { self.values.len() } else { unsampled_len };
62        Drain { reservoir: self, unsampled_len, len, idx: 0 }
63    }
64}
65
66/// A draining iterator over the samples in a reservoir.
67pub struct Drain<'a> {
68    reservoir: &'a Reservoir,
69    unsampled_len: usize,
70    len: usize,
71    idx: usize,
72}
73
74impl<'a> Drain<'a> {
75    /// Returns the sample rate of the reservoir that produced this iterator.
76    ///
77    /// The sample rate is the ratio of the number of samples pushed into the reservoir to the number of samples held in
78    /// the reservoir. When the reservoir has not been filled, the sample rate is 1.0. When more samples have been
79    /// pushed into the reservoir than its overall capacity, the sample rate is `size / count`, where `size` is the
80    /// reservoir's capacity and `count` is the number of samples pushed into the reservoir.
81    ///
82    /// For example, if the reservoir holds 1,000 samples, and 100,000 values were pushed into the reservoir, the sample
83    /// rate would be 0.01 (100,000 / 1,000).
84    pub fn sample_rate(&self) -> f64 {
85        if self.unsampled_len == self.len {
86            1.0
87        } else {
88            self.len as f64 / self.unsampled_len as f64
89        }
90    }
91}
92
93impl<'a> Iterator for Drain<'a> {
94    type Item = f64;
95
96    fn next(&mut self) -> Option<Self::Item> {
97        if self.idx < self.len {
98            let value = f64::from_bits(self.reservoir.values[self.idx].load(Relaxed));
99            self.idx += 1;
100            Some(value)
101        } else {
102            None
103        }
104    }
105}
106
107impl ExactSizeIterator for Drain<'_> {
108    fn len(&self) -> usize {
109        self.len - self.idx
110    }
111}
112
113impl<'a> Drop for Drain<'a> {
114    fn drop(&mut self) {
115        self.reservoir.count.store(0, Release);
116    }
117}
118
119/// An atomic sampling reservoir.
120///
121/// [Reservoir sampling][rs] is a technique used to produce a statistically representative sample of a data stream, in a
122/// fixed space, without knowing the length of the stream in advance. `AtomicSamplingReservoir` is a thread-safe version of a
123/// sampling reservoir, based on Vitter's ["Algorithm R"][vitter_paper].
124///
125/// Utilizes an A/B-based storage mechanism to avoid contention between producers and the consumer, and a fast,
126/// thread-local PRNG ([Xoshiro256**][xoshiro256starstar]) to limit the per-call sampling overhead.
127///
128/// [rs]: https://en.wikipedia.org/wiki/Reservoir_sampling
129/// [vitter_paper]: https://www.cs.umd.edu/~samir/498/vitter.pdf
130/// [xoshiro256starstar]: https://prng.di.unimi.it
131pub struct AtomicSamplingReservoir {
132    primary: Reservoir,
133    secondary: Reservoir,
134    use_primary: AtomicBool,
135    swap: Mutex<()>,
136}
137
138impl AtomicSamplingReservoir {
139    /// Creates a new `AtomicSamplingReservoir` that stores up to `size` samples.
140    pub fn new(size: usize) -> Self {
141        Self {
142            primary: Reservoir::with_capacity(size),
143            secondary: Reservoir::with_capacity(size),
144            use_primary: AtomicBool::new(true),
145            swap: Mutex::new(()),
146        }
147    }
148
149    /// Returns `true` if the reservoir is empty.
150    pub fn is_empty(&self) -> bool {
151        let use_primary = self.use_primary.load(Acquire);
152        if use_primary {
153            self.primary.count.load(Relaxed) == 0
154        } else {
155            self.secondary.count.load(Relaxed) == 0
156        }
157    }
158
159    /// Pushes a sample into the reservoir.
160    pub fn push(&self, value: f64) {
161        let use_primary = self.use_primary.load(Relaxed);
162        if use_primary {
163            self.primary.push(value);
164        } else {
165            self.secondary.push(value);
166        };
167    }
168
169    /// Consumes all samples in the reservoir, passing them to the provided closure.
170    ///
171    /// The underlying storage is swapped before the closure is called, and the previous storage is consumed.
172    pub fn consume<F>(&self, mut f: F)
173    where
174        F: FnMut(Drain<'_>),
175    {
176        let _guard = self.swap.lock().unwrap();
177
178        // Swap the active reservoir.
179        let use_primary = self.use_primary.load(Acquire);
180        self.use_primary.store(!use_primary, Release);
181
182        // Consume the previous reservoir.
183        let drain = if use_primary { self.primary.drain() } else { self.secondary.drain() };
184
185        f(drain);
186    }
187}