use std::{collections::HashMap, fmt, sync::RwLock};
use tracing::{field::Visit, Subscriber};
use tracing_core::{Field, Interest, Metadata};
#[cfg(feature = "metrics_gauge_unstable")]
use opentelemetry::metrics::Gauge;
use opentelemetry::{
metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter},
InstrumentationScope, KeyValue, Value,
};
use tracing_subscriber::{
filter::Filtered,
layer::{Context, Filter},
registry::LookupSpan,
Layer,
};
use smallvec::SmallVec;
const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry";
const METRIC_PREFIX_MONOTONIC_COUNTER: &str = "monotonic_counter.";
const METRIC_PREFIX_COUNTER: &str = "counter.";
const METRIC_PREFIX_HISTOGRAM: &str = "histogram.";
#[cfg(feature = "metrics_gauge_unstable")]
const METRIC_PREFIX_GAUGE: &str = "gauge.";
const I64_MAX: u64 = i64::MAX as u64;
#[derive(Default)]
pub(crate) struct Instruments {
u64_counter: MetricsMap<Counter<u64>>,
f64_counter: MetricsMap<Counter<f64>>,
i64_up_down_counter: MetricsMap<UpDownCounter<i64>>,
f64_up_down_counter: MetricsMap<UpDownCounter<f64>>,
u64_histogram: MetricsMap<Histogram<u64>>,
f64_histogram: MetricsMap<Histogram<f64>>,
#[cfg(feature = "metrics_gauge_unstable")]
u64_gauge: MetricsMap<Gauge<u64>>,
#[cfg(feature = "metrics_gauge_unstable")]
i64_gauge: MetricsMap<Gauge<i64>>,
#[cfg(feature = "metrics_gauge_unstable")]
f64_gauge: MetricsMap<Gauge<f64>>,
}
type MetricsMap<T> = RwLock<HashMap<&'static str, T>>;
#[derive(Copy, Clone, Debug)]
pub(crate) enum InstrumentType {
CounterU64(u64),
CounterF64(f64),
UpDownCounterI64(i64),
UpDownCounterF64(f64),
HistogramU64(u64),
HistogramF64(f64),
#[cfg(feature = "metrics_gauge_unstable")]
GaugeU64(u64),
#[cfg(feature = "metrics_gauge_unstable")]
GaugeI64(i64),
#[cfg(feature = "metrics_gauge_unstable")]
GaugeF64(f64),
}
impl Instruments {
pub(crate) fn update_metric(
&self,
meter: &Meter,
instrument_type: InstrumentType,
metric_name: &'static str,
attributes: &[KeyValue],
) {
fn update_or_insert<T>(
map: &MetricsMap<T>,
name: &'static str,
insert: impl FnOnce() -> T,
update: impl FnOnce(&T),
) {
{
let lock = map.read().unwrap();
if let Some(metric) = lock.get(name) {
update(metric);
return;
}
}
let mut lock = map.write().unwrap();
let metric = lock.entry(name).or_insert_with(insert);
update(metric)
}
match instrument_type {
InstrumentType::CounterU64(value) => {
update_or_insert(
&self.u64_counter,
metric_name,
|| meter.u64_counter(metric_name).build(),
|ctr| ctr.add(value, attributes),
);
}
InstrumentType::CounterF64(value) => {
update_or_insert(
&self.f64_counter,
metric_name,
|| meter.f64_counter(metric_name).build(),
|ctr| ctr.add(value, attributes),
);
}
InstrumentType::UpDownCounterI64(value) => {
update_or_insert(
&self.i64_up_down_counter,
metric_name,
|| meter.i64_up_down_counter(metric_name).build(),
|ctr| ctr.add(value, attributes),
);
}
InstrumentType::UpDownCounterF64(value) => {
update_or_insert(
&self.f64_up_down_counter,
metric_name,
|| meter.f64_up_down_counter(metric_name).build(),
|ctr| ctr.add(value, attributes),
);
}
InstrumentType::HistogramU64(value) => {
update_or_insert(
&self.u64_histogram,
metric_name,
|| meter.u64_histogram(metric_name).build(),
|rec| rec.record(value, attributes),
);
}
InstrumentType::HistogramF64(value) => {
update_or_insert(
&self.f64_histogram,
metric_name,
|| meter.f64_histogram(metric_name).build(),
|rec| rec.record(value, attributes),
);
}
#[cfg(feature = "metrics_gauge_unstable")]
InstrumentType::GaugeU64(value) => {
update_or_insert(
&self.u64_gauge,
metric_name,
|| meter.u64_gauge(metric_name).build(),
|rec| rec.record(value, attributes),
);
}
#[cfg(feature = "metrics_gauge_unstable")]
InstrumentType::GaugeI64(value) => {
update_or_insert(
&self.i64_gauge,
metric_name,
|| meter.i64_gauge(metric_name).build(),
|rec| rec.record(value, attributes),
);
}
#[cfg(feature = "metrics_gauge_unstable")]
InstrumentType::GaugeF64(value) => {
update_or_insert(
&self.f64_gauge,
metric_name,
|| meter.f64_gauge(metric_name).build(),
|rec| rec.record(value, attributes),
);
}
};
}
}
pub(crate) struct MetricVisitor<'a> {
attributes: &'a mut SmallVec<[KeyValue; 8]>,
visited_metrics: &'a mut SmallVec<[(&'static str, InstrumentType); 2]>,
}
impl<'a> Visit for MetricVisitor<'a> {
fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
self.attributes
.push(KeyValue::new(field.name(), format!("{value:?}")));
}
fn record_u64(&mut self, field: &Field, value: u64) {
#[cfg(feature = "metrics_gauge_unstable")]
if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_GAUGE) {
self.visited_metrics
.push((metric_name, InstrumentType::GaugeU64(value)));
return;
}
if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) {
self.visited_metrics
.push((metric_name, InstrumentType::CounterU64(value)));
} else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) {
if value <= I64_MAX {
self.visited_metrics
.push((metric_name, InstrumentType::UpDownCounterI64(value as i64)));
} else {
eprintln!(
"[tracing-opentelemetry]: Received Counter metric, but \
provided u64: {} is greater than i64::MAX. Ignoring \
this metric.",
value
);
}
} else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) {
self.visited_metrics
.push((metric_name, InstrumentType::HistogramU64(value)));
} else if value <= I64_MAX {
self.attributes
.push(KeyValue::new(field.name(), Value::I64(value as i64)));
}
}
fn record_f64(&mut self, field: &Field, value: f64) {
#[cfg(feature = "metrics_gauge_unstable")]
if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_GAUGE) {
self.visited_metrics
.push((metric_name, InstrumentType::GaugeF64(value)));
return;
}
if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) {
self.visited_metrics
.push((metric_name, InstrumentType::CounterF64(value)));
} else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) {
self.visited_metrics
.push((metric_name, InstrumentType::UpDownCounterF64(value)));
} else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) {
self.visited_metrics
.push((metric_name, InstrumentType::HistogramF64(value)));
} else {
self.attributes
.push(KeyValue::new(field.name(), Value::F64(value)));
}
}
fn record_i64(&mut self, field: &Field, value: i64) {
#[cfg(feature = "metrics_gauge_unstable")]
if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_GAUGE) {
self.visited_metrics
.push((metric_name, InstrumentType::GaugeI64(value)));
return;
}
if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) {
self.visited_metrics
.push((metric_name, InstrumentType::CounterU64(value as u64)));
} else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) {
self.visited_metrics
.push((metric_name, InstrumentType::UpDownCounterI64(value)));
} else {
self.attributes.push(KeyValue::new(field.name(), value));
}
}
fn record_str(&mut self, field: &Field, value: &str) {
self.attributes
.push(KeyValue::new(field.name(), value.to_owned()));
}
fn record_bool(&mut self, field: &Field, value: bool) {
self.attributes.push(KeyValue::new(field.name(), value));
}
}
#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
pub struct MetricsLayer<S> {
inner: Filtered<InstrumentLayer, MetricsFilter, S>,
}
impl<S> MetricsLayer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
pub fn new<M>(meter_provider: M) -> MetricsLayer<S>
where
M: MeterProvider,
{
let meter = meter_provider.meter_with_scope(
InstrumentationScope::builder(INSTRUMENTATION_LIBRARY_NAME)
.with_version(CARGO_PKG_VERSION)
.build(),
);
let layer = InstrumentLayer {
meter,
instruments: Default::default(),
};
MetricsLayer {
inner: layer.with_filter(MetricsFilter),
}
}
}
struct MetricsFilter;
impl MetricsFilter {
fn is_metrics_event(&self, meta: &Metadata<'_>) -> bool {
meta.is_event()
&& meta.fields().iter().any(|field| {
let name = field.name();
if name.starts_with(METRIC_PREFIX_COUNTER)
|| name.starts_with(METRIC_PREFIX_MONOTONIC_COUNTER)
|| name.starts_with(METRIC_PREFIX_HISTOGRAM)
{
return true;
}
#[cfg(feature = "metrics_gauge_unstable")]
if name.starts_with(METRIC_PREFIX_GAUGE) {
return true;
}
false
})
}
}
impl<S> Filter<S> for MetricsFilter {
fn enabled(&self, meta: &Metadata<'_>, _cx: &Context<'_, S>) -> bool {
self.is_metrics_event(meta)
}
fn callsite_enabled(&self, meta: &'static Metadata<'static>) -> Interest {
if self.is_metrics_event(meta) {
Interest::always()
} else {
Interest::never()
}
}
}
struct InstrumentLayer {
meter: Meter,
instruments: Instruments,
}
impl<S> Layer<S> for InstrumentLayer
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
let mut attributes = SmallVec::new();
let mut visited_metrics = SmallVec::new();
let mut metric_visitor = MetricVisitor {
attributes: &mut attributes,
visited_metrics: &mut visited_metrics,
};
event.record(&mut metric_visitor);
visited_metrics
.into_iter()
.for_each(|(metric_name, value)| {
self.instruments.update_metric(
&self.meter,
value,
metric_name,
attributes.as_slice(),
);
})
}
}
impl<S> Layer<S> for MetricsLayer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
fn on_layer(&mut self, subscriber: &mut S) {
self.inner.on_layer(subscriber)
}
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
self.inner.register_callsite(metadata)
}
fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
self.inner.enabled(metadata, ctx)
}
fn on_new_span(
&self,
attrs: &tracing_core::span::Attributes<'_>,
id: &tracing_core::span::Id,
ctx: Context<'_, S>,
) {
self.inner.on_new_span(attrs, id, ctx)
}
fn max_level_hint(&self) -> Option<tracing_core::LevelFilter> {
self.inner.max_level_hint()
}
fn on_record(
&self,
span: &tracing_core::span::Id,
values: &tracing_core::span::Record<'_>,
ctx: Context<'_, S>,
) {
self.inner.on_record(span, values, ctx)
}
fn on_follows_from(
&self,
span: &tracing_core::span::Id,
follows: &tracing_core::span::Id,
ctx: Context<'_, S>,
) {
self.inner.on_follows_from(span, follows, ctx)
}
fn on_event(&self, event: &tracing_core::Event<'_>, ctx: Context<'_, S>) {
self.inner.on_event(event, ctx)
}
fn on_enter(&self, id: &tracing_core::span::Id, ctx: Context<'_, S>) {
self.inner.on_enter(id, ctx)
}
fn on_exit(&self, id: &tracing_core::span::Id, ctx: Context<'_, S>) {
self.inner.on_exit(id, ctx)
}
fn on_close(&self, id: tracing_core::span::Id, ctx: Context<'_, S>) {
self.inner.on_close(id, ctx)
}
fn on_id_change(
&self,
old: &tracing_core::span::Id,
new: &tracing_core::span::Id,
ctx: Context<'_, S>,
) {
self.inner.on_id_change(old, new, ctx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tracing_subscriber::layer::SubscriberExt;
struct PanicLayer;
impl<S> Layer<S> for PanicLayer
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
fn on_event(&self, _event: &tracing_core::Event<'_>, _ctx: Context<'_, S>) {
panic!("panic");
}
}
#[test]
fn filter_layer_should_filter_non_metrics_event() {
let layer = PanicLayer.with_filter(MetricsFilter);
let subscriber = tracing_subscriber::registry().with(layer);
tracing::subscriber::with_default(subscriber, || {
tracing::info!(key = "val", "foo");
});
}
}