rama_http/layer/trace/on_eos.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
use super::{Latency, DEFAULT_MESSAGE_LEVEL};
use crate::header::HeaderMap;
use crate::layer::classify::grpc_errors_as_failures::ParsedGrpcStatus;
use rama_utils::latency::LatencyUnit;
use std::time::Duration;
use tracing::{Level, Span};
/// Trait used to tell [`Trace`] what to do when a stream closes.
///
/// See the [module docs](../trace/index.html#on_eos) for details on exactly when the `on_eos`
/// callback is called.
///
/// [`Trace`]: super::Trace
pub trait OnEos: Send + Sync + 'static {
/// Do the thing.
///
/// `stream_duration` is the duration since the response was sent.
///
/// `span` is the `tracing` [`Span`], corresponding to this request, produced by the closure
/// passed to [`TraceLayer::make_span_with`]. It can be used to [record field values][record]
/// that weren't known when the span was created.
///
/// [`Span`]: https://docs.rs/tracing/latest/tracing/span/index.html
/// [record]: https://docs.rs/tracing/latest/tracing/span/struct.Span.html#method.record
/// [`TraceLayer::make_span_with`]: crate::layer::trace::TraceLayer::make_span_with
fn on_eos(self, trailers: Option<&HeaderMap>, stream_duration: Duration, span: &Span);
}
impl OnEos for () {
#[inline]
fn on_eos(self, _: Option<&HeaderMap>, _: Duration, _: &Span) {}
}
impl<F> OnEos for F
where
F: Fn(Option<&HeaderMap>, Duration, &Span) + Send + Sync + 'static,
{
fn on_eos(self, trailers: Option<&HeaderMap>, stream_duration: Duration, span: &Span) {
self(trailers, stream_duration, span)
}
}
/// The default [`OnEos`] implementation used by [`Trace`].
///
/// [`Trace`]: super::Trace
#[derive(Clone, Debug)]
pub struct DefaultOnEos {
level: Level,
latency_unit: LatencyUnit,
}
impl Default for DefaultOnEos {
fn default() -> Self {
Self {
level: DEFAULT_MESSAGE_LEVEL,
latency_unit: LatencyUnit::Millis,
}
}
}
impl DefaultOnEos {
/// Create a new [`DefaultOnEos`].
pub fn new() -> Self {
Self::default()
}
/// Set the [`Level`] used for [tracing events].
///
/// Defaults to [`Level::DEBUG`].
///
/// [tracing events]: https://docs.rs/tracing/latest/tracing/#events
/// [`Level::DEBUG`]: https://docs.rs/tracing/latest/tracing/struct.Level.html#associatedconstant.DEBUG
pub fn level(mut self, level: Level) -> Self {
self.level = level;
self
}
/// Set the [`Level`] used for [tracing events].
///
/// Defaults to [`Level::DEBUG`].
///
/// [tracing events]: https://docs.rs/tracing/latest/tracing/#events
/// [`Level::DEBUG`]: https://docs.rs/tracing/latest/tracing/struct.Level.html#associatedconstant.DEBUG
pub fn set_level(&mut self, level: Level) -> &mut Self {
self.level = level;
self
}
/// Set the [`LatencyUnit`] latencies will be reported in.
///
/// Defaults to [`LatencyUnit::Millis`].
pub fn latency_unit(mut self, latency_unit: LatencyUnit) -> Self {
self.latency_unit = latency_unit;
self
}
/// Set the [`LatencyUnit`] latencies will be reported in.
///
/// Defaults to [`LatencyUnit::Millis`].
pub fn set_latency_unit(&mut self, latency_unit: LatencyUnit) -> &mut Self {
self.latency_unit = latency_unit;
self
}
}
impl OnEos for DefaultOnEos {
fn on_eos(self, trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span) {
let stream_duration = Latency {
unit: self.latency_unit,
duration: stream_duration,
};
let status = trailers.and_then(|trailers| {
match crate::layer::classify::grpc_errors_as_failures::classify_grpc_metadata(
trailers,
crate::layer::classify::GrpcCode::Ok.into_bitmask(),
) {
ParsedGrpcStatus::Success
| ParsedGrpcStatus::HeaderNotString
| ParsedGrpcStatus::HeaderNotInt => Some(0),
ParsedGrpcStatus::NonSuccess(status) => Some(status.get()),
ParsedGrpcStatus::GrpcStatusHeaderMissing => None,
}
});
event_dynamic_lvl!(self.level, %stream_duration, status, "end of stream");
}
}