1mod storage;
4use std::{
5 hash::BuildHasherDefault,
6 iter::repeat,
7 sync::{PoisonError, RwLock},
8};
9
10use hashbrown::{hash_map::RawEntryMut, HashMap};
11use metrics::{Key, KeyHasher};
12pub use storage::{AtomicStorage, Storage};
13
14#[cfg(feature = "recency")]
15mod recency;
16
17#[cfg(feature = "recency")]
18#[cfg_attr(docsrs, doc(cfg(feature = "recency")))]
19pub use recency::{
20 Generation, Generational, GenerationalAtomicStorage, GenerationalStorage, Recency,
21};
22
23use crate::Hashable;
24
25type RegistryHasher = KeyHasher;
26type RegistryHashMap<K, V> = HashMap<K, V, BuildHasherDefault<RegistryHasher>>;
27
28#[derive(Debug)]
49pub struct Registry<K, S>
50where
51 S: Storage<K>,
52{
53 counters: Vec<RwLock<RegistryHashMap<K, S::Counter>>>,
54 gauges: Vec<RwLock<RegistryHashMap<K, S::Gauge>>>,
55 histograms: Vec<RwLock<RegistryHashMap<K, S::Histogram>>>,
56 shard_mask: usize,
57 storage: S,
58}
59
/// Returns the number of shards to allocate per metric kind.
///
/// This is the available parallelism reported by the standard library (falling back to 1 when
/// it cannot be determined), rounded up to the next power of two so that a shard index can be
/// computed with a bit mask.
fn shard_count() -> usize {
    let parallelism = match std::thread::available_parallelism() {
        Ok(threads) => threads.get(),
        Err(_) => 1,
    };

    parallelism.next_power_of_two()
}
63
64impl Registry<Key, AtomicStorage> {
65 pub fn atomic() -> Self {
67 let shard_count = shard_count();
68 let shard_mask = shard_count - 1;
69 let counters =
70 repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
71 let gauges =
72 repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
73 let histograms =
74 repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
75
76 Self { counters, gauges, histograms, shard_mask, storage: AtomicStorage }
77 }
78}
79
80impl<K, S> Registry<K, S>
81where
82 S: Storage<K>,
83{
84 pub fn new(storage: S) -> Self {
86 let shard_count = shard_count();
87 let shard_mask = shard_count - 1;
88 let counters =
89 repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
90 let gauges =
91 repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
92 let histograms =
93 repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
94
95 Self { counters, gauges, histograms, shard_mask, storage }
96 }
97
98 pub fn clear(&self) {
103 for shard in &self.counters {
104 shard.write().unwrap_or_else(PoisonError::into_inner).clear();
105 }
106 for shard in &self.gauges {
107 shard.write().unwrap_or_else(PoisonError::into_inner).clear();
108 }
109 for shard in &self.histograms {
110 shard.write().unwrap_or_else(PoisonError::into_inner).clear();
111 }
112 }
113
114 pub fn visit_counters<F>(&self, mut collect: F)
122 where
123 F: FnMut(&K, &S::Counter),
124 {
125 for subshard in self.counters.iter() {
126 let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
127 for (key, counter) in shard_read.iter() {
128 collect(key, counter);
129 }
130 }
131 }
132 pub fn visit_gauges<F>(&self, mut collect: F)
140 where
141 F: FnMut(&K, &S::Gauge),
142 {
143 for subshard in self.gauges.iter() {
144 let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
145 for (key, gauge) in shard_read.iter() {
146 collect(key, gauge);
147 }
148 }
149 }
150
151 pub fn visit_histograms<F>(&self, mut collect: F)
159 where
160 F: FnMut(&K, &S::Histogram),
161 {
162 for subshard in self.histograms.iter() {
163 let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
164 for (key, histogram) in shard_read.iter() {
165 collect(key, histogram);
166 }
167 }
168 }
169
170 pub fn retain_counters<F>(&self, mut f: F)
175 where
176 F: FnMut(&K, &S::Counter) -> bool,
177 {
178 for subshard in self.counters.iter() {
179 let mut shard_write = subshard.write().unwrap_or_else(PoisonError::into_inner);
180 shard_write.retain(|k, c| f(k, c));
181 }
182 }
183
184 pub fn retain_gauges<F>(&self, mut f: F)
189 where
190 F: FnMut(&K, &S::Gauge) -> bool,
191 {
192 for subshard in self.gauges.iter() {
193 let mut shard_write = subshard.write().unwrap_or_else(PoisonError::into_inner);
194 shard_write.retain(|k, g| f(k, g));
195 }
196 }
197
198 pub fn retain_histograms<F>(&self, mut f: F)
203 where
204 F: FnMut(&K, &S::Histogram) -> bool,
205 {
206 for subshard in self.histograms.iter() {
207 let mut shard_write = subshard.write().unwrap_or_else(PoisonError::into_inner);
208 shard_write.retain(|k, h| f(k, h));
209 }
210 }
211}
212
213impl<K, S> Registry<K, S>
214where
215 S: Storage<K>,
216 K: Hashable,
217{
218 #[inline]
219 fn get_hash_and_shard_for_counter(
220 &self,
221 key: &K,
222 ) -> (u64, &RwLock<RegistryHashMap<K, S::Counter>>) {
223 let hash = key.hashable();
224
225 let shard = unsafe { self.counters.get_unchecked(hash as usize & self.shard_mask) };
229
230 (hash, shard)
231 }
232
233 #[inline]
234 fn get_hash_and_shard_for_gauge(
235 &self,
236 key: &K,
237 ) -> (u64, &RwLock<RegistryHashMap<K, S::Gauge>>) {
238 let hash = key.hashable();
239
240 let shard = unsafe { self.gauges.get_unchecked(hash as usize & self.shard_mask) };
244
245 (hash, shard)
246 }
247
248 #[inline]
249 fn get_hash_and_shard_for_histogram(
250 &self,
251 key: &K,
252 ) -> (u64, &RwLock<RegistryHashMap<K, S::Histogram>>) {
253 let hash = key.hashable();
254
255 let shard = unsafe { self.histograms.get_unchecked(hash as usize & self.shard_mask) };
260
261 (hash, shard)
262 }
263}
264
265impl<K, S> Registry<K, S>
266where
267 S: Storage<K>,
268 K: Eq + Hashable,
269{
270 pub fn delete_counter(&self, key: &K) -> bool {
274 let (hash, shard) = self.get_hash_and_shard_for_counter(key);
275 let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
276 let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
277 if let RawEntryMut::Occupied(entry) = entry {
278 let _ = entry.remove_entry();
279 return true;
280 }
281
282 false
283 }
284
285 pub fn delete_gauge(&self, key: &K) -> bool {
289 let (hash, shard) = self.get_hash_and_shard_for_gauge(key);
290 let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
291 let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
292 if let RawEntryMut::Occupied(entry) = entry {
293 let _ = entry.remove_entry();
294 return true;
295 }
296
297 false
298 }
299
300 pub fn delete_histogram(&self, key: &K) -> bool {
304 let (hash, shard) = self.get_hash_and_shard_for_histogram(key);
305 let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
306 let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
307 if let RawEntryMut::Occupied(entry) = entry {
308 let _ = entry.remove_entry();
309 return true;
310 }
311
312 false
313 }
314
315 pub fn get_counter(&self, key: &K) -> Option<S::Counter> {
317 let (hash, shard) = self.get_hash_and_shard_for_counter(key);
318 let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
319 shard_read.raw_entry().from_key_hashed_nocheck(hash, key).map(|(_, v)| v.clone())
320 }
321
322 pub fn get_gauge(&self, key: &K) -> Option<S::Gauge> {
324 let (hash, shard) = self.get_hash_and_shard_for_gauge(key);
325 let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
326 shard_read.raw_entry().from_key_hashed_nocheck(hash, key).map(|(_, v)| v.clone())
327 }
328
329 pub fn get_histogram(&self, key: &K) -> Option<S::Histogram> {
331 let (hash, shard) = self.get_hash_and_shard_for_histogram(key);
332 let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
333 shard_read.raw_entry().from_key_hashed_nocheck(hash, key).map(|(_, v)| v.clone())
334 }
335}
336
337impl<K, S> Registry<K, S>
338where
339 S: Storage<K>,
340 K: Clone + Eq + Hashable,
341{
342 pub fn get_or_create_counter<O, V>(&self, key: &K, op: O) -> V
347 where
348 O: FnOnce(&S::Counter) -> V,
349 {
350 let (hash, shard) = self.get_hash_and_shard_for_counter(key);
351
352 let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
354 if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
355 op(v)
356 } else {
357 drop(shard_read);
359 let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
360 let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
361 {
362 v
363 } else {
364 let (_, v) = shard_write
365 .raw_entry_mut()
366 .from_key_hashed_nocheck(hash, key)
367 .or_insert_with(|| (key.clone(), self.storage.counter(key)));
368
369 v
370 };
371
372 op(v)
373 }
374 }
375
376 pub fn get_or_create_gauge<O, V>(&self, key: &K, op: O) -> V
381 where
382 O: FnOnce(&S::Gauge) -> V,
383 {
384 let (hash, shard) = self.get_hash_and_shard_for_gauge(key);
385
386 let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
388 if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
389 op(v)
390 } else {
391 drop(shard_read);
393 let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
394 let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
395 {
396 v
397 } else {
398 let (_, v) = shard_write
399 .raw_entry_mut()
400 .from_key_hashed_nocheck(hash, key)
401 .or_insert_with(|| (key.clone(), self.storage.gauge(key)));
402
403 v
404 };
405
406 op(v)
407 }
408 }
409
410 pub fn get_or_create_histogram<O, V>(&self, key: &K, op: O) -> V
415 where
416 O: FnOnce(&S::Histogram) -> V,
417 {
418 let (hash, shard) = self.get_hash_and_shard_for_histogram(key);
419
420 let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
422 if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
423 op(v)
424 } else {
425 drop(shard_read);
427 let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
428 let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
429 {
430 v
431 } else {
432 let (_, v) = shard_write
433 .raw_entry_mut()
434 .from_key_hashed_nocheck(hash, key)
435 .or_insert_with(|| (key.clone(), self.storage.histogram(key)));
436
437 v
438 };
439
440 op(v)
441 }
442 }
443 pub fn get_counter_handles(&self) -> HashMap<K, S::Counter> {
447 let mut counters = HashMap::new();
448 self.visit_counters(|k, v| {
449 counters.insert(k.clone(), v.clone());
450 });
451 counters
452 }
453
454 pub fn get_gauge_handles(&self) -> HashMap<K, S::Gauge> {
458 let mut gauges = HashMap::new();
459 self.visit_gauges(|k, v| {
460 gauges.insert(k.clone(), v.clone());
461 });
462 gauges
463 }
464
465 pub fn get_histogram_handles(&self) -> HashMap<K, S::Histogram> {
469 let mut histograms = HashMap::new();
470 self.visit_histograms(|k, v| {
471 histograms.insert(k.clone(), v.clone());
472 });
473 histograms
474 }
475}
476
#[cfg(test)]
mod tests {
    use std::sync::{atomic::Ordering, Arc};

    use metrics::{atomics::AtomicU64, CounterFn, Key};

    use super::Registry;

    /// Walks a counter through its whole lifecycle: absent, created, updated, retained and
    /// finally deleted.
    #[test]
    fn test_registry() {
        let registry = Registry::atomic();
        let key = Key::from_name("foobar");

        // A fresh registry holds nothing, and a plain lookup must not create anything.
        let entries = registry.get_counter_handles();
        assert_eq!(entries.len(), 0);
        assert!(registry.get_counter(&key).is_none());

        // The first access creates the counter and applies the operation to it.
        registry.get_or_create_counter(&key, |c: &Arc<AtomicU64>| c.increment(1));

        let initial_entries = registry.get_counter_handles();
        assert_eq!(initial_entries.len(), 1);

        let (ikey, ivalue): (Key, Arc<AtomicU64>) =
            initial_entries.into_iter().next().expect("failed to get first entry");
        assert_eq!(ikey, key);
        assert_eq!(ivalue.load(Ordering::SeqCst), 1);

        // A second access reuses the existing counter instead of creating another one.
        registry.get_or_create_counter(&key, |c: &Arc<AtomicU64>| c.increment(1));

        let updated_entries = registry.get_counter_handles();
        assert_eq!(updated_entries.len(), 1);

        let (ukey, uvalue): (Key, Arc<AtomicU64>) =
            updated_entries.into_iter().next().expect("failed to get updated entry");
        assert_eq!(ukey, key);
        assert_eq!(uvalue.load(Ordering::SeqCst), 2);

        // A direct lookup hands back the very same underlying handle.
        let value = registry.get_counter(&key).expect("failed to get entry");
        assert!(Arc::ptr_eq(&value, &uvalue));

        // Register a second counter so that retention has something to drop.
        registry.get_or_create_counter(&Key::from_name("baz"), |_| ());
        assert_eq!(registry.get_counter_handles().len(), 2);

        // The predicate sees every entry and only the "foo"-prefixed one survives.
        let mut visited = 0;
        registry.retain_counters(|k, _| {
            visited += 1;
            k.name().starts_with("foo")
        });
        assert_eq!(visited, 2);
        assert_eq!(registry.get_counter_handles().len(), 1);

        // Deleting the last remaining counter leaves the registry empty again.
        assert!(registry.delete_counter(&key));

        let entries = registry.get_counter_handles();
        assert_eq!(entries.len(), 0);
    }
}