datafusion_physical_plan/metrics/
value.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Value representation of metrics
19
20use std::{
21    borrow::{Borrow, Cow},
22    fmt::Display,
23    sync::{
24        atomic::{AtomicUsize, Ordering},
25        Arc,
26    },
27    time::Duration,
28};
29
30use chrono::{DateTime, Utc};
31use datafusion_common::instant::Instant;
32use parking_lot::Mutex;
33
/// A counter for recording quantities such as the number of input or
/// output rows.
///
/// Cloning a `Count` does not copy the value: every clone shares and
/// updates the same underlying counter.
#[derive(Debug, Clone)]
pub struct Count {
    /// Shared atomic storage holding the counter's current value
    value: Arc<AtomicUsize>,
}

impl PartialEq for Count {
    /// Two counters are equal when their current values are equal
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.value(), other.value());
        lhs == rhs
    }
}

impl Display for Count {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let current = self.value();
        write!(f, "{current}")
    }
}

impl Default for Count {
    /// Equivalent to [`Count::new`]: a counter starting at zero
    fn default() -> Self {
        Count::new()
    }
}

impl Count {
    /// Creates a new counter whose value starts at zero
    pub fn new() -> Self {
        let value = Arc::new(AtomicUsize::new(0));
        Count { value }
    }

    /// Increments the counter by `n`
    pub fn add(&self, n: usize) {
        // Relaxed is sufficient: the counter is a standalone number and is
        // not used to order or publish any other memory accesses.
        let _previous = self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Returns the counter's current value
    pub fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}
81
/// A gauge is the simplest metric type: it just reports a value, which may
/// go up or down. For example, current memory consumption can easily be
/// exposed as a gauge.
///
/// Cloning a `Gauge` does not copy the value: every clone shares and
/// updates the same underlying metric.
#[derive(Debug, Clone)]
pub struct Gauge {
    /// Shared atomic storage holding the gauge's current value
    value: Arc<AtomicUsize>,
}

impl PartialEq for Gauge {
    /// Two gauges are equal when their current values are equal
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.value(), other.value());
        lhs == rhs
    }
}

impl Display for Gauge {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let current = self.value();
        write!(f, "{current}")
    }
}

impl Default for Gauge {
    /// Equivalent to [`Gauge::new`]: a gauge starting at zero
    fn default() -> Self {
        Gauge::new()
    }
}

impl Gauge {
    /// Creates a new gauge whose value starts at zero
    pub fn new() -> Self {
        let value = Arc::new(AtomicUsize::new(0));
        Gauge { value }
    }

    /// Increases the gauge by `n` (wraps on overflow, like `AtomicUsize::fetch_add`)
    pub fn add(&self, n: usize) {
        // Relaxed is sufficient: the value is a standalone number and is
        // not used to order or publish any other memory accesses.
        let _previous = self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Decreases the gauge by `n` (wraps on underflow, like `AtomicUsize::fetch_sub`)
    pub fn sub(&self, n: usize) {
        // Relaxed is sufficient for the same reason as in `add`.
        let _previous = self.value.fetch_sub(n, Ordering::Relaxed);
    }

    /// Raises the gauge to `n` if `n` is larger than the current value
    pub fn set_max(&self, n: usize) {
        let _previous = self.value.fetch_max(n, Ordering::Relaxed);
    }

    /// Replaces the gauge's value with `n`, returning the value it had before
    pub fn set(&self, n: usize) -> usize {
        // Relaxed is sufficient for the same reason as in `add`.
        let previous = self.value.swap(n, Ordering::Relaxed);
        previous
    }

    /// Returns the gauge's current value
    pub fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}
149
150/// Measure a potentially non contiguous duration of time
151#[derive(Debug, Clone)]
152pub struct Time {
153    /// elapsed time, in nanoseconds
154    nanos: Arc<AtomicUsize>,
155}
156
157impl Default for Time {
158    fn default() -> Self {
159        Self::new()
160    }
161}
162
163impl PartialEq for Time {
164    fn eq(&self, other: &Self) -> bool {
165        self.value().eq(&other.value())
166    }
167}
168
169impl Display for Time {
170    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
171        let duration = Duration::from_nanos(self.value() as u64);
172        write!(f, "{duration:?}")
173    }
174}
175
176impl Time {
177    /// Create a new [`Time`] wrapper suitable for recording elapsed
178    /// times for operations.
179    pub fn new() -> Self {
180        Self {
181            nanos: Arc::new(AtomicUsize::new(0)),
182        }
183    }
184
185    /// Add elapsed nanoseconds since `start`to self
186    pub fn add_elapsed(&self, start: Instant) {
187        self.add_duration(start.elapsed());
188    }
189
190    /// Add duration of time to self
191    ///
192    /// Note: this will always increment the recorded time by at least 1 nanosecond
193    /// to distinguish between the scenario of no values recorded, in which
194    /// case the value will be 0, and no measurable amount of time having passed,
195    /// in which case the value will be small but not 0.
196    ///
197    /// This is based on the assumption that the timing logic in most cases is likely
198    /// to take at least a nanosecond, and so this is reasonable mechanism to avoid
199    /// ambiguity, especially on systems with low-resolution monotonic clocks
200    pub fn add_duration(&self, duration: Duration) {
201        let more_nanos = duration.as_nanos() as usize;
202        self.nanos.fetch_add(more_nanos.max(1), Ordering::Relaxed);
203    }
204
205    /// Add the number of nanoseconds of other `Time` to self
206    pub fn add(&self, other: &Time) {
207        self.add_duration(Duration::from_nanos(other.value() as u64))
208    }
209
210    /// return a scoped guard that adds the amount of time elapsed
211    /// between its creation and its drop or call to `stop` to the
212    /// underlying metric.
213    pub fn timer(&self) -> ScopedTimerGuard<'_> {
214        ScopedTimerGuard {
215            inner: self,
216            start: Some(Instant::now()),
217        }
218    }
219
220    /// Get the number of nanoseconds record by this Time metric
221    pub fn value(&self) -> usize {
222        self.nanos.load(Ordering::Relaxed)
223    }
224}
225
226/// Stores a single timestamp, stored as the number of nanoseconds
227/// elapsed from Jan 1, 1970 UTC
228#[derive(Debug, Clone)]
229pub struct Timestamp {
230    /// Time thing started
231    timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
232}
233
234impl Default for Timestamp {
235    fn default() -> Self {
236        Self::new()
237    }
238}
239
240impl Timestamp {
241    /// Create a new timestamp and sets its value to 0
242    pub fn new() -> Self {
243        Self {
244            timestamp: Arc::new(Mutex::new(None)),
245        }
246    }
247
248    /// Sets the timestamps value to the current time
249    pub fn record(&self) {
250        self.set(Utc::now())
251    }
252
253    /// Sets the timestamps value to a specified time
254    pub fn set(&self, now: DateTime<Utc>) {
255        *self.timestamp.lock() = Some(now);
256    }
257
258    /// return the timestamps value at the last time `record()` was
259    /// called.
260    ///
261    /// Returns `None` if `record()` has not been called
262    pub fn value(&self) -> Option<DateTime<Utc>> {
263        *self.timestamp.lock()
264    }
265
266    /// sets the value of this timestamp to the minimum of this and other
267    pub fn update_to_min(&self, other: &Timestamp) {
268        let min = match (self.value(), other.value()) {
269            (None, None) => None,
270            (Some(v), None) => Some(v),
271            (None, Some(v)) => Some(v),
272            (Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
273        };
274
275        *self.timestamp.lock() = min;
276    }
277
278    /// sets the value of this timestamp to the maximum of this and other
279    pub fn update_to_max(&self, other: &Timestamp) {
280        let max = match (self.value(), other.value()) {
281            (None, None) => None,
282            (Some(v), None) => Some(v),
283            (None, Some(v)) => Some(v),
284            (Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
285        };
286
287        *self.timestamp.lock() = max;
288    }
289}
290
291impl PartialEq for Timestamp {
292    fn eq(&self, other: &Self) -> bool {
293        self.value().eq(&other.value())
294    }
295}
296
297impl Display for Timestamp {
298    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
299        match self.value() {
300            None => write!(f, "NONE"),
301            Some(v) => {
302                write!(f, "{v}")
303            }
304        }
305    }
306}
307
308/// RAAI structure that adds all time between its construction and
309/// destruction to the CPU time or the first call to `stop` whichever
310/// comes first
311pub struct ScopedTimerGuard<'a> {
312    inner: &'a Time,
313    start: Option<Instant>,
314}
315
316impl ScopedTimerGuard<'_> {
317    /// Stop the timer timing and record the time taken
318    pub fn stop(&mut self) {
319        if let Some(start) = self.start.take() {
320            self.inner.add_elapsed(start)
321        }
322    }
323
324    /// Restarts the timer recording from the current time
325    pub fn restart(&mut self) {
326        self.start = Some(Instant::now())
327    }
328
329    /// Stop the timer, record the time taken and consume self
330    pub fn done(mut self) {
331        self.stop()
332    }
333}
334
335impl Drop for ScopedTimerGuard<'_> {
336    fn drop(&mut self) {
337        self.stop()
338    }
339}
340
341/// Possible values for a [super::Metric].
342///
343/// Among other differences, the metric types have different ways to
344/// logically interpret their underlying values and some metrics are
345/// so common they are given special treatment.
346#[derive(Debug, Clone, PartialEq)]
347pub enum MetricValue {
348    /// Number of output rows produced: "output_rows" metric
349    OutputRows(Count),
350    /// Elapsed Compute Time: the wall clock time spent in "cpu
351    /// intensive" work.
352    ///
353    /// This measurement represents, roughly:
354    /// ```
355    /// use std::time::Instant;
356    /// let start = Instant::now();
357    /// // ...CPU intensive work here...
358    /// let elapsed_compute = (Instant::now() - start).as_nanos();
359    /// ```
360    ///
361    /// Note 1: Does *not* include time other operators spend
362    /// computing input.
363    ///
364    /// Note 2: *Does* includes time when the thread could have made
365    /// progress but the OS did not schedule it (e.g. due to CPU
366    /// contention), thus making this value different than the
367    /// classical definition of "cpu_time", which is the time reported
368    /// from `clock_gettime(CLOCK_THREAD_CPUTIME_ID, ..)`.
369    ElapsedCompute(Time),
370    /// Number of spills produced: "spill_count" metric
371    SpillCount(Count),
372    /// Total size of spilled bytes produced: "spilled_bytes" metric
373    SpilledBytes(Count),
374    /// Total size of spilled rows produced: "spilled_rows" metric
375    SpilledRows(Count),
376    /// Current memory used
377    CurrentMemoryUsage(Gauge),
378    /// Operator defined count.
379    Count {
380        /// The provided name of this metric
381        name: Cow<'static, str>,
382        /// The value of the metric
383        count: Count,
384    },
385    /// Operator defined gauge.
386    Gauge {
387        /// The provided name of this metric
388        name: Cow<'static, str>,
389        /// The value of the metric
390        gauge: Gauge,
391    },
392    /// Operator defined time
393    Time {
394        /// The provided name of this metric
395        name: Cow<'static, str>,
396        /// The value of the metric
397        time: Time,
398    },
399    /// The time at which execution started
400    StartTimestamp(Timestamp),
401    /// The time at which execution ended
402    EndTimestamp(Timestamp),
403}
404
405impl MetricValue {
406    /// Return the name of this SQL metric
407    pub fn name(&self) -> &str {
408        match self {
409            Self::OutputRows(_) => "output_rows",
410            Self::SpillCount(_) => "spill_count",
411            Self::SpilledBytes(_) => "spilled_bytes",
412            Self::SpilledRows(_) => "spilled_rows",
413            Self::CurrentMemoryUsage(_) => "mem_used",
414            Self::ElapsedCompute(_) => "elapsed_compute",
415            Self::Count { name, .. } => name.borrow(),
416            Self::Gauge { name, .. } => name.borrow(),
417            Self::Time { name, .. } => name.borrow(),
418            Self::StartTimestamp(_) => "start_timestamp",
419            Self::EndTimestamp(_) => "end_timestamp",
420        }
421    }
422
423    /// Return the value of the metric as a usize value
424    pub fn as_usize(&self) -> usize {
425        match self {
426            Self::OutputRows(count) => count.value(),
427            Self::SpillCount(count) => count.value(),
428            Self::SpilledBytes(bytes) => bytes.value(),
429            Self::SpilledRows(count) => count.value(),
430            Self::CurrentMemoryUsage(used) => used.value(),
431            Self::ElapsedCompute(time) => time.value(),
432            Self::Count { count, .. } => count.value(),
433            Self::Gauge { gauge, .. } => gauge.value(),
434            Self::Time { time, .. } => time.value(),
435            Self::StartTimestamp(timestamp) => timestamp
436                .value()
437                .and_then(|ts| ts.timestamp_nanos_opt())
438                .map(|nanos| nanos as usize)
439                .unwrap_or(0),
440            Self::EndTimestamp(timestamp) => timestamp
441                .value()
442                .and_then(|ts| ts.timestamp_nanos_opt())
443                .map(|nanos| nanos as usize)
444                .unwrap_or(0),
445        }
446    }
447
448    /// create a new MetricValue with the same type as `self` suitable
449    /// for accumulating
450    pub fn new_empty(&self) -> Self {
451        match self {
452            Self::OutputRows(_) => Self::OutputRows(Count::new()),
453            Self::SpillCount(_) => Self::SpillCount(Count::new()),
454            Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
455            Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
456            Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
457            Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
458            Self::Count { name, .. } => Self::Count {
459                name: name.clone(),
460                count: Count::new(),
461            },
462            Self::Gauge { name, .. } => Self::Gauge {
463                name: name.clone(),
464                gauge: Gauge::new(),
465            },
466            Self::Time { name, .. } => Self::Time {
467                name: name.clone(),
468                time: Time::new(),
469            },
470            Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
471            Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
472        }
473    }
474
475    /// Aggregates the value of other to `self`. panic's if the types
476    /// are mismatched or aggregating does not make sense for this
477    /// value
478    ///
479    /// Note this is purposely marked `mut` (even though atomics are
480    /// used) so Rust's type system can be used to ensure the
481    /// appropriate API access. `MetricValues` should be modified
482    /// using the original [`Count`] or [`Time`] they were created
483    /// from.
484    pub fn aggregate(&mut self, other: &Self) {
485        match (self, other) {
486            (Self::OutputRows(count), Self::OutputRows(other_count))
487            | (Self::SpillCount(count), Self::SpillCount(other_count))
488            | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
489            | (Self::SpilledRows(count), Self::SpilledRows(other_count))
490            | (
491                Self::Count { count, .. },
492                Self::Count {
493                    count: other_count, ..
494                },
495            ) => count.add(other_count.value()),
496            (Self::CurrentMemoryUsage(gauge), Self::CurrentMemoryUsage(other_gauge))
497            | (
498                Self::Gauge { gauge, .. },
499                Self::Gauge {
500                    gauge: other_gauge, ..
501                },
502            ) => gauge.add(other_gauge.value()),
503            (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
504            | (
505                Self::Time { time, .. },
506                Self::Time {
507                    time: other_time, ..
508                },
509            ) => time.add(other_time),
510            // timestamps are aggregated by min/max
511            (Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
512                timestamp.update_to_min(other_timestamp);
513            }
514            // timestamps are aggregated by min/max
515            (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
516                timestamp.update_to_max(other_timestamp);
517            }
518            m @ (_, _) => {
519                panic!(
520                    "Mismatched metric types. Can not aggregate {:?} with value {:?}",
521                    m.0, m.1
522                )
523            }
524        }
525    }
526
527    /// Returns a number by which to sort metrics by display. Lower
528    /// numbers are "more useful" (and displayed first)
529    pub fn display_sort_key(&self) -> u8 {
530        match self {
531            Self::OutputRows(_) => 0,     // show first
532            Self::ElapsedCompute(_) => 1, // show second
533            Self::SpillCount(_) => 2,
534            Self::SpilledBytes(_) => 3,
535            Self::SpilledRows(_) => 4,
536            Self::CurrentMemoryUsage(_) => 5,
537            Self::Count { .. } => 6,
538            Self::Gauge { .. } => 7,
539            Self::Time { .. } => 8,
540            Self::StartTimestamp(_) => 9, // show timestamps last
541            Self::EndTimestamp(_) => 10,
542        }
543    }
544
545    /// returns true if this metric has a timestamp value
546    pub fn is_timestamp(&self) -> bool {
547        matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_))
548    }
549}
550
551impl Display for MetricValue {
552    /// Prints the value of this metric
553    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
554        match self {
555            Self::OutputRows(count)
556            | Self::SpillCount(count)
557            | Self::SpilledBytes(count)
558            | Self::SpilledRows(count)
559            | Self::Count { count, .. } => {
560                write!(f, "{count}")
561            }
562            Self::CurrentMemoryUsage(gauge) | Self::Gauge { gauge, .. } => {
563                write!(f, "{gauge}")
564            }
565            Self::ElapsedCompute(time) | Self::Time { time, .. } => {
566                // distinguish between no time recorded and very small
567                // amount of time recorded
568                if time.value() > 0 {
569                    write!(f, "{time}")
570                } else {
571                    write!(f, "NOT RECORDED")
572                }
573            }
574            Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
575                write!(f, "{timestamp}")
576            }
577        }
578    }
579}
580
#[cfg(test)]
mod tests {
    use chrono::TimeZone;

    use super::*;

    /// Asserts that every metric in `values` displays as `expected`
    fn assert_all_display(values: &[MetricValue], expected: &str) {
        for value in values {
            assert_eq!(expected, value.to_string(), "value {value:?}");
        }
    }

    #[test]
    fn test_display_output_rows() {
        let count = Count::new();
        let values = [
            MetricValue::OutputRows(count.clone()),
            MetricValue::Count {
                name: "my_counter".into(),
                count: count.clone(),
            },
        ];

        assert_all_display(&values, "0");

        count.add(42);
        assert_all_display(&values, "42");
    }

    #[test]
    fn test_display_time() {
        let time = Time::new();
        let values = [
            MetricValue::ElapsedCompute(time.clone()),
            MetricValue::Time {
                name: "my_time".into(),
                time: time.clone(),
            },
        ];

        // an unset time must be distinguishable from a zero-length duration
        assert_all_display(&values, "NOT RECORDED");

        time.add_duration(Duration::from_nanos(1042));
        assert_all_display(&values, "1.042µs");
    }

    #[test]
    fn test_display_timestamp() {
        let timestamp = Timestamp::new();
        let values = [
            MetricValue::StartTimestamp(timestamp.clone()),
            MetricValue::EndTimestamp(timestamp.clone()),
        ];

        // an unset timestamp must not be reported as the epoch
        assert_all_display(&values, "NONE");

        timestamp.set(Utc.timestamp_nanos(1431648000000000));
        assert_all_display(&values, "1970-01-17 13:40:48 UTC");
    }
}