metrics_util/registry/
mod.rs

1//! High-performance metrics storage.
2
3mod storage;
4use std::{
5    hash::BuildHasherDefault,
6    iter::repeat,
7    sync::{PoisonError, RwLock},
8};
9
10use hashbrown::{hash_map::RawEntryMut, HashMap};
11use metrics::{Key, KeyHasher};
12pub use storage::{AtomicStorage, Storage};
13
14#[cfg(feature = "recency")]
15mod recency;
16
17#[cfg(feature = "recency")]
18#[cfg_attr(docsrs, doc(cfg(feature = "recency")))]
19pub use recency::{
20    Generation, Generational, GenerationalAtomicStorage, GenerationalStorage, Recency,
21};
22
23use crate::Hashable;
24
25type RegistryHasher = KeyHasher;
26type RegistryHashMap<K, V> = HashMap<K, V, BuildHasherDefault<RegistryHasher>>;
27
28/// A high-performance metric registry.
29///
30/// `Registry` provides the ability to maintain a central listing of metrics mapped by a given key.
31/// Metrics themselves are stored in the objects returned by `S`.
32///
33/// ## Using `Registry` as the basis of an exporter
34///
35/// As a reusable building blocking for building exporter implementations, users should look at
36/// [`Key`] and [`AtomicStorage`] to use for their key and storage, respectively.
37///
38/// These two implementations provide behavior that is suitable for most exporters, providing
39/// seamless integration with the existing key type used by the core
40/// [`Recorder`][metrics::Recorder] trait, as well as atomic storage for metrics.
41///
42/// In some cases, users may prefer [`GenerationalAtomicStorage`] when know if a metric has been
43/// touched, even if its value has not changed since the last time it was observed, is necessary.
44///
45/// ## Performance
46///
47/// `Registry` is optimized for reads.
48#[derive(Debug)]
49pub struct Registry<K, S>
50where
51    S: Storage<K>,
52{
53    counters: Vec<RwLock<RegistryHashMap<K, S::Counter>>>,
54    gauges: Vec<RwLock<RegistryHashMap<K, S::Gauge>>>,
55    histograms: Vec<RwLock<RegistryHashMap<K, S::Histogram>>>,
56    shard_mask: usize,
57    storage: S,
58}
59
60fn shard_count() -> usize {
61    std::thread::available_parallelism().map(|x| x.get()).unwrap_or(1).next_power_of_two()
62}
63
64impl Registry<Key, AtomicStorage> {
65    /// Creates a new `Registry` using a regular [`Key`] and atomic storage.
66    pub fn atomic() -> Self {
67        let shard_count = shard_count();
68        let shard_mask = shard_count - 1;
69        let counters =
70            repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
71        let gauges =
72            repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
73        let histograms =
74            repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
75
76        Self { counters, gauges, histograms, shard_mask, storage: AtomicStorage }
77    }
78}
79
80impl<K, S> Registry<K, S>
81where
82    S: Storage<K>,
83{
84    /// Creates a new `Registry`.
85    pub fn new(storage: S) -> Self {
86        let shard_count = shard_count();
87        let shard_mask = shard_count - 1;
88        let counters =
89            repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
90        let gauges =
91            repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
92        let histograms =
93            repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
94
95        Self { counters, gauges, histograms, shard_mask, storage }
96    }
97
98    /// Removes all metrics from the registry.
99    ///
100    /// This operation is eventually consistent: metrics will be removed piecemeal, and this method
101    /// does not ensure that callers will see the registry as entirely empty at any given point.
102    pub fn clear(&self) {
103        for shard in &self.counters {
104            shard.write().unwrap_or_else(PoisonError::into_inner).clear();
105        }
106        for shard in &self.gauges {
107            shard.write().unwrap_or_else(PoisonError::into_inner).clear();
108        }
109        for shard in &self.histograms {
110            shard.write().unwrap_or_else(PoisonError::into_inner).clear();
111        }
112    }
113
114    /// Visits every counter stored in this registry.
115    ///
116    /// This operation does not lock the entire registry, but proceeds directly through the
117    /// "subshards" that are kept internally.  As a result, all subshards will be visited, but a
118    /// metric that existed at the exact moment that `visit_counters` was called may not actually be observed
119    /// if it is deleted before that subshard is reached.  Likewise, a metric that is added after
120    /// the call to `visit_counters`, but before `visit_counters` finishes, may also not be observed.
121    pub fn visit_counters<F>(&self, mut collect: F)
122    where
123        F: FnMut(&K, &S::Counter),
124    {
125        for subshard in self.counters.iter() {
126            let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
127            for (key, counter) in shard_read.iter() {
128                collect(key, counter);
129            }
130        }
131    }
132    /// Visits every gauge stored in this registry.
133    ///
134    /// This operation does not lock the entire registry, but proceeds directly through the
135    /// "subshards" that are kept internally.  As a result, all subshards will be visited, but a
136    /// metric that existed at the exact moment that `visit_gauges` was called may not actually be observed
137    /// if it is deleted before that subshard is reached.  Likewise, a metric that is added after
138    /// the call to `visit_gauges`, but before `visit_gauges` finishes, may also not be observed.
139    pub fn visit_gauges<F>(&self, mut collect: F)
140    where
141        F: FnMut(&K, &S::Gauge),
142    {
143        for subshard in self.gauges.iter() {
144            let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
145            for (key, gauge) in shard_read.iter() {
146                collect(key, gauge);
147            }
148        }
149    }
150
151    /// Visits every histogram stored in this registry.
152    ///
153    /// This operation does not lock the entire registry, but proceeds directly through the
154    /// "subshards" that are kept internally.  As a result, all subshards will be visited, but a
155    /// metric that existed at the exact moment that `visit_histograms` was called may not actually be observed
156    /// if it is deleted before that subshard is reached.  Likewise, a metric that is added after
157    /// the call to `visit_histograms`, but before `visit_histograms` finishes, may also not be observed.
158    pub fn visit_histograms<F>(&self, mut collect: F)
159    where
160        F: FnMut(&K, &S::Histogram),
161    {
162        for subshard in self.histograms.iter() {
163            let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
164            for (key, histogram) in shard_read.iter() {
165                collect(key, histogram);
166            }
167        }
168    }
169
170    /// Retains only counters specified by the predicate.
171    ///
172    /// Remove all counters for which f(&k, &c) returns false. This operation proceeds
173    /// through the "subshards" in the same way as `visit_counters`.
174    pub fn retain_counters<F>(&self, mut f: F)
175    where
176        F: FnMut(&K, &S::Counter) -> bool,
177    {
178        for subshard in self.counters.iter() {
179            let mut shard_write = subshard.write().unwrap_or_else(PoisonError::into_inner);
180            shard_write.retain(|k, c| f(k, c));
181        }
182    }
183
184    /// Retains only gauges specified by the predicate.
185    ///
186    /// Remove all gauges for which f(&k, &g) returns false. This operation proceeds
187    /// through the "subshards" in the same way as `visit_gauges`.
188    pub fn retain_gauges<F>(&self, mut f: F)
189    where
190        F: FnMut(&K, &S::Gauge) -> bool,
191    {
192        for subshard in self.gauges.iter() {
193            let mut shard_write = subshard.write().unwrap_or_else(PoisonError::into_inner);
194            shard_write.retain(|k, g| f(k, g));
195        }
196    }
197
198    /// Retains only histograms specified by the predicate.
199    ///
200    /// Remove all histograms for which f(&k, &h) returns false. This operation proceeds
201    /// through the "subshards" in the same way as `visit_histograms`.
202    pub fn retain_histograms<F>(&self, mut f: F)
203    where
204        F: FnMut(&K, &S::Histogram) -> bool,
205    {
206        for subshard in self.histograms.iter() {
207            let mut shard_write = subshard.write().unwrap_or_else(PoisonError::into_inner);
208            shard_write.retain(|k, h| f(k, h));
209        }
210    }
211}
212
213impl<K, S> Registry<K, S>
214where
215    S: Storage<K>,
216    K: Hashable,
217{
218    #[inline]
219    fn get_hash_and_shard_for_counter(
220        &self,
221        key: &K,
222    ) -> (u64, &RwLock<RegistryHashMap<K, S::Counter>>) {
223        let hash = key.hashable();
224
225        // SAFETY: We initialize vector of subshards with a power-of-two value, and
226        // `self.shard_mask` is `self.counters.len() - 1`, thus we can never have a result from the
227        // masking operation that results in a value which is not in bounds of our subshards vector.
228        let shard = unsafe { self.counters.get_unchecked(hash as usize & self.shard_mask) };
229
230        (hash, shard)
231    }
232
233    #[inline]
234    fn get_hash_and_shard_for_gauge(
235        &self,
236        key: &K,
237    ) -> (u64, &RwLock<RegistryHashMap<K, S::Gauge>>) {
238        let hash = key.hashable();
239
240        // SAFETY: We initialize the vector of subshards with a power-of-two value, and
241        // `self.shard_mask` is `self.gauges.len() - 1`, thus we can never have a result from the
242        // masking operation that results in a value which is not in bounds of our subshards vector.
243        let shard = unsafe { self.gauges.get_unchecked(hash as usize & self.shard_mask) };
244
245        (hash, shard)
246    }
247
248    #[inline]
249    fn get_hash_and_shard_for_histogram(
250        &self,
251        key: &K,
252    ) -> (u64, &RwLock<RegistryHashMap<K, S::Histogram>>) {
253        let hash = key.hashable();
254
255        // SAFETY: We initialize the vector of subshards with a power-of-two value, and
256        // `self.shard_mask` is `self.histograms.len() - 1`, thus we can never have a result from
257        // the masking operation that results in a value which is not in bounds of our subshards
258        // vector.
259        let shard = unsafe { self.histograms.get_unchecked(hash as usize & self.shard_mask) };
260
261        (hash, shard)
262    }
263}
264
265impl<K, S> Registry<K, S>
266where
267    S: Storage<K>,
268    K: Eq + Hashable,
269{
270    /// Deletes a counter from the registry.
271    ///
272    /// Returns `true` if the counter existed and was removed, `false` otherwise.
273    pub fn delete_counter(&self, key: &K) -> bool {
274        let (hash, shard) = self.get_hash_and_shard_for_counter(key);
275        let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
276        let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
277        if let RawEntryMut::Occupied(entry) = entry {
278            let _ = entry.remove_entry();
279            return true;
280        }
281
282        false
283    }
284
285    /// Deletes a gauge from the registry.
286    ///
287    /// Returns `true` if the gauge existed and was removed, `false` otherwise.
288    pub fn delete_gauge(&self, key: &K) -> bool {
289        let (hash, shard) = self.get_hash_and_shard_for_gauge(key);
290        let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
291        let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
292        if let RawEntryMut::Occupied(entry) = entry {
293            let _ = entry.remove_entry();
294            return true;
295        }
296
297        false
298    }
299
300    /// Deletes a histogram from the registry.
301    ///
302    /// Returns `true` if the histogram existed and was removed, `false` otherwise.
303    pub fn delete_histogram(&self, key: &K) -> bool {
304        let (hash, shard) = self.get_hash_and_shard_for_histogram(key);
305        let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
306        let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
307        if let RawEntryMut::Occupied(entry) = entry {
308            let _ = entry.remove_entry();
309            return true;
310        }
311
312        false
313    }
314
315    /// Gets a copy of an existing counter.
316    pub fn get_counter(&self, key: &K) -> Option<S::Counter> {
317        let (hash, shard) = self.get_hash_and_shard_for_counter(key);
318        let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
319        shard_read.raw_entry().from_key_hashed_nocheck(hash, key).map(|(_, v)| v.clone())
320    }
321
322    /// Gets a copy of an existing gauge.
323    pub fn get_gauge(&self, key: &K) -> Option<S::Gauge> {
324        let (hash, shard) = self.get_hash_and_shard_for_gauge(key);
325        let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
326        shard_read.raw_entry().from_key_hashed_nocheck(hash, key).map(|(_, v)| v.clone())
327    }
328
329    /// Gets a copy of an existing histogram.
330    pub fn get_histogram(&self, key: &K) -> Option<S::Histogram> {
331        let (hash, shard) = self.get_hash_and_shard_for_histogram(key);
332        let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
333        shard_read.raw_entry().from_key_hashed_nocheck(hash, key).map(|(_, v)| v.clone())
334    }
335}
336
337impl<K, S> Registry<K, S>
338where
339    S: Storage<K>,
340    K: Clone + Eq + Hashable,
341{
342    /// Gets or creates the given counter.
343    ///
344    /// The `op` function will be called for the counter under the given `key`, with the counter
345    /// first being created if it does not already exist.
346    pub fn get_or_create_counter<O, V>(&self, key: &K, op: O) -> V
347    where
348        O: FnOnce(&S::Counter) -> V,
349    {
350        let (hash, shard) = self.get_hash_and_shard_for_counter(key);
351
352        // Try and get the handle if it exists, running our operation if we succeed.
353        let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
354        if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
355            op(v)
356        } else {
357            // Switch to write guard and insert the handle first.
358            drop(shard_read);
359            let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
360            let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
361            {
362                v
363            } else {
364                let (_, v) = shard_write
365                    .raw_entry_mut()
366                    .from_key_hashed_nocheck(hash, key)
367                    .or_insert_with(|| (key.clone(), self.storage.counter(key)));
368
369                v
370            };
371
372            op(v)
373        }
374    }
375
376    /// Gets or creates the given gauge.
377    ///
378    /// The `op` function will be called for the gauge under the given `key`, with the gauge
379    /// first being created if it does not already exist.
380    pub fn get_or_create_gauge<O, V>(&self, key: &K, op: O) -> V
381    where
382        O: FnOnce(&S::Gauge) -> V,
383    {
384        let (hash, shard) = self.get_hash_and_shard_for_gauge(key);
385
386        // Try and get the handle if it exists, running our operation if we succeed.
387        let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
388        if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
389            op(v)
390        } else {
391            // Switch to write guard and insert the handle first.
392            drop(shard_read);
393            let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
394            let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
395            {
396                v
397            } else {
398                let (_, v) = shard_write
399                    .raw_entry_mut()
400                    .from_key_hashed_nocheck(hash, key)
401                    .or_insert_with(|| (key.clone(), self.storage.gauge(key)));
402
403                v
404            };
405
406            op(v)
407        }
408    }
409
410    /// Gets or creates the given histogram.
411    ///
412    /// The `op` function will be called for the histogram under the given `key`, with the histogram
413    /// first being created if it does not already exist.
414    pub fn get_or_create_histogram<O, V>(&self, key: &K, op: O) -> V
415    where
416        O: FnOnce(&S::Histogram) -> V,
417    {
418        let (hash, shard) = self.get_hash_and_shard_for_histogram(key);
419
420        // Try and get the handle if it exists, running our operation if we succeed.
421        let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
422        if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
423            op(v)
424        } else {
425            // Switch to write guard and insert the handle first.
426            drop(shard_read);
427            let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
428            let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
429            {
430                v
431            } else {
432                let (_, v) = shard_write
433                    .raw_entry_mut()
434                    .from_key_hashed_nocheck(hash, key)
435                    .or_insert_with(|| (key.clone(), self.storage.histogram(key)));
436
437                v
438            };
439
440            op(v)
441        }
442    }
443    /// Gets a map of all present counters, mapped by key.
444    ///
445    /// This map is a point-in-time snapshot of the registry.
446    pub fn get_counter_handles(&self) -> HashMap<K, S::Counter> {
447        let mut counters = HashMap::new();
448        self.visit_counters(|k, v| {
449            counters.insert(k.clone(), v.clone());
450        });
451        counters
452    }
453
454    /// Gets a map of all present gauges, mapped by key.
455    ///
456    /// This map is a point-in-time snapshot of the registry.
457    pub fn get_gauge_handles(&self) -> HashMap<K, S::Gauge> {
458        let mut gauges = HashMap::new();
459        self.visit_gauges(|k, v| {
460            gauges.insert(k.clone(), v.clone());
461        });
462        gauges
463    }
464
465    /// Gets a map of all present histograms, mapped by key.
466    ///
467    /// This map is a point-in-time snapshot of the registry.
468    pub fn get_histogram_handles(&self) -> HashMap<K, S::Histogram> {
469        let mut histograms = HashMap::new();
470        self.visit_histograms(|k, v| {
471            histograms.insert(k.clone(), v.clone());
472        });
473        histograms
474    }
475}
476
477#[cfg(test)]
478mod tests {
479    use metrics::{atomics::AtomicU64, CounterFn, Key};
480
481    use super::Registry;
482    use std::sync::{atomic::Ordering, Arc};
483
484    #[test]
485    fn test_registry() {
486        let registry = Registry::atomic();
487        let key = Key::from_name("foobar");
488
489        let entries = registry.get_counter_handles();
490        assert_eq!(entries.len(), 0);
491
492        assert!(registry.get_counter(&key).is_none());
493
494        registry.get_or_create_counter(&key, |c: &Arc<AtomicU64>| c.increment(1));
495
496        let initial_entries = registry.get_counter_handles();
497        assert_eq!(initial_entries.len(), 1);
498
499        let initial_entry: (Key, Arc<AtomicU64>) =
500            initial_entries.into_iter().next().expect("failed to get first entry");
501
502        let (ikey, ivalue) = initial_entry;
503        assert_eq!(ikey, key);
504        assert_eq!(ivalue.load(Ordering::SeqCst), 1);
505
506        registry.get_or_create_counter(&key, |c: &Arc<AtomicU64>| c.increment(1));
507
508        let updated_entries = registry.get_counter_handles();
509        assert_eq!(updated_entries.len(), 1);
510
511        let updated_entry: (Key, Arc<AtomicU64>) =
512            updated_entries.into_iter().next().expect("failed to get updated entry");
513
514        let (ukey, uvalue) = updated_entry;
515        assert_eq!(ukey, key);
516        assert_eq!(uvalue.load(Ordering::SeqCst), 2);
517
518        let value = registry.get_counter(&key).expect("failed to get entry");
519        assert!(Arc::ptr_eq(&value, &uvalue));
520
521        registry.get_or_create_counter(&Key::from_name("baz"), |_| ());
522        assert_eq!(registry.get_counter_handles().len(), 2);
523
524        let mut n = 0;
525        registry.retain_counters(|k, _| {
526            n += 1;
527            k.name().starts_with("foo")
528        });
529        assert_eq!(n, 2);
530        assert_eq!(registry.get_counter_handles().len(), 1);
531
532        assert!(registry.delete_counter(&key));
533
534        let entries = registry.get_counter_handles();
535        assert_eq!(entries.len(), 0);
536    }
537}