// datafusion_physical_plan/metrics/mod.rs
mod baseline;
mod builder;
mod value;
use parking_lot::Mutex;
use std::{
borrow::Cow,
fmt::{Debug, Display},
sync::Arc,
};
use hashbrown::HashMap;
pub use baseline::{BaselineMetrics, RecordOutput};
pub use builder::MetricBuilder;
pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};
/// A single metric: a [`MetricValue`] together with the labels attached to it
/// and an optional partition number.
///
/// The `Display` implementation in this file renders a metric as
/// `name{partition=N, label=value, ...}=value`, leaving out the braces when
/// there is neither a partition nor any label.
#[derive(Debug)]
pub struct Metric {
/// The value carried by this metric; it also supplies the metric's name.
value: MetricValue,
/// Labels attached to this metric, kept in the order they were added.
labels: Vec<Label>,
/// Partition this metric is associated with, or `None` if it has none.
partition: Option<usize>,
}
impl Display for Metric {
    /// Writes the metric as `name{label, ...}=value`.
    ///
    /// When a partition is set it is shown first, as a `partition=<n>` label,
    /// followed by the metric's own labels in order. The braces are omitted
    /// entirely when there is neither a partition nor any label.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self.value.name())?;

        // The partition, if present, is rendered as a synthetic leading label.
        let partition_label = self
            .partition
            .map(|partition| Label::new("partition", partition.to_string()));
        let mut shown = partition_label.iter().chain(self.labels.iter());

        if let Some(first) = shown.next() {
            write!(f, "{{{first}")?;
            for label in shown {
                write!(f, ", {label}")?;
            }
            write!(f, "}}")?;
        }

        write!(f, "={}", self.value)
    }
}
impl Metric {
pub fn new(value: MetricValue, partition: Option<usize>) -> Self {
Self {
value,
labels: vec![],
partition,
}
}
pub fn new_with_labels(
value: MetricValue,
partition: Option<usize>,
labels: Vec<Label>,
) -> Self {
Self {
value,
labels,
partition,
}
}
pub fn with_label(mut self, label: Label) -> Self {
self.labels.push(label);
self
}
pub fn labels(&self) -> &[Label] {
&self.labels
}
pub fn value(&self) -> &MetricValue {
&self.value
}
pub fn value_mut(&mut self) -> &mut MetricValue {
&mut self.value
}
pub fn partition(&self) -> Option<usize> {
self.partition
}
}
/// An ordered collection of reference-counted [`Metric`]s.
///
/// Cloning the set clones the `Arc` handles, not the metrics themselves.
#[derive(Default, Debug, Clone)]
pub struct MetricsSet {
/// The metrics, in the order they were pushed.
metrics: Vec<Arc<Metric>>,
}
impl MetricsSet {
pub fn new() -> Self {
Default::default()
}
pub fn push(&mut self, metric: Arc<Metric>) {
self.metrics.push(metric)
}
pub fn iter(&self) -> impl Iterator<Item = &Arc<Metric>> {
self.metrics.iter()
}
pub fn output_rows(&self) -> Option<usize> {
self.sum(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
.map(|v| v.as_usize())
}
pub fn spill_count(&self) -> Option<usize> {
self.sum(|metric| matches!(metric.value(), MetricValue::SpillCount(_)))
.map(|v| v.as_usize())
}
pub fn spilled_bytes(&self) -> Option<usize> {
self.sum(|metric| matches!(metric.value(), MetricValue::SpilledBytes(_)))
.map(|v| v.as_usize())
}
pub fn spilled_rows(&self) -> Option<usize> {
self.sum(|metric| matches!(metric.value(), MetricValue::SpilledRows(_)))
.map(|v| v.as_usize())
}
pub fn elapsed_compute(&self) -> Option<usize> {
self.sum(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
.map(|v| v.as_usize())
}
pub fn sum<F>(&self, mut f: F) -> Option<MetricValue>
where
F: FnMut(&Metric) -> bool,
{
let mut iter = self
.metrics
.iter()
.filter(|metric| f(metric.as_ref()))
.peekable();
let mut accum = match iter.peek() {
None => {
return None;
}
Some(metric) => metric.value().new_empty(),
};
iter.for_each(|metric| accum.aggregate(metric.value()));
Some(accum)
}
pub fn sum_by_name(&self, metric_name: &str) -> Option<MetricValue> {
self.sum(|m| match m.value() {
MetricValue::Count { name, .. } => name == metric_name,
MetricValue::Time { name, .. } => name == metric_name,
MetricValue::OutputRows(_) => false,
MetricValue::ElapsedCompute(_) => false,
MetricValue::SpillCount(_) => false,
MetricValue::SpilledBytes(_) => false,
MetricValue::SpilledRows(_) => false,
MetricValue::CurrentMemoryUsage(_) => false,
MetricValue::Gauge { name, .. } => name == metric_name,
MetricValue::StartTimestamp(_) => false,
MetricValue::EndTimestamp(_) => false,
})
}
pub fn aggregate_by_name(&self) -> Self {
let mut map = HashMap::new();
for metric in &self.metrics {
let key = metric.value.name();
map.entry(key)
.and_modify(|accum: &mut Metric| {
accum.value_mut().aggregate(metric.value());
})
.or_insert_with(|| {
let partition = None;
let mut accum = Metric::new(metric.value().new_empty(), partition);
accum.value_mut().aggregate(metric.value());
accum
});
}
let new_metrics = map
.into_iter()
.map(|(_k, v)| Arc::new(v))
.collect::<Vec<_>>();
Self {
metrics: new_metrics,
}
}
pub fn sorted_for_display(mut self) -> Self {
self.metrics.sort_unstable_by_key(|metric| {
(
metric.value().display_sort_key(),
metric.value().name().to_owned(),
)
});
self
}
pub fn timestamps_removed(self) -> Self {
let Self { metrics } = self;
let metrics = metrics
.into_iter()
.filter(|m| !m.value.is_timestamp())
.collect::<Vec<_>>();
Self { metrics }
}
}
impl Display for MetricsSet {
    /// Writes every metric in storage order, separated by `", "`.
    /// An empty set produces no output.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        for (position, metric) in self.metrics.iter().enumerate() {
            // Separator goes between items, never before the first one.
            if position > 0 {
                write!(f, ", ")?;
            }
            write!(f, "{metric}")?;
        }
        Ok(())
    }
}
/// A cloneable handle to a [`MetricsSet`] guarded by a mutex.
///
/// The set lives behind an `Arc`, so clones of this handle refer to the same
/// underlying set.
#[derive(Default, Debug, Clone)]
pub struct ExecutionPlanMetricsSet {
/// The shared, mutex-protected set of metrics.
inner: Arc<Mutex<MetricsSet>>,
}
impl ExecutionPlanMetricsSet {
    /// Creates a handle to a new, empty set of metrics.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds `metric` to the shared set, taking the lock for the duration of
    /// the push.
    pub fn register(&self, metric: Arc<Metric>) {
        let mut set = self.inner.lock();
        set.push(metric);
    }

    /// Returns a copy of the current contents of the shared set, taken while
    /// holding the lock.
    pub fn clone_inner(&self) -> MetricsSet {
        MetricsSet::clone(&self.inner.lock())
    }
}
/// A `name=value` pair that can be attached to a metric.
///
/// Both parts are stored as `Cow<'static, str>`, so string literals are kept
/// without allocating while owned `String`s are accepted as well.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Label {
    name: Cow<'static, str>,
    value: Cow<'static, str>,
}

impl Label {
    /// Creates a label from anything convertible into a `Cow<'static, str>`,
    /// such as a `&'static str` or a `String`.
    pub fn new(
        name: impl Into<Cow<'static, str>>,
        value: impl Into<Cow<'static, str>>,
    ) -> Self {
        Self {
            name: name.into(),
            value: value.into(),
        }
    }

    /// Returns the label's name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns the label's value.
    pub fn value(&self) -> &str {
        &self.value
    }
}

impl Display for Label {
    /// Writes the label as `name=value`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let Self { name, value } = self;
        write!(f, "{name}={value}")
    }
}
// Unit tests for metric display formatting, summation, aggregation by name
// and display ordering.
#[cfg(test)]
mod tests {
use std::time::Duration;
use chrono::{TimeZone, Utc};
use super::*;
// A metric with neither labels nor a partition renders as `name=value`.
#[test]
fn test_display_no_labels_no_partition() {
let count = Count::new();
count.add(33);
let value = MetricValue::OutputRows(count);
let partition = None;
let metric = Metric::new(value, partition);
assert_eq!("output_rows=33", metric.to_string())
}
// A partition on its own is rendered as a `partition=N` label.
#[test]
fn test_display_no_labels_with_partition() {
let count = Count::new();
count.add(44);
let value = MetricValue::OutputRows(count);
let partition = Some(1);
let metric = Metric::new(value, partition);
assert_eq!("output_rows{partition=1}=44", metric.to_string())
}
// Labels without a partition are rendered inside the braces.
#[test]
fn test_display_labels_no_partition() {
let count = Count::new();
count.add(55);
let value = MetricValue::OutputRows(count);
let partition = None;
let label = Label::new("foo", "bar");
let metric = Metric::new_with_labels(value, partition, vec![label]);
assert_eq!("output_rows{foo=bar}=55", metric.to_string())
}
// With both present, the partition is printed before the labels.
#[test]
fn test_display_labels_and_partition() {
let count = Count::new();
count.add(66);
let value = MetricValue::OutputRows(count);
let partition = Some(2);
let label = Label::new("foo", "bar");
let metric = Metric::new_with_labels(value, partition, vec![label]);
assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string())
}
// `output_rows()` is `None` on an empty set and otherwise sums the output
// row counters registered for different partitions.
#[test]
fn test_output_rows() {
let metrics = ExecutionPlanMetricsSet::new();
assert!(metrics.clone_inner().output_rows().is_none());
let partition = 1;
let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
output_rows.add(13);
let output_rows = MetricBuilder::new(&metrics).output_rows(partition + 1);
output_rows.add(7);
assert_eq!(metrics.clone_inner().output_rows().unwrap(), 20);
}
// `elapsed_compute()` is `None` on an empty set and otherwise sums the
// durations recorded for different partitions (1234ns + 6ns == 1240).
#[test]
fn test_elapsed_compute() {
let metrics = ExecutionPlanMetricsSet::new();
assert!(metrics.clone_inner().elapsed_compute().is_none());
let partition = 1;
let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition);
elapsed_compute.add_duration(Duration::from_nanos(1234));
let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition + 1);
elapsed_compute.add_duration(Duration::from_nanos(6));
assert_eq!(metrics.clone_inner().elapsed_compute().unwrap(), 1240);
}
// `sum` returns `None` when the predicate selects nothing, and otherwise
// adds up the selected counters regardless of their labels or partitions.
#[test]
fn test_sum() {
let metrics = ExecutionPlanMetricsSet::new();
let count1 = MetricBuilder::new(&metrics)
.with_new_label("foo", "bar")
.counter("my_counter", 1);
count1.add(1);
let count2 = MetricBuilder::new(&metrics).counter("my_counter", 2);
count2.add(2);
let metrics = metrics.clone_inner();
assert!(metrics.sum(|_| false).is_none());
let expected_count = Count::new();
expected_count.add(3);
let expected_sum = MetricValue::Count {
name: "my_counter".into(),
count: expected_count,
};
assert_eq!(metrics.sum(|_| true), Some(expected_sum));
}
// Summing a counter together with a time metric must panic with the
// mismatched-types message.
#[test]
#[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
fn test_bad_sum() {
let metrics = ExecutionPlanMetricsSet::new();
let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
count.add(1);
let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
time.add_duration(Duration::from_nanos(10));
metrics.clone_inner().sum(|_| true);
}
// `aggregate_by_name` collapses metrics sharing a name into a single metric
// with no partition, ignoring differences in labels and partitions.
#[test]
fn test_aggregate_by_name() {
let metrics = ExecutionPlanMetricsSet::new();
let elapsed_compute1 = MetricBuilder::new(&metrics)
.with_new_label("foo", "bar")
.elapsed_compute(1);
elapsed_compute1.add_duration(Duration::from_nanos(12));
let elapsed_compute2 = MetricBuilder::new(&metrics).elapsed_compute(2);
elapsed_compute2.add_duration(Duration::from_nanos(34));
let elapsed_compute3 = MetricBuilder::new(&metrics).elapsed_compute(4);
elapsed_compute3.add_duration(Duration::from_nanos(56));
let output_rows = MetricBuilder::new(&metrics).output_rows(1); output_rows.add(56);
let aggregated = metrics.clone_inner().aggregate_by_name();
// Exactly one elapsed_compute metric remains, holding the total.
let elapsed_computes = aggregated
.iter()
.filter(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
.collect::<Vec<_>>();
assert_eq!(elapsed_computes.len(), 1);
assert_eq!(elapsed_computes[0].value().as_usize(), 12 + 34 + 56);
assert!(elapsed_computes[0].partition().is_none());
// The single output_rows metric is carried over, also without partition.
let output_rows = aggregated
.iter()
.filter(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
.collect::<Vec<_>>();
assert_eq!(output_rows.len(), 1);
assert_eq!(output_rows[0].value().as_usize(), 56);
assert!(output_rows[0].partition.is_none())
}
// Aggregating by name must panic when two metrics share a name but are of
// different kinds (a counter and a time).
#[test]
#[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
fn test_aggregate_partition_bad_sum() {
let metrics = ExecutionPlanMetricsSet::new();
let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
count.add(1);
let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
time.add_duration(Duration::from_nanos(10));
metrics.clone_inner().aggregate_by_name();
}
// Aggregated timestamps: the start timestamp is expected to be t1 (the
// smaller of t1/t3) and the end timestamp t4 (the larger of t2/t4).
#[test]
fn test_aggregate_partition_timestamps() {
let metrics = ExecutionPlanMetricsSet::new();
// Four strictly increasing instants: t1 < t2 < t3 < t4.
let t1 = Utc.timestamp_nanos(1431648000000000);
let t2 = Utc.timestamp_nanos(1531648000000000);
let t3 = Utc.timestamp_nanos(1631648000000000);
let t4 = Utc.timestamp_nanos(1731648000000000);
let start_timestamp0 = MetricBuilder::new(&metrics).start_timestamp(0);
start_timestamp0.set(t1);
let end_timestamp0 = MetricBuilder::new(&metrics).end_timestamp(0);
end_timestamp0.set(t2);
let start_timestamp1 = MetricBuilder::new(&metrics).start_timestamp(0);
start_timestamp1.set(t3);
let end_timestamp1 = MetricBuilder::new(&metrics).end_timestamp(0);
end_timestamp1.set(t4);
let aggregated = metrics.clone_inner().aggregate_by_name();
// Exactly one unlabeled start timestamp should remain.
let mut ts = aggregated
.iter()
.filter(|metric| {
matches!(metric.value(), MetricValue::StartTimestamp(_))
&& metric.labels().is_empty()
})
.collect::<Vec<_>>();
assert_eq!(ts.len(), 1);
match ts.remove(0).value() {
MetricValue::StartTimestamp(ts) => {
assert_eq!(ts.value(), Some(t1));
}
_ => {
panic!("Not a timestamp");
}
};
// Exactly one unlabeled end timestamp should remain.
let mut ts = aggregated
.iter()
.filter(|metric| {
matches!(metric.value(), MetricValue::EndTimestamp(_))
&& metric.labels().is_empty()
})
.collect::<Vec<_>>();
assert_eq!(ts.len(), 1);
match ts.remove(0).value() {
MetricValue::EndTimestamp(ts) => {
assert_eq!(ts.value(), Some(t4));
}
_ => {
panic!("Not a timestamp");
}
};
}
// `sorted_for_display` reorders metrics from registration order into the
// display order asserted below.
#[test]
fn test_sorted_for_display() {
let metrics = ExecutionPlanMetricsSet::new();
MetricBuilder::new(&metrics).end_timestamp(0);
MetricBuilder::new(&metrics).start_timestamp(0);
MetricBuilder::new(&metrics).elapsed_compute(0);
MetricBuilder::new(&metrics).counter("the_second_counter", 0);
MetricBuilder::new(&metrics).counter("the_counter", 0);
MetricBuilder::new(&metrics).counter("the_third_counter", 0);
MetricBuilder::new(&metrics).subset_time("the_time", 0);
MetricBuilder::new(&metrics).output_rows(0);
let metrics = metrics.clone_inner();
// Joins the metric names with ", " in the set's current order.
fn metric_names(metrics: &MetricsSet) -> String {
let n = metrics.iter().map(|m| m.value().name()).collect::<Vec<_>>();
n.join(", ")
}
// Before sorting, the names appear in registration order.
assert_eq!("end_timestamp, start_timestamp, elapsed_compute, the_second_counter, the_counter, the_third_counter, the_time, output_rows", metric_names(&metrics));
let metrics = metrics.sorted_for_display();
// After sorting: output_rows, elapsed_compute, the named metrics in
// alphabetical order, then the start and end timestamps.
assert_eq!("output_rows, elapsed_compute, the_counter, the_second_counter, the_third_counter, the_time, start_timestamp, end_timestamp", metric_names(&metrics));
}
}