1use std::{
21 borrow::{Borrow, Cow},
22 fmt::Display,
23 sync::{
24 atomic::{AtomicUsize, Ordering},
25 Arc,
26 },
27 time::Duration,
28};
29
30use chrono::{DateTime, Utc};
31use datafusion_common::instant::Instant;
32use parking_lot::Mutex;
33
/// A shared counter that can only be incremented through its public API.
///
/// Cloning a `Count` produces another handle to the *same* underlying atomic,
/// so an update made through one handle is visible through all of them.
#[derive(Debug, Clone)]
pub struct Count {
    /// Shared storage for the running total.
    value: Arc<AtomicUsize>,
}

/// Two counters compare equal when their current values are equal,
/// regardless of whether they share storage.
impl PartialEq for Count {
    fn eq(&self, other: &Self) -> bool {
        self.value() == other.value()
    }
}

/// Renders the current value as a plain decimal number.
impl Display for Count {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self.value())
    }
}

impl Default for Count {
    /// Equivalent to [`Count::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Count {
    /// Creates a counter starting at zero.
    pub fn new() -> Self {
        let value = Arc::new(AtomicUsize::new(0));
        Self { value }
    }

    /// Adds `n` to the counter.
    pub fn add(&self, n: usize) {
        // Relaxed is sufficient here: only the counter itself is updated and
        // no other memory is published through it.
        let counter: &AtomicUsize = &self.value;
        counter.fetch_add(n, Ordering::Relaxed);
    }

    /// Returns the current value of the counter.
    pub fn value(&self) -> usize {
        let counter: &AtomicUsize = &self.value;
        counter.load(Ordering::Relaxed)
    }
}
81
/// A shared value that can move both up and down.
///
/// Cloning a `Gauge` produces another handle to the *same* underlying atomic,
/// so an update made through one handle is visible through all of them.
#[derive(Debug, Clone)]
pub struct Gauge {
    /// Shared storage for the current reading.
    value: Arc<AtomicUsize>,
}

/// Two gauges compare equal when their current values are equal,
/// regardless of whether they share storage.
impl PartialEq for Gauge {
    fn eq(&self, other: &Self) -> bool {
        self.value() == other.value()
    }
}

/// Renders the current value as a plain decimal number.
impl Display for Gauge {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self.value())
    }
}

impl Default for Gauge {
    /// Equivalent to [`Gauge::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Gauge {
    /// Creates a gauge starting at zero.
    pub fn new() -> Self {
        let value = Arc::new(AtomicUsize::new(0));
        Self { value }
    }

    /// Adds `n` to the gauge.
    pub fn add(&self, n: usize) {
        let cell: &AtomicUsize = &self.value;
        cell.fetch_add(n, Ordering::Relaxed);
    }

    /// Subtracts `n` from the gauge.
    ///
    /// The subtraction wraps around on underflow, as atomic arithmetic does.
    pub fn sub(&self, n: usize) {
        let cell: &AtomicUsize = &self.value;
        cell.fetch_sub(n, Ordering::Relaxed);
    }

    /// Raises the gauge to `n` if `n` is larger than the current value;
    /// otherwise leaves it unchanged.
    pub fn set_max(&self, n: usize) {
        let cell: &AtomicUsize = &self.value;
        cell.fetch_max(n, Ordering::Relaxed);
    }

    /// Overwrites the gauge with `n` and returns the value it held before.
    pub fn set(&self, n: usize) -> usize {
        let cell: &AtomicUsize = &self.value;
        cell.swap(n, Ordering::Relaxed)
    }

    /// Returns the current value of the gauge.
    pub fn value(&self) -> usize {
        let cell: &AtomicUsize = &self.value;
        cell.load(Ordering::Relaxed)
    }
}
149
150#[derive(Debug, Clone)]
152pub struct Time {
153 nanos: Arc<AtomicUsize>,
155}
156
157impl Default for Time {
158 fn default() -> Self {
159 Self::new()
160 }
161}
162
163impl PartialEq for Time {
164 fn eq(&self, other: &Self) -> bool {
165 self.value().eq(&other.value())
166 }
167}
168
169impl Display for Time {
170 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
171 let duration = Duration::from_nanos(self.value() as u64);
172 write!(f, "{duration:?}")
173 }
174}
175
176impl Time {
177 pub fn new() -> Self {
180 Self {
181 nanos: Arc::new(AtomicUsize::new(0)),
182 }
183 }
184
185 pub fn add_elapsed(&self, start: Instant) {
187 self.add_duration(start.elapsed());
188 }
189
190 pub fn add_duration(&self, duration: Duration) {
201 let more_nanos = duration.as_nanos() as usize;
202 self.nanos.fetch_add(more_nanos.max(1), Ordering::Relaxed);
203 }
204
205 pub fn add(&self, other: &Time) {
207 self.add_duration(Duration::from_nanos(other.value() as u64))
208 }
209
210 pub fn timer(&self) -> ScopedTimerGuard<'_> {
214 ScopedTimerGuard {
215 inner: self,
216 start: Some(Instant::now()),
217 }
218 }
219
220 pub fn value(&self) -> usize {
222 self.nanos.load(Ordering::Relaxed)
223 }
224}
225
226#[derive(Debug, Clone)]
229pub struct Timestamp {
230 timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
232}
233
234impl Default for Timestamp {
235 fn default() -> Self {
236 Self::new()
237 }
238}
239
240impl Timestamp {
241 pub fn new() -> Self {
243 Self {
244 timestamp: Arc::new(Mutex::new(None)),
245 }
246 }
247
248 pub fn record(&self) {
250 self.set(Utc::now())
251 }
252
253 pub fn set(&self, now: DateTime<Utc>) {
255 *self.timestamp.lock() = Some(now);
256 }
257
258 pub fn value(&self) -> Option<DateTime<Utc>> {
263 *self.timestamp.lock()
264 }
265
266 pub fn update_to_min(&self, other: &Timestamp) {
268 let min = match (self.value(), other.value()) {
269 (None, None) => None,
270 (Some(v), None) => Some(v),
271 (None, Some(v)) => Some(v),
272 (Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
273 };
274
275 *self.timestamp.lock() = min;
276 }
277
278 pub fn update_to_max(&self, other: &Timestamp) {
280 let max = match (self.value(), other.value()) {
281 (None, None) => None,
282 (Some(v), None) => Some(v),
283 (None, Some(v)) => Some(v),
284 (Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
285 };
286
287 *self.timestamp.lock() = max;
288 }
289}
290
291impl PartialEq for Timestamp {
292 fn eq(&self, other: &Self) -> bool {
293 self.value().eq(&other.value())
294 }
295}
296
297impl Display for Timestamp {
298 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
299 match self.value() {
300 None => write!(f, "NONE"),
301 Some(v) => {
302 write!(f, "{v}")
303 }
304 }
305 }
306}
307
308pub struct ScopedTimerGuard<'a> {
312 inner: &'a Time,
313 start: Option<Instant>,
314}
315
316impl ScopedTimerGuard<'_> {
317 pub fn stop(&mut self) {
319 if let Some(start) = self.start.take() {
320 self.inner.add_elapsed(start)
321 }
322 }
323
324 pub fn restart(&mut self) {
326 self.start = Some(Instant::now())
327 }
328
329 pub fn done(mut self) {
331 self.stop()
332 }
333}
334
335impl Drop for ScopedTimerGuard<'_> {
336 fn drop(&mut self) {
337 self.stop()
338 }
339}
340
/// A single metric value, tagged with what it measures.
///
/// Each variant wraps one of the shared value types of this module
/// (`Count`, `Gauge`, `Time` or `Timestamp`). The three struct-like variants
/// carry a caller-supplied name; all other variants are reported under a
/// fixed name.
#[derive(Debug, Clone, PartialEq)]
pub enum MetricValue {
    /// Number of output rows (reported as `output_rows`).
    OutputRows(Count),
    /// Elapsed compute time (reported as `elapsed_compute`).
    ElapsedCompute(Time),
    /// Number of spills (reported as `spill_count`).
    SpillCount(Count),
    /// Number of spilled bytes (reported as `spilled_bytes`).
    SpilledBytes(Count),
    /// Number of spilled rows (reported as `spilled_rows`).
    SpilledRows(Count),
    /// Current memory usage (reported as `mem_used`).
    CurrentMemoryUsage(Gauge),
    /// A counter reported under a caller-supplied name.
    Count {
        /// Name under which the counter is reported.
        name: Cow<'static, str>,
        /// The counter itself.
        count: Count,
    },
    /// A gauge reported under a caller-supplied name.
    Gauge {
        /// Name under which the gauge is reported.
        name: Cow<'static, str>,
        /// The gauge itself.
        gauge: Gauge,
    },
    /// A time measurement reported under a caller-supplied name.
    Time {
        /// Name under which the time is reported.
        name: Cow<'static, str>,
        /// The time measurement itself.
        time: Time,
    },
    /// A start timestamp (reported as `start_timestamp`).
    StartTimestamp(Timestamp),
    /// An end timestamp (reported as `end_timestamp`).
    EndTimestamp(Timestamp),
}
404
405impl MetricValue {
406 pub fn name(&self) -> &str {
408 match self {
409 Self::OutputRows(_) => "output_rows",
410 Self::SpillCount(_) => "spill_count",
411 Self::SpilledBytes(_) => "spilled_bytes",
412 Self::SpilledRows(_) => "spilled_rows",
413 Self::CurrentMemoryUsage(_) => "mem_used",
414 Self::ElapsedCompute(_) => "elapsed_compute",
415 Self::Count { name, .. } => name.borrow(),
416 Self::Gauge { name, .. } => name.borrow(),
417 Self::Time { name, .. } => name.borrow(),
418 Self::StartTimestamp(_) => "start_timestamp",
419 Self::EndTimestamp(_) => "end_timestamp",
420 }
421 }
422
423 pub fn as_usize(&self) -> usize {
425 match self {
426 Self::OutputRows(count) => count.value(),
427 Self::SpillCount(count) => count.value(),
428 Self::SpilledBytes(bytes) => bytes.value(),
429 Self::SpilledRows(count) => count.value(),
430 Self::CurrentMemoryUsage(used) => used.value(),
431 Self::ElapsedCompute(time) => time.value(),
432 Self::Count { count, .. } => count.value(),
433 Self::Gauge { gauge, .. } => gauge.value(),
434 Self::Time { time, .. } => time.value(),
435 Self::StartTimestamp(timestamp) => timestamp
436 .value()
437 .and_then(|ts| ts.timestamp_nanos_opt())
438 .map(|nanos| nanos as usize)
439 .unwrap_or(0),
440 Self::EndTimestamp(timestamp) => timestamp
441 .value()
442 .and_then(|ts| ts.timestamp_nanos_opt())
443 .map(|nanos| nanos as usize)
444 .unwrap_or(0),
445 }
446 }
447
448 pub fn new_empty(&self) -> Self {
451 match self {
452 Self::OutputRows(_) => Self::OutputRows(Count::new()),
453 Self::SpillCount(_) => Self::SpillCount(Count::new()),
454 Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
455 Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
456 Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
457 Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
458 Self::Count { name, .. } => Self::Count {
459 name: name.clone(),
460 count: Count::new(),
461 },
462 Self::Gauge { name, .. } => Self::Gauge {
463 name: name.clone(),
464 gauge: Gauge::new(),
465 },
466 Self::Time { name, .. } => Self::Time {
467 name: name.clone(),
468 time: Time::new(),
469 },
470 Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
471 Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
472 }
473 }
474
475 pub fn aggregate(&mut self, other: &Self) {
485 match (self, other) {
486 (Self::OutputRows(count), Self::OutputRows(other_count))
487 | (Self::SpillCount(count), Self::SpillCount(other_count))
488 | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
489 | (Self::SpilledRows(count), Self::SpilledRows(other_count))
490 | (
491 Self::Count { count, .. },
492 Self::Count {
493 count: other_count, ..
494 },
495 ) => count.add(other_count.value()),
496 (Self::CurrentMemoryUsage(gauge), Self::CurrentMemoryUsage(other_gauge))
497 | (
498 Self::Gauge { gauge, .. },
499 Self::Gauge {
500 gauge: other_gauge, ..
501 },
502 ) => gauge.add(other_gauge.value()),
503 (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
504 | (
505 Self::Time { time, .. },
506 Self::Time {
507 time: other_time, ..
508 },
509 ) => time.add(other_time),
510 (Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
512 timestamp.update_to_min(other_timestamp);
513 }
514 (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
516 timestamp.update_to_max(other_timestamp);
517 }
518 m @ (_, _) => {
519 panic!(
520 "Mismatched metric types. Can not aggregate {:?} with value {:?}",
521 m.0, m.1
522 )
523 }
524 }
525 }
526
527 pub fn display_sort_key(&self) -> u8 {
530 match self {
531 Self::OutputRows(_) => 0, Self::ElapsedCompute(_) => 1, Self::SpillCount(_) => 2,
534 Self::SpilledBytes(_) => 3,
535 Self::SpilledRows(_) => 4,
536 Self::CurrentMemoryUsage(_) => 5,
537 Self::Count { .. } => 6,
538 Self::Gauge { .. } => 7,
539 Self::Time { .. } => 8,
540 Self::StartTimestamp(_) => 9, Self::EndTimestamp(_) => 10,
542 }
543 }
544
545 pub fn is_timestamp(&self) -> bool {
547 matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_))
548 }
549}
550
551impl Display for MetricValue {
552 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
554 match self {
555 Self::OutputRows(count)
556 | Self::SpillCount(count)
557 | Self::SpilledBytes(count)
558 | Self::SpilledRows(count)
559 | Self::Count { count, .. } => {
560 write!(f, "{count}")
561 }
562 Self::CurrentMemoryUsage(gauge) | Self::Gauge { gauge, .. } => {
563 write!(f, "{gauge}")
564 }
565 Self::ElapsedCompute(time) | Self::Time { time, .. } => {
566 if time.value() > 0 {
569 write!(f, "{time}")
570 } else {
571 write!(f, "NOT RECORDED")
572 }
573 }
574 Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
575 write!(f, "{timestamp}")
576 }
577 }
578 }
579}
580
#[cfg(test)]
mod tests {
    use chrono::TimeZone;

    use super::*;

    /// Asserts that every metric in `values` renders as `expected`.
    fn assert_all_display(values: &[MetricValue], expected: &str) {
        for value in values {
            assert_eq!(expected, value.to_string(), "value {value:?}");
        }
    }

    /// Counter-backed metrics print their current number.
    #[test]
    fn test_display_output_rows() {
        let count = Count::new();
        // Both metrics share `count`, so one update is seen by both.
        let values = [
            MetricValue::OutputRows(count.clone()),
            MetricValue::Count {
                name: "my_counter".into(),
                count: count.clone(),
            },
        ];

        assert_all_display(&values, "0");

        count.add(42);
        assert_all_display(&values, "42");
    }

    /// Time-backed metrics print `NOT RECORDED` until something is added.
    #[test]
    fn test_display_time() {
        let time = Time::new();
        let values = [
            MetricValue::ElapsedCompute(time.clone()),
            MetricValue::Time {
                name: "my_time".into(),
                time: time.clone(),
            },
        ];

        // Nothing recorded yet.
        assert_all_display(&values, "NOT RECORDED");

        time.add_duration(Duration::from_nanos(1042));
        assert_all_display(&values, "1.042µs");
    }

    /// Timestamp-backed metrics print `NONE` until a value is stored.
    #[test]
    fn test_display_timestamp() {
        let timestamp = Timestamp::new();
        let values = [
            MetricValue::StartTimestamp(timestamp.clone()),
            MetricValue::EndTimestamp(timestamp.clone()),
        ];

        // Nothing stored yet.
        assert_all_display(&values, "NONE");

        timestamp.set(Utc.timestamp_nanos(1431648000000000));
        assert_all_display(&values, "1970-01-17 13:40:48 UTC");
    }
}