metrics_util/
recoverable.rs

1use std::sync::{Arc, Weak};
2
3use metrics::{
4    Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SetRecorderError, SharedString,
5    Unit,
6};
7
8#[derive(Debug)]
9pub struct RecoveryHandle<R> {
10    handle: Arc<R>,
11}
12
13impl<R> RecoveryHandle<R> {
14    /// Consumes the handle, returning the original recorder.
15    ///
16    /// This method will loop until there are no other strong references to the recorder. This means
17    /// that the wrapped recorder which was installed is not being actively used, as using it
18    /// temporarily upgrades its internal weak reference to a strong reference.
19    ///
20    /// It is not advised to call this method under heavy load, as doing so is not deterministic or
21    /// ordered and may block for an indefinite amount of time.
22    pub fn into_inner(mut self) -> R {
23        loop {
24            match Arc::try_unwrap(self.handle) {
25                Ok(recorder) => break recorder,
26                Err(handle) => {
27                    self.handle = handle;
28                }
29            }
30        }
31    }
32}
33
34/// Wraps a recorder to allow for recovering it after being installed.
35///
36/// Installing a recorder generally involves providing an owned value, which means that it is not
37/// possible to recover the recorder after it has been installed. For some recorder implementations,
38/// it can be important to perform finalization before the application exits, which is not possible
39/// if the application cannot consume the recorder.
40///
41/// `RecoverableRecorder` allows wrapping a recorder such that a weak reference to it is installed
42/// globally, while the recorder itself is held by `RecoveryHandle<R>`. This allows the recorder to
43/// be used globally so long as the recovery handle is active, keeping the original recorder alive.
44///
45/// ## As a drop guard
46///
47/// While `RecoveryHandle<R>` provides a method to manually recover the recorder directly, one
48/// particular benefit is that due to how the recorder is wrapped, when `RecoveryHandle<R>` is
49/// dropped, and the last active reference to the wrapped recorder is dropped, the recorder itself
50/// will be dropped.
51///
52/// This allows using `RecoveryHandle<R>` as a drop guard, ensuring that by dropping it, the
53/// recorder itself will be dropped, and any finalization logic implemented for the recorder will be
54/// run.
55#[derive(Debug)]
56pub struct RecoverableRecorder<R> {
57    handle: Arc<R>,
58}
59
60impl<R: Recorder + Sync + Send + 'static> RecoverableRecorder<R> {
61    /// Creates a new `RecoverableRecorder` from the given recorder.
62    pub fn new(recorder: R) -> Self {
63        Self { handle: Arc::new(recorder) }
64    }
65
66    /// Builds the wrapped recorder and a handle to recover the original.
67    pub(self) fn build(self) -> (WeakRecorder<R>, RecoveryHandle<R>) {
68        let wrapped = WeakRecorder::from_arc(&self.handle);
69
70        (wrapped, RecoveryHandle { handle: self.handle })
71    }
72
73    /// Installs the wrapped recorder globally, returning a handle to recover it.
74    ///
75    /// A weakly-referenced version of the recorder is installed globally, while the original
76    /// recorder is held within `RecoverableRecorder`, and can be recovered by calling `into_inner`.
77    ///
78    /// # Errors
79    ///
80    /// If a recorder is already installed, an error is returned containing the original recorder.
81    pub fn install(self) -> Result<RecoveryHandle<R>, SetRecorderError<R>> {
82        let (wrapped, handle) = self.build();
83        match metrics::set_global_recorder(wrapped) {
84            Ok(()) => Ok(handle),
85            Err(_) => {
86                let recorder = handle.into_inner();
87                Err(SetRecorderError(recorder))
88            }
89        }
90    }
91}
92
93#[derive(Debug)]
94struct WeakRecorder<R> {
95    recorder: Weak<R>,
96}
97
98impl<R> WeakRecorder<R> {
99    fn from_arc(recorder: &Arc<R>) -> Self {
100        Self { recorder: Arc::downgrade(recorder) }
101    }
102}
103
104impl<R: Recorder> Recorder for WeakRecorder<R> {
105    fn describe_counter(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
106        if let Some(recorder) = self.recorder.upgrade() {
107            recorder.describe_counter(key, unit, description);
108        }
109    }
110
111    fn describe_gauge(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
112        if let Some(recorder) = self.recorder.upgrade() {
113            recorder.describe_gauge(key, unit, description);
114        }
115    }
116
117    fn describe_histogram(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
118        if let Some(recorder) = self.recorder.upgrade() {
119            recorder.describe_histogram(key, unit, description);
120        }
121    }
122
123    fn register_counter(&self, key: &Key, metadata: &Metadata<'_>) -> Counter {
124        if let Some(recorder) = self.recorder.upgrade() {
125            recorder.register_counter(key, metadata)
126        } else {
127            Counter::noop()
128        }
129    }
130
131    fn register_gauge(&self, key: &Key, metadata: &Metadata<'_>) -> Gauge {
132        if let Some(recorder) = self.recorder.upgrade() {
133            recorder.register_gauge(key, metadata)
134        } else {
135            Gauge::noop()
136        }
137    }
138
139    fn register_histogram(&self, key: &Key, metadata: &Metadata<'_>) -> Histogram {
140        if let Some(recorder) = self.recorder.upgrade() {
141            recorder.register_histogram(key, metadata)
142        } else {
143            Histogram::noop()
144        }
145    }
146}
147
148#[cfg(test)]
149mod tests {
150    use std::sync::atomic::{AtomicBool, Ordering};
151
152    use super::*;
153    use metrics::{atomics::AtomicU64, CounterFn, GaugeFn, HistogramFn, Key, Recorder};
154
155    #[derive(Debug)]
156    struct CounterWrapper(AtomicU64);
157
158    #[derive(Debug)]
159    struct GaugeWrapper(AtomicU64);
160
161    #[derive(Debug)]
162    struct HistogramWrapper(AtomicU64);
163
164    impl CounterWrapper {
165        fn get(&self) -> u64 {
166            self.0.load(Ordering::Acquire)
167        }
168    }
169
170    impl GaugeWrapper {
171        fn get(&self) -> u64 {
172            self.0.load(Ordering::Acquire)
173        }
174    }
175
176    impl HistogramWrapper {
177        fn get(&self) -> u64 {
178            self.0.load(Ordering::Acquire)
179        }
180    }
181
182    impl CounterFn for CounterWrapper {
183        fn increment(&self, value: u64) {
184            self.0.fetch_add(value, Ordering::Release);
185        }
186
187        fn absolute(&self, value: u64) {
188            self.0.store(value, Ordering::Release);
189        }
190    }
191
192    impl GaugeFn for GaugeWrapper {
193        fn increment(&self, value: f64) {
194            self.0.fetch_add(value as u64, Ordering::Release);
195        }
196
197        fn decrement(&self, value: f64) {
198            self.0.fetch_sub(value as u64, Ordering::Release);
199        }
200
201        fn set(&self, value: f64) {
202            self.0.store(value as u64, Ordering::Release);
203        }
204    }
205
206    impl HistogramFn for HistogramWrapper {
207        fn record(&self, value: f64) {
208            self.0.fetch_add(value as u64, Ordering::Release);
209        }
210    }
211
212    #[derive(Debug)]
213    struct TestRecorder {
214        dropped: Arc<AtomicBool>,
215        counter: Arc<CounterWrapper>,
216        gauge: Arc<GaugeWrapper>,
217        histogram: Arc<HistogramWrapper>,
218    }
219
220    impl TestRecorder {
221        fn new() -> (Self, Arc<CounterWrapper>, Arc<GaugeWrapper>, Arc<HistogramWrapper>) {
222            let (recorder, _, counter, gauge, histogram) = Self::new_with_drop();
223            (recorder, counter, gauge, histogram)
224        }
225
226        fn new_with_drop(
227        ) -> (Self, Arc<AtomicBool>, Arc<CounterWrapper>, Arc<GaugeWrapper>, Arc<HistogramWrapper>)
228        {
229            let dropped = Arc::new(AtomicBool::new(false));
230            let counter = Arc::new(CounterWrapper(AtomicU64::new(0)));
231            let gauge = Arc::new(GaugeWrapper(AtomicU64::new(0)));
232            let histogram = Arc::new(HistogramWrapper(AtomicU64::new(0)));
233
234            let recorder = Self {
235                dropped: Arc::clone(&dropped),
236                counter: Arc::clone(&counter),
237                gauge: Arc::clone(&gauge),
238                histogram: Arc::clone(&histogram),
239            };
240
241            (recorder, dropped, counter, gauge, histogram)
242        }
243    }
244
245    impl Recorder for TestRecorder {
246        fn describe_counter(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {
247            todo!()
248        }
249
250        fn describe_gauge(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {
251            todo!()
252        }
253
254        fn describe_histogram(
255            &self,
256            _key: KeyName,
257            _unit: Option<Unit>,
258            _description: SharedString,
259        ) {
260            todo!()
261        }
262
263        fn register_counter(&self, _: &Key, _: &Metadata<'_>) -> Counter {
264            Counter::from_arc(Arc::clone(&self.counter))
265        }
266
267        fn register_gauge(&self, _: &Key, _: &Metadata<'_>) -> Gauge {
268            Gauge::from_arc(Arc::clone(&self.gauge))
269        }
270
271        fn register_histogram(&self, _: &Key, _: &Metadata<'_>) -> Histogram {
272            Histogram::from_arc(Arc::clone(&self.histogram))
273        }
274    }
275
276    impl Drop for TestRecorder {
277        fn drop(&mut self) {
278            self.dropped.store(true, Ordering::Release);
279        }
280    }
281
282    #[test]
283    fn basic() {
284        // Create and install the recorder.
285        let (recorder, counter, gauge, histogram) = TestRecorder::new();
286        let recoverable = RecoverableRecorder::new(recorder);
287        let (recorder, handle) = recoverable.build();
288
289        // Record some metrics, and make sure the atomics for each metric type are
290        // incremented as we would expect them to be.
291        metrics::with_local_recorder(&recorder, || {
292            metrics::counter!("counter").increment(5);
293            metrics::gauge!("gauge").increment(5.0);
294            metrics::gauge!("gauge").increment(5.0);
295            metrics::histogram!("histogram").record(5.0);
296            metrics::histogram!("histogram").record(5.0);
297            metrics::histogram!("histogram").record(5.0);
298        });
299
300        let _recorder = handle.into_inner();
301        assert_eq!(counter.get(), 5);
302        assert_eq!(gauge.get(), 10);
303        assert_eq!(histogram.get(), 15);
304
305        // Now that we've recovered the recorder, incrementing the same metrics should
306        // not actually increment the value of the atomics for each metric type.
307        metrics::with_local_recorder(&recorder, || {
308            metrics::counter!("counter").increment(7);
309            metrics::gauge!("gauge").increment(7.0);
310            metrics::histogram!("histogram").record(7.0);
311        });
312
313        assert_eq!(counter.get(), 5);
314        assert_eq!(gauge.get(), 10);
315        assert_eq!(histogram.get(), 15);
316    }
317
318    #[test]
319    fn on_drop() {
320        // Create and install the recorder.
321        let (recorder, dropped, counter, gauge, histogram) = TestRecorder::new_with_drop();
322        let recoverable = RecoverableRecorder::new(recorder);
323        let (recorder, handle) = recoverable.build();
324
325        // Record some metrics, and make sure the atomics for each metric type are
326        // incremented as we would expect them to be.
327        metrics::with_local_recorder(&recorder, || {
328            metrics::counter!("counter").increment(5);
329            metrics::gauge!("gauge").increment(5.0);
330            metrics::gauge!("gauge").increment(5.0);
331            metrics::histogram!("histogram").record(5.0);
332            metrics::histogram!("histogram").record(5.0);
333            metrics::histogram!("histogram").record(5.0);
334        });
335
336        drop(handle.into_inner());
337        assert_eq!(counter.get(), 5);
338        assert_eq!(gauge.get(), 10);
339        assert_eq!(histogram.get(), 15);
340
341        // Now that we've recovered the recorder, incrementing the same metrics should
342        // not actually increment the value of the atomics for each metric type.
343        metrics::with_local_recorder(&recorder, || {
344            metrics::counter!("counter").increment(7);
345            metrics::gauge!("gauge").increment(7.0);
346            metrics::histogram!("histogram").record(7.0);
347        });
348
349        assert_eq!(counter.get(), 5);
350        assert_eq!(gauge.get(), 10);
351        assert_eq!(histogram.get(), 15);
352
353        // And we should be able to check that the recorder was indeed dropped.
354        assert!(dropped.load(Ordering::Acquire));
355    }
356}