datafusion_physical_plan/metrics/
mod.rs1mod baseline;
21mod builder;
22mod value;
23
24use parking_lot::Mutex;
25use std::{
26 borrow::Cow,
27 fmt::{Debug, Display},
28 sync::Arc,
29};
30
31use datafusion_common::HashMap;
32
33pub use baseline::{BaselineMetrics, RecordOutput};
35pub use builder::MetricBuilder;
36pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};
37
38#[derive(Debug)]
69pub struct Metric {
70 value: MetricValue,
72
73 labels: Vec<Label>,
75
76 partition: Option<usize>,
79}
80
81impl Display for Metric {
82 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
83 write!(f, "{}", self.value.name())?;
84
85 let mut iter = self
86 .partition
87 .iter()
88 .map(|partition| Label::new("partition", partition.to_string()))
89 .chain(self.labels().iter().cloned())
90 .peekable();
91
92 if iter.peek().is_some() {
94 write!(f, "{{")?;
95
96 let mut is_first = true;
97 for i in iter {
98 if !is_first {
99 write!(f, ", ")?;
100 } else {
101 is_first = false;
102 }
103
104 write!(f, "{i}")?;
105 }
106
107 write!(f, "}}")?;
108 }
109
110 write!(f, "={}", self.value)
112 }
113}
114
115impl Metric {
116 pub fn new(value: MetricValue, partition: Option<usize>) -> Self {
119 Self {
120 value,
121 labels: vec![],
122 partition,
123 }
124 }
125
126 pub fn new_with_labels(
129 value: MetricValue,
130 partition: Option<usize>,
131 labels: Vec<Label>,
132 ) -> Self {
133 Self {
134 value,
135 labels,
136 partition,
137 }
138 }
139
140 pub fn with_label(mut self, label: Label) -> Self {
142 self.labels.push(label);
143 self
144 }
145
146 pub fn labels(&self) -> &[Label] {
148 &self.labels
149 }
150
151 pub fn value(&self) -> &MetricValue {
153 &self.value
154 }
155
156 pub fn value_mut(&mut self) -> &mut MetricValue {
158 &mut self.value
159 }
160
161 pub fn partition(&self) -> Option<usize> {
163 self.partition
164 }
165}
166
167#[derive(Default, Debug, Clone)]
171pub struct MetricsSet {
172 metrics: Vec<Arc<Metric>>,
173}
174
175impl MetricsSet {
176 pub fn new() -> Self {
178 Default::default()
179 }
180
181 pub fn push(&mut self, metric: Arc<Metric>) {
183 self.metrics.push(metric)
184 }
185
186 pub fn iter(&self) -> impl Iterator<Item = &Arc<Metric>> {
188 self.metrics.iter()
189 }
190
191 pub fn output_rows(&self) -> Option<usize> {
194 self.sum(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
195 .map(|v| v.as_usize())
196 }
197
198 pub fn spill_count(&self) -> Option<usize> {
201 self.sum(|metric| matches!(metric.value(), MetricValue::SpillCount(_)))
202 .map(|v| v.as_usize())
203 }
204
205 pub fn spilled_bytes(&self) -> Option<usize> {
208 self.sum(|metric| matches!(metric.value(), MetricValue::SpilledBytes(_)))
209 .map(|v| v.as_usize())
210 }
211
212 pub fn spilled_rows(&self) -> Option<usize> {
215 self.sum(|metric| matches!(metric.value(), MetricValue::SpilledRows(_)))
216 .map(|v| v.as_usize())
217 }
218
219 pub fn elapsed_compute(&self) -> Option<usize> {
222 self.sum(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
223 .map(|v| v.as_usize())
224 }
225
226 pub fn sum<F>(&self, mut f: F) -> Option<MetricValue>
230 where
231 F: FnMut(&Metric) -> bool,
232 {
233 let mut iter = self
234 .metrics
235 .iter()
236 .filter(|metric| f(metric.as_ref()))
237 .peekable();
238
239 let mut accum = match iter.peek() {
240 None => {
241 return None;
242 }
243 Some(metric) => metric.value().new_empty(),
244 };
245
246 iter.for_each(|metric| accum.aggregate(metric.value()));
247
248 Some(accum)
249 }
250
251 pub fn sum_by_name(&self, metric_name: &str) -> Option<MetricValue> {
254 self.sum(|m| match m.value() {
255 MetricValue::Count { name, .. } => name == metric_name,
256 MetricValue::Time { name, .. } => name == metric_name,
257 MetricValue::OutputRows(_) => false,
258 MetricValue::ElapsedCompute(_) => false,
259 MetricValue::SpillCount(_) => false,
260 MetricValue::SpilledBytes(_) => false,
261 MetricValue::SpilledRows(_) => false,
262 MetricValue::CurrentMemoryUsage(_) => false,
263 MetricValue::Gauge { name, .. } => name == metric_name,
264 MetricValue::StartTimestamp(_) => false,
265 MetricValue::EndTimestamp(_) => false,
266 })
267 }
268
269 pub fn aggregate_by_name(&self) -> Self {
274 let mut map = HashMap::new();
275
276 for metric in &self.metrics {
278 let key = metric.value.name();
279 map.entry(key)
280 .and_modify(|accum: &mut Metric| {
281 accum.value_mut().aggregate(metric.value());
282 })
283 .or_insert_with(|| {
284 let partition = None;
286 let mut accum = Metric::new(metric.value().new_empty(), partition);
287 accum.value_mut().aggregate(metric.value());
288 accum
289 });
290 }
291
292 let new_metrics = map
293 .into_iter()
294 .map(|(_k, v)| Arc::new(v))
295 .collect::<Vec<_>>();
296
297 Self {
298 metrics: new_metrics,
299 }
300 }
301
302 pub fn sorted_for_display(mut self) -> Self {
304 self.metrics.sort_unstable_by_key(|metric| {
305 (
306 metric.value().display_sort_key(),
307 metric.value().name().to_owned(),
308 )
309 });
310 self
311 }
312
313 pub fn timestamps_removed(self) -> Self {
315 let Self { metrics } = self;
316
317 let metrics = metrics
318 .into_iter()
319 .filter(|m| !m.value.is_timestamp())
320 .collect::<Vec<_>>();
321
322 Self { metrics }
323 }
324}
325
326impl Display for MetricsSet {
327 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
329 let mut is_first = true;
330 for i in self.metrics.iter() {
331 if !is_first {
332 write!(f, ", ")?;
333 } else {
334 is_first = false;
335 }
336
337 write!(f, "{i}")?;
338 }
339 Ok(())
340 }
341}
342
343#[derive(Default, Debug, Clone)]
355pub struct ExecutionPlanMetricsSet {
356 inner: Arc<Mutex<MetricsSet>>,
357}
358
359impl ExecutionPlanMetricsSet {
360 pub fn new() -> Self {
362 Self {
363 inner: Arc::new(Mutex::new(MetricsSet::new())),
364 }
365 }
366
367 pub fn register(&self, metric: Arc<Metric>) {
369 self.inner.lock().push(metric)
370 }
371
372 pub fn clone_inner(&self) -> MetricsSet {
374 let guard = self.inner.lock();
375 (*guard).clone()
376 }
377}
378
/// A `name=value` annotation attached to a metric.
///
/// Both parts are stored as `Cow<'static, str>`, so static strings are kept
/// without allocating while owned `String`s are also accepted.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Label {
    name: Cow<'static, str>,
    value: Cow<'static, str>,
}

impl Label {
    /// Creates a label from anything convertible into `Cow<'static, str>`
    /// (e.g. `&'static str` or `String`).
    pub fn new(
        name: impl Into<Cow<'static, str>>,
        value: impl Into<Cow<'static, str>>,
    ) -> Self {
        Self {
            name: name.into(),
            value: value.into(),
        }
    }

    /// The label's name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// The label's value.
    pub fn value(&self) -> &str {
        &self.value
    }
}

impl Display for Label {
    /// Formats the label as `name=value`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{name}={value}", name = self.name(), value = self.value())
    }
}
425
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use chrono::{TimeZone, Utc};

    use super::*;

    #[test]
    fn test_display_no_labels_no_partition() {
        let count = Count::new();
        count.add(33);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows=33", metric.to_string())
    }

    #[test]
    fn test_display_no_labels_with_partition() {
        let count = Count::new();
        count.add(44);
        let value = MetricValue::OutputRows(count);
        let partition = Some(1);
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows{partition=1}=44", metric.to_string())
    }

    #[test]
    fn test_display_labels_no_partition() {
        let count = Count::new();
        count.add(55);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{foo=bar}=55", metric.to_string())
    }

    #[test]
    fn test_display_labels_and_partition() {
        let count = Count::new();
        count.add(66);
        let value = MetricValue::OutputRows(count);
        let partition = Some(2);
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string())
    }

    #[test]
    fn test_display_metrics_set() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert_eq!(metrics.clone_inner().to_string(), "");

        MetricBuilder::new(&metrics).output_rows(0).add(5);
        MetricBuilder::new(&metrics).counter("my_counter", 1).add(7);

        assert_eq!(
            metrics.clone_inner().to_string(),
            "output_rows{partition=0}=5, my_counter{partition=1}=7"
        );
    }

    #[test]
    fn test_output_rows() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().output_rows().is_none());

        let partition = 1;
        let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
        output_rows.add(13);

        let output_rows = MetricBuilder::new(&metrics).output_rows(partition + 1);
        output_rows.add(7);
        assert_eq!(metrics.clone_inner().output_rows().unwrap(), 20);
    }

    #[test]
    fn test_elapsed_compute() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().elapsed_compute().is_none());

        let partition = 1;
        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition);
        elapsed_compute.add_duration(Duration::from_nanos(1234));

        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition + 1);
        elapsed_compute.add_duration(Duration::from_nanos(6));
        assert_eq!(metrics.clone_inner().elapsed_compute().unwrap(), 1240);
    }

    #[test]
    fn test_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .counter("my_counter", 1);
        count1.add(1);

        let count2 = MetricBuilder::new(&metrics).counter("my_counter", 2);
        count2.add(2);

        let metrics = metrics.clone_inner();
        assert!(metrics.sum(|_| false).is_none());

        let expected_count = Count::new();
        expected_count.add(3);
        let expected_sum = MetricValue::Count {
            name: "my_counter".into(),
            count: expected_count,
        };

        assert_eq!(metrics.sum(|_| true), Some(expected_sum));
    }

    #[test]
    fn test_sum_by_name() {
        let metrics = ExecutionPlanMetricsSet::new();
        MetricBuilder::new(&metrics).counter("my_counter", 0).add(3);
        MetricBuilder::new(&metrics).counter("my_counter", 1).add(4);
        MetricBuilder::new(&metrics).counter("other_counter", 0).add(100);
        MetricBuilder::new(&metrics).output_rows(0).add(1000);

        let metrics = metrics.clone_inner();
        assert_eq!(
            metrics.sum_by_name("my_counter").map(|v| v.as_usize()),
            Some(7)
        );
        assert!(metrics.sum_by_name("missing").is_none());
        // Well-known metrics are never matched by name.
        assert!(metrics.sum_by_name("output_rows").is_none());
    }

    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_bad_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        metrics.clone_inner().sum(|_| true);
    }

    #[test]
    fn test_aggregate_by_name() {
        let metrics = ExecutionPlanMetricsSet::new();

        let elapsed_compute1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .elapsed_compute(1);
        elapsed_compute1.add_duration(Duration::from_nanos(12));

        let elapsed_compute2 = MetricBuilder::new(&metrics).elapsed_compute(2);
        elapsed_compute2.add_duration(Duration::from_nanos(34));

        let elapsed_compute3 = MetricBuilder::new(&metrics).elapsed_compute(4);
        elapsed_compute3.add_duration(Duration::from_nanos(56));

        let output_rows = MetricBuilder::new(&metrics).output_rows(1);
        output_rows.add(56);

        let aggregated = metrics.clone_inner().aggregate_by_name();

        let elapsed_computes = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
            .collect::<Vec<_>>();
        assert_eq!(elapsed_computes.len(), 1);
        assert_eq!(elapsed_computes[0].value().as_usize(), 12 + 34 + 56);
        assert!(elapsed_computes[0].partition().is_none());

        let output_rows = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
            .collect::<Vec<_>>();
        assert_eq!(output_rows.len(), 1);
        assert_eq!(output_rows[0].value().as_usize(), 56);
        assert!(output_rows[0].partition().is_none())
    }

    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_aggregate_partition_bad_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        metrics.clone_inner().aggregate_by_name();
    }

    #[test]
    fn test_aggregate_partition_timestamps() {
        let metrics = ExecutionPlanMetricsSet::new();

        let t1 = Utc.timestamp_nanos(1431648000000000);
        let t2 = Utc.timestamp_nanos(1531648000000000);
        let t3 = Utc.timestamp_nanos(1631648000000000);
        let t4 = Utc.timestamp_nanos(1731648000000000);

        let start_timestamp0 = MetricBuilder::new(&metrics).start_timestamp(0);
        start_timestamp0.set(t1);
        let end_timestamp0 = MetricBuilder::new(&metrics).end_timestamp(0);
        end_timestamp0.set(t2);
        let start_timestamp1 = MetricBuilder::new(&metrics).start_timestamp(1);
        start_timestamp1.set(t3);
        let end_timestamp1 = MetricBuilder::new(&metrics).end_timestamp(1);
        end_timestamp1.set(t4);

        let aggregated = metrics.clone_inner().aggregate_by_name();

        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::StartTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::StartTimestamp(ts) => {
                // The aggregated start is the earliest start.
                assert_eq!(ts.value(), Some(t1));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };

        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::EndTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::EndTimestamp(ts) => {
                // The aggregated end is the latest end.
                assert_eq!(ts.value(), Some(t4));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };
    }

    #[test]
    fn test_timestamps_removed() {
        let metrics = ExecutionPlanMetricsSet::new();
        MetricBuilder::new(&metrics).start_timestamp(0);
        MetricBuilder::new(&metrics).end_timestamp(0);
        MetricBuilder::new(&metrics).output_rows(0);

        let metrics = metrics.clone_inner().timestamps_removed();
        let names = metrics
            .iter()
            .map(|m| m.value().name())
            .collect::<Vec<_>>();
        assert_eq!(names, vec!["output_rows"]);
    }

    #[test]
    fn test_sorted_for_display() {
        let metrics = ExecutionPlanMetricsSet::new();
        MetricBuilder::new(&metrics).end_timestamp(0);
        MetricBuilder::new(&metrics).start_timestamp(0);
        MetricBuilder::new(&metrics).elapsed_compute(0);
        MetricBuilder::new(&metrics).counter("the_second_counter", 0);
        MetricBuilder::new(&metrics).counter("the_counter", 0);
        MetricBuilder::new(&metrics).counter("the_third_counter", 0);
        MetricBuilder::new(&metrics).subset_time("the_time", 0);
        MetricBuilder::new(&metrics).output_rows(0);
        let metrics = metrics.clone_inner();

        fn metric_names(metrics: &MetricsSet) -> String {
            let n = metrics.iter().map(|m| m.value().name()).collect::<Vec<_>>();
            n.join(", ")
        }

        assert_eq!("end_timestamp, start_timestamp, elapsed_compute, the_second_counter, the_counter, the_third_counter, the_time, output_rows", metric_names(&metrics));

        let metrics = metrics.sorted_for_display();
        assert_eq!("output_rows, elapsed_compute, the_counter, the_second_counter, the_third_counter, the_time, start_timestamp, end_timestamp", metric_names(&metrics));
    }
}