1use std::sync::{Arc, Weak};
2
3use metrics::{
4 Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SetRecorderError, SharedString,
5 Unit,
6};
7
8#[derive(Debug)]
9pub struct RecoveryHandle<R> {
10 handle: Arc<R>,
11}
12
13impl<R> RecoveryHandle<R> {
14 pub fn into_inner(mut self) -> R {
23 loop {
24 match Arc::try_unwrap(self.handle) {
25 Ok(recorder) => break recorder,
26 Err(handle) => {
27 self.handle = handle;
28 }
29 }
30 }
31 }
32}
33
34#[derive(Debug)]
56pub struct RecoverableRecorder<R> {
57 handle: Arc<R>,
58}
59
60impl<R: Recorder + Sync + Send + 'static> RecoverableRecorder<R> {
61 pub fn new(recorder: R) -> Self {
63 Self { handle: Arc::new(recorder) }
64 }
65
66 pub(self) fn build(self) -> (WeakRecorder<R>, RecoveryHandle<R>) {
68 let wrapped = WeakRecorder::from_arc(&self.handle);
69
70 (wrapped, RecoveryHandle { handle: self.handle })
71 }
72
73 pub fn install(self) -> Result<RecoveryHandle<R>, SetRecorderError<R>> {
82 let (wrapped, handle) = self.build();
83 match metrics::set_global_recorder(wrapped) {
84 Ok(()) => Ok(handle),
85 Err(_) => {
86 let recorder = handle.into_inner();
87 Err(SetRecorderError(recorder))
88 }
89 }
90 }
91}
92
93#[derive(Debug)]
94struct WeakRecorder<R> {
95 recorder: Weak<R>,
96}
97
98impl<R> WeakRecorder<R> {
99 fn from_arc(recorder: &Arc<R>) -> Self {
100 Self { recorder: Arc::downgrade(recorder) }
101 }
102}
103
104impl<R: Recorder> Recorder for WeakRecorder<R> {
105 fn describe_counter(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
106 if let Some(recorder) = self.recorder.upgrade() {
107 recorder.describe_counter(key, unit, description);
108 }
109 }
110
111 fn describe_gauge(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
112 if let Some(recorder) = self.recorder.upgrade() {
113 recorder.describe_gauge(key, unit, description);
114 }
115 }
116
117 fn describe_histogram(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
118 if let Some(recorder) = self.recorder.upgrade() {
119 recorder.describe_histogram(key, unit, description);
120 }
121 }
122
123 fn register_counter(&self, key: &Key, metadata: &Metadata<'_>) -> Counter {
124 if let Some(recorder) = self.recorder.upgrade() {
125 recorder.register_counter(key, metadata)
126 } else {
127 Counter::noop()
128 }
129 }
130
131 fn register_gauge(&self, key: &Key, metadata: &Metadata<'_>) -> Gauge {
132 if let Some(recorder) = self.recorder.upgrade() {
133 recorder.register_gauge(key, metadata)
134 } else {
135 Gauge::noop()
136 }
137 }
138
139 fn register_histogram(&self, key: &Key, metadata: &Metadata<'_>) -> Histogram {
140 if let Some(recorder) = self.recorder.upgrade() {
141 recorder.register_histogram(key, metadata)
142 } else {
143 Histogram::noop()
144 }
145 }
146}
147
148#[cfg(test)]
149mod tests {
150 use std::sync::atomic::{AtomicBool, Ordering};
151
152 use super::*;
153 use metrics::{atomics::AtomicU64, CounterFn, GaugeFn, HistogramFn, Key, Recorder};
154
155 #[derive(Debug)]
156 struct CounterWrapper(AtomicU64);
157
158 #[derive(Debug)]
159 struct GaugeWrapper(AtomicU64);
160
161 #[derive(Debug)]
162 struct HistogramWrapper(AtomicU64);
163
164 impl CounterWrapper {
165 fn get(&self) -> u64 {
166 self.0.load(Ordering::Acquire)
167 }
168 }
169
170 impl GaugeWrapper {
171 fn get(&self) -> u64 {
172 self.0.load(Ordering::Acquire)
173 }
174 }
175
176 impl HistogramWrapper {
177 fn get(&self) -> u64 {
178 self.0.load(Ordering::Acquire)
179 }
180 }
181
182 impl CounterFn for CounterWrapper {
183 fn increment(&self, value: u64) {
184 self.0.fetch_add(value, Ordering::Release);
185 }
186
187 fn absolute(&self, value: u64) {
188 self.0.store(value, Ordering::Release);
189 }
190 }
191
192 impl GaugeFn for GaugeWrapper {
193 fn increment(&self, value: f64) {
194 self.0.fetch_add(value as u64, Ordering::Release);
195 }
196
197 fn decrement(&self, value: f64) {
198 self.0.fetch_sub(value as u64, Ordering::Release);
199 }
200
201 fn set(&self, value: f64) {
202 self.0.store(value as u64, Ordering::Release);
203 }
204 }
205
206 impl HistogramFn for HistogramWrapper {
207 fn record(&self, value: f64) {
208 self.0.fetch_add(value as u64, Ordering::Release);
209 }
210 }
211
212 #[derive(Debug)]
213 struct TestRecorder {
214 dropped: Arc<AtomicBool>,
215 counter: Arc<CounterWrapper>,
216 gauge: Arc<GaugeWrapper>,
217 histogram: Arc<HistogramWrapper>,
218 }
219
220 impl TestRecorder {
221 fn new() -> (Self, Arc<CounterWrapper>, Arc<GaugeWrapper>, Arc<HistogramWrapper>) {
222 let (recorder, _, counter, gauge, histogram) = Self::new_with_drop();
223 (recorder, counter, gauge, histogram)
224 }
225
226 fn new_with_drop(
227 ) -> (Self, Arc<AtomicBool>, Arc<CounterWrapper>, Arc<GaugeWrapper>, Arc<HistogramWrapper>)
228 {
229 let dropped = Arc::new(AtomicBool::new(false));
230 let counter = Arc::new(CounterWrapper(AtomicU64::new(0)));
231 let gauge = Arc::new(GaugeWrapper(AtomicU64::new(0)));
232 let histogram = Arc::new(HistogramWrapper(AtomicU64::new(0)));
233
234 let recorder = Self {
235 dropped: Arc::clone(&dropped),
236 counter: Arc::clone(&counter),
237 gauge: Arc::clone(&gauge),
238 histogram: Arc::clone(&histogram),
239 };
240
241 (recorder, dropped, counter, gauge, histogram)
242 }
243 }
244
245 impl Recorder for TestRecorder {
246 fn describe_counter(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {
247 todo!()
248 }
249
250 fn describe_gauge(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {
251 todo!()
252 }
253
254 fn describe_histogram(
255 &self,
256 _key: KeyName,
257 _unit: Option<Unit>,
258 _description: SharedString,
259 ) {
260 todo!()
261 }
262
263 fn register_counter(&self, _: &Key, _: &Metadata<'_>) -> Counter {
264 Counter::from_arc(Arc::clone(&self.counter))
265 }
266
267 fn register_gauge(&self, _: &Key, _: &Metadata<'_>) -> Gauge {
268 Gauge::from_arc(Arc::clone(&self.gauge))
269 }
270
271 fn register_histogram(&self, _: &Key, _: &Metadata<'_>) -> Histogram {
272 Histogram::from_arc(Arc::clone(&self.histogram))
273 }
274 }
275
276 impl Drop for TestRecorder {
277 fn drop(&mut self) {
278 self.dropped.store(true, Ordering::Release);
279 }
280 }
281
282 #[test]
283 fn basic() {
284 let (recorder, counter, gauge, histogram) = TestRecorder::new();
286 let recoverable = RecoverableRecorder::new(recorder);
287 let (recorder, handle) = recoverable.build();
288
289 metrics::with_local_recorder(&recorder, || {
292 metrics::counter!("counter").increment(5);
293 metrics::gauge!("gauge").increment(5.0);
294 metrics::gauge!("gauge").increment(5.0);
295 metrics::histogram!("histogram").record(5.0);
296 metrics::histogram!("histogram").record(5.0);
297 metrics::histogram!("histogram").record(5.0);
298 });
299
300 let _recorder = handle.into_inner();
301 assert_eq!(counter.get(), 5);
302 assert_eq!(gauge.get(), 10);
303 assert_eq!(histogram.get(), 15);
304
305 metrics::with_local_recorder(&recorder, || {
308 metrics::counter!("counter").increment(7);
309 metrics::gauge!("gauge").increment(7.0);
310 metrics::histogram!("histogram").record(7.0);
311 });
312
313 assert_eq!(counter.get(), 5);
314 assert_eq!(gauge.get(), 10);
315 assert_eq!(histogram.get(), 15);
316 }
317
318 #[test]
319 fn on_drop() {
320 let (recorder, dropped, counter, gauge, histogram) = TestRecorder::new_with_drop();
322 let recoverable = RecoverableRecorder::new(recorder);
323 let (recorder, handle) = recoverable.build();
324
325 metrics::with_local_recorder(&recorder, || {
328 metrics::counter!("counter").increment(5);
329 metrics::gauge!("gauge").increment(5.0);
330 metrics::gauge!("gauge").increment(5.0);
331 metrics::histogram!("histogram").record(5.0);
332 metrics::histogram!("histogram").record(5.0);
333 metrics::histogram!("histogram").record(5.0);
334 });
335
336 drop(handle.into_inner());
337 assert_eq!(counter.get(), 5);
338 assert_eq!(gauge.get(), 10);
339 assert_eq!(histogram.get(), 15);
340
341 metrics::with_local_recorder(&recorder, || {
344 metrics::counter!("counter").increment(7);
345 metrics::gauge!("gauge").increment(7.0);
346 metrics::histogram!("histogram").record(7.0);
347 });
348
349 assert_eq!(counter.get(), 5);
350 assert_eq!(gauge.get(), 10);
351 assert_eq!(histogram.get(), 15);
352
353 assert!(dropped.load(Ordering::Acquire));
355 }
356}