datafusion_physical_plan/metrics/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Metrics for recording information about execution
19
20mod baseline;
21mod builder;
22mod value;
23
24use parking_lot::Mutex;
25use std::{
26    borrow::Cow,
27    fmt::{Debug, Display},
28    sync::Arc,
29};
30
31use datafusion_common::HashMap;
32
33// public exports
34pub use baseline::{BaselineMetrics, RecordOutput};
35pub use builder::MetricBuilder;
36pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};
37
38/// Something that tracks a value of interest (metric) of a DataFusion
39/// [`ExecutionPlan`] execution.
40///
41/// Typically [`Metric`]s are not created directly, but instead
42/// are created using [`MetricBuilder`] or methods on
43/// [`ExecutionPlanMetricsSet`].
44///
45/// ```
46///  use datafusion_physical_plan::metrics::*;
47///
48///  let metrics = ExecutionPlanMetricsSet::new();
49///  assert!(metrics.clone_inner().output_rows().is_none());
50///
51///  // Create a counter to increment using the MetricBuilder
52///  let partition = 1;
53///  let output_rows = MetricBuilder::new(&metrics)
54///      .output_rows(partition);
55///
56///  // Counter can be incremented
57///  output_rows.add(13);
58///
59///  // The value can be retrieved directly:
60///  assert_eq!(output_rows.value(), 13);
61///
62///  // As well as from the metrics set
63///  assert_eq!(metrics.clone_inner().output_rows(), Some(13));
64/// ```
65///
66/// [`ExecutionPlan`]: super::ExecutionPlan
67
68#[derive(Debug)]
69pub struct Metric {
70    /// The value of the metric
71    value: MetricValue,
72
73    /// arbitrary name=value pairs identifying this metric
74    labels: Vec<Label>,
75
76    /// To which partition of an operators output did this metric
77    /// apply? If `None` then means all partitions.
78    partition: Option<usize>,
79}
80
81impl Display for Metric {
82    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
83        write!(f, "{}", self.value.name())?;
84
85        let mut iter = self
86            .partition
87            .iter()
88            .map(|partition| Label::new("partition", partition.to_string()))
89            .chain(self.labels().iter().cloned())
90            .peekable();
91
92        // print out the labels specially
93        if iter.peek().is_some() {
94            write!(f, "{{")?;
95
96            let mut is_first = true;
97            for i in iter {
98                if !is_first {
99                    write!(f, ", ")?;
100                } else {
101                    is_first = false;
102                }
103
104                write!(f, "{i}")?;
105            }
106
107            write!(f, "}}")?;
108        }
109
110        // and now the value
111        write!(f, "={}", self.value)
112    }
113}
114
115impl Metric {
116    /// Create a new [`Metric`]. Consider using [`MetricBuilder`]
117    /// rather than this function directly.
118    pub fn new(value: MetricValue, partition: Option<usize>) -> Self {
119        Self {
120            value,
121            labels: vec![],
122            partition,
123        }
124    }
125
126    /// Create a new [`Metric`]. Consider using [`MetricBuilder`]
127    /// rather than this function directly.
128    pub fn new_with_labels(
129        value: MetricValue,
130        partition: Option<usize>,
131        labels: Vec<Label>,
132    ) -> Self {
133        Self {
134            value,
135            labels,
136            partition,
137        }
138    }
139
140    /// Add a new label to this metric
141    pub fn with_label(mut self, label: Label) -> Self {
142        self.labels.push(label);
143        self
144    }
145
146    /// What labels are present for this metric?
147    pub fn labels(&self) -> &[Label] {
148        &self.labels
149    }
150
151    /// Return a reference to the value of this metric
152    pub fn value(&self) -> &MetricValue {
153        &self.value
154    }
155
156    /// Return a mutable reference to the value of this metric
157    pub fn value_mut(&mut self) -> &mut MetricValue {
158        &mut self.value
159    }
160
161    /// Return a reference to the partition
162    pub fn partition(&self) -> Option<usize> {
163        self.partition
164    }
165}
166
167/// A snapshot of the metrics for a particular ([`ExecutionPlan`]).
168///
169/// [`ExecutionPlan`]: super::ExecutionPlan
170#[derive(Default, Debug, Clone)]
171pub struct MetricsSet {
172    metrics: Vec<Arc<Metric>>,
173}
174
175impl MetricsSet {
176    /// Create a new container of metrics
177    pub fn new() -> Self {
178        Default::default()
179    }
180
181    /// Add the specified metric
182    pub fn push(&mut self, metric: Arc<Metric>) {
183        self.metrics.push(metric)
184    }
185
186    /// Returns an iterator across all metrics
187    pub fn iter(&self) -> impl Iterator<Item = &Arc<Metric>> {
188        self.metrics.iter()
189    }
190
191    /// Convenience: return the number of rows produced, aggregated
192    /// across partitions or `None` if no metric is present
193    pub fn output_rows(&self) -> Option<usize> {
194        self.sum(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
195            .map(|v| v.as_usize())
196    }
197
198    /// Convenience: return the count of spills, aggregated
199    /// across partitions or `None` if no metric is present
200    pub fn spill_count(&self) -> Option<usize> {
201        self.sum(|metric| matches!(metric.value(), MetricValue::SpillCount(_)))
202            .map(|v| v.as_usize())
203    }
204
205    /// Convenience: return the total byte size of spills, aggregated
206    /// across partitions or `None` if no metric is present
207    pub fn spilled_bytes(&self) -> Option<usize> {
208        self.sum(|metric| matches!(metric.value(), MetricValue::SpilledBytes(_)))
209            .map(|v| v.as_usize())
210    }
211
212    /// Convenience: return the total rows of spills, aggregated
213    /// across partitions or `None` if no metric is present
214    pub fn spilled_rows(&self) -> Option<usize> {
215        self.sum(|metric| matches!(metric.value(), MetricValue::SpilledRows(_)))
216            .map(|v| v.as_usize())
217    }
218
219    /// Convenience: return the amount of elapsed CPU time spent,
220    /// aggregated across partitions or `None` if no metric is present
221    pub fn elapsed_compute(&self) -> Option<usize> {
222        self.sum(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
223            .map(|v| v.as_usize())
224    }
225
226    /// Sums the values for metrics for which `f(metric)` returns
227    /// `true`, and returns the value. Returns `None` if no metrics match
228    /// the predicate.
229    pub fn sum<F>(&self, mut f: F) -> Option<MetricValue>
230    where
231        F: FnMut(&Metric) -> bool,
232    {
233        let mut iter = self
234            .metrics
235            .iter()
236            .filter(|metric| f(metric.as_ref()))
237            .peekable();
238
239        let mut accum = match iter.peek() {
240            None => {
241                return None;
242            }
243            Some(metric) => metric.value().new_empty(),
244        };
245
246        iter.for_each(|metric| accum.aggregate(metric.value()));
247
248        Some(accum)
249    }
250
251    /// Returns the sum of all the metrics with the specified name
252    /// in the returned set.
253    pub fn sum_by_name(&self, metric_name: &str) -> Option<MetricValue> {
254        self.sum(|m| match m.value() {
255            MetricValue::Count { name, .. } => name == metric_name,
256            MetricValue::Time { name, .. } => name == metric_name,
257            MetricValue::OutputRows(_) => false,
258            MetricValue::ElapsedCompute(_) => false,
259            MetricValue::SpillCount(_) => false,
260            MetricValue::SpilledBytes(_) => false,
261            MetricValue::SpilledRows(_) => false,
262            MetricValue::CurrentMemoryUsage(_) => false,
263            MetricValue::Gauge { name, .. } => name == metric_name,
264            MetricValue::StartTimestamp(_) => false,
265            MetricValue::EndTimestamp(_) => false,
266        })
267    }
268
269    /// Returns a new derived `MetricsSet` where all metrics
270    /// that had the same name have been
271    /// aggregated together. The resulting `MetricsSet` has all
272    /// metrics with `Partition=None`
273    pub fn aggregate_by_name(&self) -> Self {
274        let mut map = HashMap::new();
275
276        // There are all sorts of ways to make this more efficient
277        for metric in &self.metrics {
278            let key = metric.value.name();
279            map.entry(key)
280                .and_modify(|accum: &mut Metric| {
281                    accum.value_mut().aggregate(metric.value());
282                })
283                .or_insert_with(|| {
284                    // accumulate with no partition
285                    let partition = None;
286                    let mut accum = Metric::new(metric.value().new_empty(), partition);
287                    accum.value_mut().aggregate(metric.value());
288                    accum
289                });
290        }
291
292        let new_metrics = map
293            .into_iter()
294            .map(|(_k, v)| Arc::new(v))
295            .collect::<Vec<_>>();
296
297        Self {
298            metrics: new_metrics,
299        }
300    }
301
302    /// Sort the order of metrics so the "most useful" show up first
303    pub fn sorted_for_display(mut self) -> Self {
304        self.metrics.sort_unstable_by_key(|metric| {
305            (
306                metric.value().display_sort_key(),
307                metric.value().name().to_owned(),
308            )
309        });
310        self
311    }
312
313    /// Remove all timestamp metrics (for more compact display)
314    pub fn timestamps_removed(self) -> Self {
315        let Self { metrics } = self;
316
317        let metrics = metrics
318            .into_iter()
319            .filter(|m| !m.value.is_timestamp())
320            .collect::<Vec<_>>();
321
322        Self { metrics }
323    }
324}
325
326impl Display for MetricsSet {
327    /// Format the [`MetricsSet`] as a single string
328    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
329        let mut is_first = true;
330        for i in self.metrics.iter() {
331            if !is_first {
332                write!(f, ", ")?;
333            } else {
334                is_first = false;
335            }
336
337            write!(f, "{i}")?;
338        }
339        Ok(())
340    }
341}
342
343/// A set of [`Metric`]s for an individual "operator" (e.g. `&dyn
344/// ExecutionPlan`).
345///
346/// This structure is intended as a convenience for [`ExecutionPlan`]
347/// implementations so they can generate different streams for multiple
348/// partitions but easily report them together.
349///
350/// Each `clone()` of this structure will add metrics to the same
351/// underlying metrics set
352///
353/// [`ExecutionPlan`]: super::ExecutionPlan
354#[derive(Default, Debug, Clone)]
355pub struct ExecutionPlanMetricsSet {
356    inner: Arc<Mutex<MetricsSet>>,
357}
358
359impl ExecutionPlanMetricsSet {
360    /// Create a new empty shared metrics set
361    pub fn new() -> Self {
362        Self {
363            inner: Arc::new(Mutex::new(MetricsSet::new())),
364        }
365    }
366
367    /// Add the specified metric to the underlying metric set
368    pub fn register(&self, metric: Arc<Metric>) {
369        self.inner.lock().push(metric)
370    }
371
372    /// Return a clone of the inner [`MetricsSet`]
373    pub fn clone_inner(&self) -> MetricsSet {
374        let guard = self.inner.lock();
375        (*guard).clone()
376    }
377}
378
/// A `name=value` pair identifying a metric. Different systems call this
/// concept different things:
///
/// "labels" in [Prometheus](https://prometheus.io/docs/concepts/data_model/),
/// "tags" in [InfluxDB](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/),
/// "attributes" in [OpenTelemetry](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md),
/// etc.
///
/// Because the name and value are expected to be constant strings most of the
/// time, both are stored as a [`Cow`] to avoid copying / allocating in that
/// common case.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Label {
    /// The label's name (for example `partition`).
    name: Cow<'static, str>,
    /// The label's value.
    value: Cow<'static, str>,
}
397
398impl Label {
399    /// Create a new [`Label`]
400    pub fn new(
401        name: impl Into<Cow<'static, str>>,
402        value: impl Into<Cow<'static, str>>,
403    ) -> Self {
404        let name = name.into();
405        let value = value.into();
406        Self { name, value }
407    }
408
409    /// Returns the name of this label
410    pub fn name(&self) -> &str {
411        self.name.as_ref()
412    }
413
414    /// Returns the value of this label
415    pub fn value(&self) -> &str {
416        self.value.as_ref()
417    }
418}
419
420impl Display for Label {
421    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
422        write!(f, "{}={}", self.name, self.value)
423    }
424}
425
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use chrono::{TimeZone, Utc};

    use super::*;

    #[test]
    fn test_display_no_labels_no_partition() {
        let count = Count::new();
        count.add(33);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows=33", metric.to_string())
    }

    #[test]
    fn test_display_no_labels_with_partition() {
        let count = Count::new();
        count.add(44);
        let value = MetricValue::OutputRows(count);
        let partition = Some(1);
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows{partition=1}=44", metric.to_string())
    }

    #[test]
    fn test_display_labels_no_partition() {
        let count = Count::new();
        count.add(55);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{foo=bar}=55", metric.to_string())
    }

    #[test]
    fn test_display_labels_and_partition() {
        let count = Count::new();
        count.add(66);
        let value = MetricValue::OutputRows(count);
        let partition = Some(2);
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string())
    }

    #[test]
    fn test_output_rows() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().output_rows().is_none());

        let partition = 1;
        let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
        output_rows.add(13);

        let output_rows = MetricBuilder::new(&metrics).output_rows(partition + 1);
        output_rows.add(7);
        assert_eq!(metrics.clone_inner().output_rows().unwrap(), 20);
    }

    #[test]
    fn test_elapsed_compute() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().elapsed_compute().is_none());

        let partition = 1;
        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition);
        elapsed_compute.add_duration(Duration::from_nanos(1234));

        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition + 1);
        elapsed_compute.add_duration(Duration::from_nanos(6));
        assert_eq!(metrics.clone_inner().elapsed_compute().unwrap(), 1240);
    }

    #[test]
    fn test_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .counter("my_counter", 1);
        count1.add(1);

        let count2 = MetricBuilder::new(&metrics).counter("my_counter", 2);
        count2.add(2);

        let metrics = metrics.clone_inner();
        assert!(metrics.sum(|_| false).is_none());

        let expected_count = Count::new();
        expected_count.add(3);
        let expected_sum = MetricValue::Count {
            name: "my_counter".into(),
            count: expected_count,
        };

        assert_eq!(metrics.sum(|_| true), Some(expected_sum));
    }

    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_bad_sum() {
        // can not add different kinds of metrics
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        // expect that this will error out
        metrics.clone_inner().sum(|_| true);
    }

    #[test]
    fn test_aggregate_by_name() {
        let metrics = ExecutionPlanMetricsSet::new();

        // Note elapsed_compute1 has labels but it is still aggregated with
        // elapsed_compute2 and elapsed_compute3
        let elapsed_compute1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .elapsed_compute(1);
        elapsed_compute1.add_duration(Duration::from_nanos(12));

        let elapsed_compute2 = MetricBuilder::new(&metrics).elapsed_compute(2);
        elapsed_compute2.add_duration(Duration::from_nanos(34));

        let elapsed_compute3 = MetricBuilder::new(&metrics).elapsed_compute(4);
        elapsed_compute3.add_duration(Duration::from_nanos(56));

        let output_rows = MetricBuilder::new(&metrics).output_rows(1); // output rows
        output_rows.add(56);

        let aggregated = metrics.clone_inner().aggregate_by_name();

        // elapsed compute should be aggregated into one partition-less metric:
        let elapsed_computes = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
            .collect::<Vec<_>>();
        assert_eq!(elapsed_computes.len(), 1);
        assert_eq!(elapsed_computes[0].value().as_usize(), 12 + 34 + 56);
        assert!(elapsed_computes[0].partition().is_none());

        // output rows should be carried over as one partition-less metric:
        let output_rows = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
            .collect::<Vec<_>>();
        assert_eq!(output_rows.len(), 1);
        assert_eq!(output_rows[0].value().as_usize(), 56);
        assert!(output_rows[0].partition().is_none())
    }

    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_aggregate_partition_bad_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        // can't aggregate time and count -- expect a panic
        metrics.clone_inner().aggregate_by_name();
    }

    #[test]
    fn test_aggregate_partition_timestamps() {
        let metrics = ExecutionPlanMetricsSet::new();

        // 1431648000000000 == 1970-01-17 13:40:48 UTC
        let t1 = Utc.timestamp_nanos(1431648000000000);
        // 1531648000000000 == 1970-01-18 17:27:28 UTC
        let t2 = Utc.timestamp_nanos(1531648000000000);
        // 1631648000000000 == 1970-01-19 21:14:08 UTC
        let t3 = Utc.timestamp_nanos(1631648000000000);
        // 1731648000000000 == 1970-01-21 01:00:48 UTC
        let t4 = Utc.timestamp_nanos(1731648000000000);

        // partition 0 runs from t1 to t2, partition 1 from t3 to t4
        let start_timestamp0 = MetricBuilder::new(&metrics).start_timestamp(0);
        start_timestamp0.set(t1);
        let end_timestamp0 = MetricBuilder::new(&metrics).end_timestamp(0);
        end_timestamp0.set(t2);
        let start_timestamp1 = MetricBuilder::new(&metrics).start_timestamp(1);
        start_timestamp1.set(t3);
        let end_timestamp1 = MetricBuilder::new(&metrics).end_timestamp(1);
        end_timestamp1.set(t4);

        // aggregate across both partitions
        let aggregated = metrics.clone_inner().aggregate_by_name();

        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::StartTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::StartTimestamp(ts) => {
                // expect the earliest of the start timestamps t1, t3
                assert_eq!(ts.value(), Some(t1));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };

        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::EndTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::EndTimestamp(ts) => {
                // expect the latest of the end timestamps t2, t4
                assert_eq!(ts.value(), Some(t4));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };
    }

    #[test]
    fn test_sorted_for_display() {
        let metrics = ExecutionPlanMetricsSet::new();
        MetricBuilder::new(&metrics).end_timestamp(0);
        MetricBuilder::new(&metrics).start_timestamp(0);
        MetricBuilder::new(&metrics).elapsed_compute(0);
        MetricBuilder::new(&metrics).counter("the_second_counter", 0);
        MetricBuilder::new(&metrics).counter("the_counter", 0);
        MetricBuilder::new(&metrics).counter("the_third_counter", 0);
        MetricBuilder::new(&metrics).subset_time("the_time", 0);
        MetricBuilder::new(&metrics).output_rows(0);
        let metrics = metrics.clone_inner();

        fn metric_names(metrics: &MetricsSet) -> String {
            let n = metrics.iter().map(|m| m.value().name()).collect::<Vec<_>>();
            n.join(", ")
        }

        assert_eq!("end_timestamp, start_timestamp, elapsed_compute, the_second_counter, the_counter, the_third_counter, the_time, output_rows", metric_names(&metrics));

        let metrics = metrics.sorted_for_display();
        assert_eq!("output_rows, elapsed_compute, the_counter, the_second_counter, the_third_counter, the_time, start_timestamp, end_timestamp", metric_names(&metrics));
    }
}