rama_http/layer/trace/
on_eos.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use super::{Latency, DEFAULT_MESSAGE_LEVEL};
use crate::header::HeaderMap;
use crate::layer::classify::grpc_errors_as_failures::ParsedGrpcStatus;
use rama_utils::latency::LatencyUnit;
use std::time::Duration;
use tracing::{Level, Span};

/// Trait used to tell [`Trace`] what to do when a stream closes.
///
/// See the [module docs](../trace/index.html#on_eos) for details on exactly when the `on_eos`
/// callback is called.
///
/// [`Trace`]: super::Trace
pub trait OnEos: Send + Sync + 'static {
    /// Do the thing.
    ///
    /// `stream_duration` is the duration since the response was sent.
    ///
    /// `span` is the `tracing` [`Span`], corresponding to this request, produced by the closure
    /// passed to [`TraceLayer::make_span_with`]. It can be used to [record field values][record]
    /// that weren't known when the span was created.
    ///
    /// [`Span`]: https://docs.rs/tracing/latest/tracing/span/index.html
    /// [record]: https://docs.rs/tracing/latest/tracing/span/struct.Span.html#method.record
    /// [`TraceLayer::make_span_with`]: crate::layer::trace::TraceLayer::make_span_with
    fn on_eos(self, trailers: Option<&HeaderMap>, stream_duration: Duration, span: &Span);
}

impl OnEos for () {
    #[inline]
    fn on_eos(self, _: Option<&HeaderMap>, _: Duration, _: &Span) {}
}

impl<F> OnEos for F
where
    F: Fn(Option<&HeaderMap>, Duration, &Span) + Send + Sync + 'static,
{
    fn on_eos(self, trailers: Option<&HeaderMap>, stream_duration: Duration, span: &Span) {
        self(trailers, stream_duration, span)
    }
}

/// The default [`OnEos`] implementation used by [`Trace`].
///
/// [`Trace`]: super::Trace
#[derive(Clone, Debug)]
pub struct DefaultOnEos {
    level: Level,
    latency_unit: LatencyUnit,
}

impl Default for DefaultOnEos {
    fn default() -> Self {
        Self {
            level: DEFAULT_MESSAGE_LEVEL,
            latency_unit: LatencyUnit::Millis,
        }
    }
}

impl DefaultOnEos {
    /// Create a new [`DefaultOnEos`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the [`Level`] used for [tracing events].
    ///
    /// Defaults to [`Level::DEBUG`].
    ///
    /// [tracing events]: https://docs.rs/tracing/latest/tracing/#events
    /// [`Level::DEBUG`]: https://docs.rs/tracing/latest/tracing/struct.Level.html#associatedconstant.DEBUG
    pub fn level(mut self, level: Level) -> Self {
        self.level = level;
        self
    }

    /// Set the [`Level`] used for [tracing events].
    ///
    /// Defaults to [`Level::DEBUG`].
    ///
    /// [tracing events]: https://docs.rs/tracing/latest/tracing/#events
    /// [`Level::DEBUG`]: https://docs.rs/tracing/latest/tracing/struct.Level.html#associatedconstant.DEBUG
    pub fn set_level(&mut self, level: Level) -> &mut Self {
        self.level = level;
        self
    }

    /// Set the [`LatencyUnit`] latencies will be reported in.
    ///
    /// Defaults to [`LatencyUnit::Millis`].
    pub fn latency_unit(mut self, latency_unit: LatencyUnit) -> Self {
        self.latency_unit = latency_unit;
        self
    }

    /// Set the [`LatencyUnit`] latencies will be reported in.
    ///
    /// Defaults to [`LatencyUnit::Millis`].
    pub fn set_latency_unit(&mut self, latency_unit: LatencyUnit) -> &mut Self {
        self.latency_unit = latency_unit;
        self
    }
}

impl OnEos for DefaultOnEos {
    fn on_eos(self, trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span) {
        let stream_duration = Latency {
            unit: self.latency_unit,
            duration: stream_duration,
        };
        let status = trailers.and_then(|trailers| {
            match crate::layer::classify::grpc_errors_as_failures::classify_grpc_metadata(
                trailers,
                crate::layer::classify::GrpcCode::Ok.into_bitmask(),
            ) {
                ParsedGrpcStatus::Success
                | ParsedGrpcStatus::HeaderNotString
                | ParsedGrpcStatus::HeaderNotInt => Some(0),
                ParsedGrpcStatus::NonSuccess(status) => Some(status.get()),
                ParsedGrpcStatus::GrpcStatusHeaderMissing => None,
            }
        });

        event_dynamic_lvl!(self.level, %stream_duration, status, "end of stream");
    }
}