datafusion_physical_plan/metrics/
builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Builder for creating arbitrary metrics
19
20use std::{borrow::Cow, sync::Arc};
21
22use super::{
23    Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp,
24};
25
26/// Structure for constructing metrics, counters, timers, etc.
27///
28/// Note the use of `Cow<..>` is to avoid allocations in the common
29/// case of constant strings
30///
31/// ```rust
32///  use datafusion_physical_plan::metrics::*;
33///
34///  let metrics = ExecutionPlanMetricsSet::new();
35///  let partition = 1;
36///
37///  // Create the standard output_rows metric
38///  let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
39///
40///  // Create a operator specific counter with some labels
41///  let num_bytes = MetricBuilder::new(&metrics)
42///    .with_new_label("filename", "my_awesome_file.parquet")
43///    .counter("num_bytes", partition);
44///
45/// ```
46pub struct MetricBuilder<'a> {
47    /// Location that the metric created by this builder will be added do
48    metrics: &'a ExecutionPlanMetricsSet,
49
50    /// optional partition number
51    partition: Option<usize>,
52
53    /// arbitrary name=value pairs identifying this metric
54    labels: Vec<Label>,
55}
56
57impl<'a> MetricBuilder<'a> {
58    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
59    pub fn new(metrics: &'a ExecutionPlanMetricsSet) -> Self {
60        Self {
61            metrics,
62            partition: None,
63            labels: vec![],
64        }
65    }
66
67    /// Add a label to the metric being constructed
68    pub fn with_label(mut self, label: Label) -> Self {
69        self.labels.push(label);
70        self
71    }
72
73    /// Add a label to the metric being constructed
74    pub fn with_new_label(
75        self,
76        name: impl Into<Cow<'static, str>>,
77        value: impl Into<Cow<'static, str>>,
78    ) -> Self {
79        self.with_label(Label::new(name.into(), value.into()))
80    }
81
82    /// Set the partition of the metric being constructed
83    pub fn with_partition(mut self, partition: usize) -> Self {
84        self.partition = Some(partition);
85        self
86    }
87
88    /// Consume self and create a metric of the specified value
89    /// registered with the MetricsSet
90    pub fn build(self, value: MetricValue) {
91        let Self {
92            labels,
93            partition,
94            metrics,
95        } = self;
96        let metric = Arc::new(Metric::new_with_labels(value, partition, labels));
97        metrics.register(metric);
98    }
99
100    /// Consume self and create a new counter for recording output rows
101    pub fn output_rows(self, partition: usize) -> Count {
102        let count = Count::new();
103        self.with_partition(partition)
104            .build(MetricValue::OutputRows(count.clone()));
105        count
106    }
107
108    /// Consume self and create a new counter for recording the number of spills
109    /// triggered by an operator
110    pub fn spill_count(self, partition: usize) -> Count {
111        let count = Count::new();
112        self.with_partition(partition)
113            .build(MetricValue::SpillCount(count.clone()));
114        count
115    }
116
117    /// Consume self and create a new counter for recording the total spilled bytes
118    /// triggered by an operator
119    pub fn spilled_bytes(self, partition: usize) -> Count {
120        let count = Count::new();
121        self.with_partition(partition)
122            .build(MetricValue::SpilledBytes(count.clone()));
123        count
124    }
125
126    /// Consume self and create a new counter for recording the total spilled rows
127    /// triggered by an operator
128    pub fn spilled_rows(self, partition: usize) -> Count {
129        let count = Count::new();
130        self.with_partition(partition)
131            .build(MetricValue::SpilledRows(count.clone()));
132        count
133    }
134
135    /// Consume self and create a new gauge for reporting current memory usage
136    pub fn mem_used(self, partition: usize) -> Gauge {
137        let gauge = Gauge::new();
138        self.with_partition(partition)
139            .build(MetricValue::CurrentMemoryUsage(gauge.clone()));
140        gauge
141    }
142
143    /// Consumes self and creates a new [`Count`] for recording some
144    /// arbitrary metric of an operator.
145    pub fn counter(
146        self,
147        counter_name: impl Into<Cow<'static, str>>,
148        partition: usize,
149    ) -> Count {
150        self.with_partition(partition).global_counter(counter_name)
151    }
152
153    /// Consumes self and creates a new [`Gauge`] for reporting some
154    /// arbitrary metric of an operator.
155    pub fn gauge(
156        self,
157        gauge_name: impl Into<Cow<'static, str>>,
158        partition: usize,
159    ) -> Gauge {
160        self.with_partition(partition).global_gauge(gauge_name)
161    }
162
163    /// Consumes self and creates a new [`Count`] for recording a
164    /// metric of an overall operator (not per partition)
165    pub fn global_counter(self, counter_name: impl Into<Cow<'static, str>>) -> Count {
166        let count = Count::new();
167        self.build(MetricValue::Count {
168            name: counter_name.into(),
169            count: count.clone(),
170        });
171        count
172    }
173
174    /// Consumes self and creates a new [`Gauge`] for reporting a
175    /// metric of an overall operator (not per partition)
176    pub fn global_gauge(self, gauge_name: impl Into<Cow<'static, str>>) -> Gauge {
177        let gauge = Gauge::new();
178        self.build(MetricValue::Gauge {
179            name: gauge_name.into(),
180            gauge: gauge.clone(),
181        });
182        gauge
183    }
184
185    /// Consume self and create a new Timer for recording the elapsed
186    /// CPU time spent by an operator
187    pub fn elapsed_compute(self, partition: usize) -> Time {
188        let time = Time::new();
189        self.with_partition(partition)
190            .build(MetricValue::ElapsedCompute(time.clone()));
191        time
192    }
193
194    /// Consumes self and creates a new Timer for recording some
195    /// subset of an operators execution time.
196    pub fn subset_time(
197        self,
198        subset_name: impl Into<Cow<'static, str>>,
199        partition: usize,
200    ) -> Time {
201        let time = Time::new();
202        self.with_partition(partition).build(MetricValue::Time {
203            name: subset_name.into(),
204            time: time.clone(),
205        });
206        time
207    }
208
209    /// Consumes self and creates a new Timestamp for recording the
210    /// starting time of execution for a partition
211    pub fn start_timestamp(self, partition: usize) -> Timestamp {
212        let timestamp = Timestamp::new();
213        self.with_partition(partition)
214            .build(MetricValue::StartTimestamp(timestamp.clone()));
215        timestamp
216    }
217
218    /// Consumes self and creates a new Timestamp for recording the
219    /// ending time of execution for a partition
220    pub fn end_timestamp(self, partition: usize) -> Timestamp {
221        let timestamp = Timestamp::new();
222        self.with_partition(partition)
223            .build(MetricValue::EndTimestamp(timestamp.clone()));
224        timestamp
225    }
226}