datafusion_physical_plan/metrics/builder.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Builder for creating arbitrary metrics
19
20use std::{borrow::Cow, sync::Arc};
21
22use super::{
23 Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp,
24};
25
26/// Structure for constructing metrics, counters, timers, etc.
27///
28/// Note the use of `Cow<..>` is to avoid allocations in the common
29/// case of constant strings
30///
31/// ```rust
32/// use datafusion_physical_plan::metrics::*;
33///
34/// let metrics = ExecutionPlanMetricsSet::new();
35/// let partition = 1;
36///
37/// // Create the standard output_rows metric
38/// let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
39///
40/// // Create a operator specific counter with some labels
41/// let num_bytes = MetricBuilder::new(&metrics)
42/// .with_new_label("filename", "my_awesome_file.parquet")
43/// .counter("num_bytes", partition);
44///
45/// ```
46pub struct MetricBuilder<'a> {
47 /// Location that the metric created by this builder will be added do
48 metrics: &'a ExecutionPlanMetricsSet,
49
50 /// optional partition number
51 partition: Option<usize>,
52
53 /// arbitrary name=value pairs identifying this metric
54 labels: Vec<Label>,
55}
56
57impl<'a> MetricBuilder<'a> {
58 /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
59 pub fn new(metrics: &'a ExecutionPlanMetricsSet) -> Self {
60 Self {
61 metrics,
62 partition: None,
63 labels: vec![],
64 }
65 }
66
67 /// Add a label to the metric being constructed
68 pub fn with_label(mut self, label: Label) -> Self {
69 self.labels.push(label);
70 self
71 }
72
73 /// Add a label to the metric being constructed
74 pub fn with_new_label(
75 self,
76 name: impl Into<Cow<'static, str>>,
77 value: impl Into<Cow<'static, str>>,
78 ) -> Self {
79 self.with_label(Label::new(name.into(), value.into()))
80 }
81
82 /// Set the partition of the metric being constructed
83 pub fn with_partition(mut self, partition: usize) -> Self {
84 self.partition = Some(partition);
85 self
86 }
87
88 /// Consume self and create a metric of the specified value
89 /// registered with the MetricsSet
90 pub fn build(self, value: MetricValue) {
91 let Self {
92 labels,
93 partition,
94 metrics,
95 } = self;
96 let metric = Arc::new(Metric::new_with_labels(value, partition, labels));
97 metrics.register(metric);
98 }
99
100 /// Consume self and create a new counter for recording output rows
101 pub fn output_rows(self, partition: usize) -> Count {
102 let count = Count::new();
103 self.with_partition(partition)
104 .build(MetricValue::OutputRows(count.clone()));
105 count
106 }
107
108 /// Consume self and create a new counter for recording the number of spills
109 /// triggered by an operator
110 pub fn spill_count(self, partition: usize) -> Count {
111 let count = Count::new();
112 self.with_partition(partition)
113 .build(MetricValue::SpillCount(count.clone()));
114 count
115 }
116
117 /// Consume self and create a new counter for recording the total spilled bytes
118 /// triggered by an operator
119 pub fn spilled_bytes(self, partition: usize) -> Count {
120 let count = Count::new();
121 self.with_partition(partition)
122 .build(MetricValue::SpilledBytes(count.clone()));
123 count
124 }
125
126 /// Consume self and create a new counter for recording the total spilled rows
127 /// triggered by an operator
128 pub fn spilled_rows(self, partition: usize) -> Count {
129 let count = Count::new();
130 self.with_partition(partition)
131 .build(MetricValue::SpilledRows(count.clone()));
132 count
133 }
134
135 /// Consume self and create a new gauge for reporting current memory usage
136 pub fn mem_used(self, partition: usize) -> Gauge {
137 let gauge = Gauge::new();
138 self.with_partition(partition)
139 .build(MetricValue::CurrentMemoryUsage(gauge.clone()));
140 gauge
141 }
142
143 /// Consumes self and creates a new [`Count`] for recording some
144 /// arbitrary metric of an operator.
145 pub fn counter(
146 self,
147 counter_name: impl Into<Cow<'static, str>>,
148 partition: usize,
149 ) -> Count {
150 self.with_partition(partition).global_counter(counter_name)
151 }
152
153 /// Consumes self and creates a new [`Gauge`] for reporting some
154 /// arbitrary metric of an operator.
155 pub fn gauge(
156 self,
157 gauge_name: impl Into<Cow<'static, str>>,
158 partition: usize,
159 ) -> Gauge {
160 self.with_partition(partition).global_gauge(gauge_name)
161 }
162
163 /// Consumes self and creates a new [`Count`] for recording a
164 /// metric of an overall operator (not per partition)
165 pub fn global_counter(self, counter_name: impl Into<Cow<'static, str>>) -> Count {
166 let count = Count::new();
167 self.build(MetricValue::Count {
168 name: counter_name.into(),
169 count: count.clone(),
170 });
171 count
172 }
173
174 /// Consumes self and creates a new [`Gauge`] for reporting a
175 /// metric of an overall operator (not per partition)
176 pub fn global_gauge(self, gauge_name: impl Into<Cow<'static, str>>) -> Gauge {
177 let gauge = Gauge::new();
178 self.build(MetricValue::Gauge {
179 name: gauge_name.into(),
180 gauge: gauge.clone(),
181 });
182 gauge
183 }
184
185 /// Consume self and create a new Timer for recording the elapsed
186 /// CPU time spent by an operator
187 pub fn elapsed_compute(self, partition: usize) -> Time {
188 let time = Time::new();
189 self.with_partition(partition)
190 .build(MetricValue::ElapsedCompute(time.clone()));
191 time
192 }
193
194 /// Consumes self and creates a new Timer for recording some
195 /// subset of an operators execution time.
196 pub fn subset_time(
197 self,
198 subset_name: impl Into<Cow<'static, str>>,
199 partition: usize,
200 ) -> Time {
201 let time = Time::new();
202 self.with_partition(partition).build(MetricValue::Time {
203 name: subset_name.into(),
204 time: time.clone(),
205 });
206 time
207 }
208
209 /// Consumes self and creates a new Timestamp for recording the
210 /// starting time of execution for a partition
211 pub fn start_timestamp(self, partition: usize) -> Timestamp {
212 let timestamp = Timestamp::new();
213 self.with_partition(partition)
214 .build(MetricValue::StartTimestamp(timestamp.clone()));
215 timestamp
216 }
217
218 /// Consumes self and creates a new Timestamp for recording the
219 /// ending time of execution for a partition
220 pub fn end_timestamp(self, partition: usize) -> Timestamp {
221 let timestamp = Timestamp::new();
222 self.with_partition(partition)
223 .build(MetricValue::EndTimestamp(timestamp.clone()));
224 timestamp
225 }
226}