aws_smithy_runtime/client/
metrics.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use aws_smithy_async::time::{SharedTimeSource, TimeSource};
7use aws_smithy_observability::{
8    global::get_telemetry_provider, instruments::Histogram, AttributeValue, Attributes,
9    ObservabilityError,
10};
11use aws_smithy_runtime_api::client::{
12    interceptors::Intercept, orchestrator::Metadata, runtime_components::RuntimeComponentsBuilder,
13    runtime_plugin::RuntimePlugin,
14};
15use aws_smithy_types::config_bag::{FrozenLayer, Layer, Storable, StoreReplace};
16use std::{borrow::Cow, sync::Arc, time::SystemTime};
17
18/// Struct to hold metric data in the ConfigBag
19#[derive(Debug, Clone)]
20pub(crate) struct MeasurementsContainer {
21    call_start: SystemTime,
22    attempts: u32,
23    attempt_start: SystemTime,
24}
25
26impl Storable for MeasurementsContainer {
27    type Storer = StoreReplace<Self>;
28}
29
30/// Instruments for recording a single operation
31#[derive(Debug, Clone)]
32pub(crate) struct OperationTelemetry {
33    pub(crate) operation_duration: Arc<dyn Histogram>,
34    pub(crate) attempt_duration: Arc<dyn Histogram>,
35}
36
37impl OperationTelemetry {
38    pub(crate) fn new(scope: &'static str) -> Result<Self, ObservabilityError> {
39        let meter = get_telemetry_provider()?
40            .meter_provider()
41            .get_meter(scope, None);
42
43        Ok(Self{
44            operation_duration: meter
45                .create_histogram("smithy.client.call.duration")
46                .set_units("s")
47                .set_description("Overall call duration (including retries and time to send or receive request and response body)")
48                .build(),
49            attempt_duration: meter
50                .create_histogram("smithy.client.call.attempt.duration")
51                .set_units("s")
52                .set_description("The time it takes to connect to the service, send the request, and get back HTTP status code and headers (including time queued waiting to be sent)")
53                .build(),
54        })
55    }
56}
57
58impl Storable for OperationTelemetry {
59    type Storer = StoreReplace<Self>;
60}
61
62#[derive(Debug)]
63pub(crate) struct MetricsInterceptor {
64    // Holding a TimeSource here isn't ideal, but RuntimeComponents aren't available in
65    // the read_before_execution hook and that is when we need to start the timer for
66    // the operation.
67    time_source: SharedTimeSource,
68}
69
70impl MetricsInterceptor {
71    pub(crate) fn new(time_source: SharedTimeSource) -> Result<Self, ObservabilityError> {
72        Ok(MetricsInterceptor { time_source })
73    }
74
75    pub(crate) fn get_attrs_from_cfg(
76        &self,
77        cfg: &aws_smithy_types::config_bag::ConfigBag,
78    ) -> Option<Attributes> {
79        let operation_metadata = cfg.load::<Metadata>();
80
81        if let Some(md) = operation_metadata {
82            let mut attributes = Attributes::new();
83            attributes.set("rpc.service", AttributeValue::String(md.service().into()));
84            attributes.set("rpc.method", AttributeValue::String(md.name().into()));
85
86            Some(attributes)
87        } else {
88            None
89        }
90    }
91
92    pub(crate) fn get_measurements_and_instruments<'a>(
93        &self,
94        cfg: &'a aws_smithy_types::config_bag::ConfigBag,
95    ) -> (&'a MeasurementsContainer, &'a OperationTelemetry) {
96        let measurements = cfg
97            .load::<MeasurementsContainer>()
98            .expect("set in `read_before_execution`");
99
100        let instruments = cfg
101            .load::<OperationTelemetry>()
102            .expect("set in RuntimePlugin");
103
104        (measurements, instruments)
105    }
106}
107
108impl Intercept for MetricsInterceptor {
109    fn name(&self) -> &'static str {
110        "MetricsInterceptor"
111    }
112
113    fn read_before_execution(
114        &self,
115        _context: &aws_smithy_runtime_api::client::interceptors::context::BeforeSerializationInterceptorContextRef<'_>,
116        cfg: &mut aws_smithy_types::config_bag::ConfigBag,
117    ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
118        cfg.interceptor_state().store_put(MeasurementsContainer {
119            call_start: self.time_source.now(),
120            attempts: 0,
121            attempt_start: SystemTime::UNIX_EPOCH,
122        });
123
124        Ok(())
125    }
126
127    fn read_after_execution(
128        &self,
129        _context: &aws_smithy_runtime_api::client::interceptors::context::FinalizerInterceptorContextRef<'_>,
130        _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents,
131        cfg: &mut aws_smithy_types::config_bag::ConfigBag,
132    ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
133        let (measurements, instruments) = self.get_measurements_and_instruments(cfg);
134
135        let attributes = self.get_attrs_from_cfg(cfg);
136
137        if let Some(attrs) = attributes {
138            let call_end = self.time_source.now();
139            let call_duration = call_end.duration_since(measurements.call_start);
140            if let Ok(elapsed) = call_duration {
141                instruments
142                    .operation_duration
143                    .record(elapsed.as_secs_f64(), Some(&attrs), None);
144            }
145        }
146
147        Ok(())
148    }
149
150    fn read_before_attempt(
151        &self,
152        _context: &aws_smithy_runtime_api::client::interceptors::context::BeforeTransmitInterceptorContextRef<'_>,
153        _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents,
154        cfg: &mut aws_smithy_types::config_bag::ConfigBag,
155    ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
156        let measurements = cfg
157            .get_mut::<MeasurementsContainer>()
158            .expect("set in `read_before_execution`");
159
160        measurements.attempts += 1;
161        measurements.attempt_start = self.time_source.now();
162
163        Ok(())
164    }
165
166    fn read_after_attempt(
167        &self,
168        _context: &aws_smithy_runtime_api::client::interceptors::context::FinalizerInterceptorContextRef<'_>,
169        _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents,
170        cfg: &mut aws_smithy_types::config_bag::ConfigBag,
171    ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
172        let (measurements, instruments) = self.get_measurements_and_instruments(cfg);
173
174        let attempt_end = self.time_source.now();
175        let attempt_duration = attempt_end.duration_since(measurements.attempt_start);
176        let attributes = self.get_attrs_from_cfg(cfg);
177
178        if let (Ok(elapsed), Some(mut attrs)) = (attempt_duration, attributes) {
179            attrs.set("attempt", AttributeValue::I64(measurements.attempts.into()));
180
181            instruments
182                .attempt_duration
183                .record(elapsed.as_secs_f64(), Some(&attrs), None);
184        }
185        Ok(())
186    }
187}
188
189/// Runtime plugin that adds an interceptor for collecting metrics
190#[derive(Debug, Default)]
191pub struct MetricsRuntimePlugin {
192    scope: &'static str,
193    time_source: SharedTimeSource,
194    metadata: Option<Metadata>,
195}
196
197impl MetricsRuntimePlugin {
198    /// Create a [MetricsRuntimePluginBuilder]
199    pub fn builder() -> MetricsRuntimePluginBuilder {
200        MetricsRuntimePluginBuilder::default()
201    }
202}
203
204impl RuntimePlugin for MetricsRuntimePlugin {
205    fn runtime_components(
206        &self,
207        _current_components: &RuntimeComponentsBuilder,
208    ) -> Cow<'_, RuntimeComponentsBuilder> {
209        let interceptor = MetricsInterceptor::new(self.time_source.clone());
210        if let Ok(interceptor) = interceptor {
211            Cow::Owned(RuntimeComponentsBuilder::new("Metrics").with_interceptor(interceptor))
212        } else {
213            Cow::Owned(RuntimeComponentsBuilder::new("Metrics"))
214        }
215    }
216
217    fn config(&self) -> Option<FrozenLayer> {
218        let instruments = OperationTelemetry::new(self.scope);
219
220        if let Ok(instruments) = instruments {
221            let mut cfg = Layer::new("Metrics");
222            cfg.store_put(instruments);
223
224            if let Some(metadata) = &self.metadata {
225                cfg.store_put(metadata.clone());
226            }
227
228            Some(cfg.freeze())
229        } else {
230            None
231        }
232    }
233}
234
235/// Builder for [MetricsRuntimePlugin]
236#[derive(Debug, Default)]
237pub struct MetricsRuntimePluginBuilder {
238    scope: Option<&'static str>,
239    time_source: Option<SharedTimeSource>,
240    metadata: Option<Metadata>,
241}
242
243impl MetricsRuntimePluginBuilder {
244    /// Set the scope for the metrics
245    pub fn with_scope(mut self, scope: &'static str) -> Self {
246        self.scope = Some(scope);
247        self
248    }
249
250    /// Set the [TimeSource] for the metrics
251    pub fn with_time_source(mut self, time_source: impl TimeSource + 'static) -> Self {
252        self.time_source = Some(SharedTimeSource::new(time_source));
253        self
254    }
255
256    /// Set the [Metadata] for the metrics.
257    ///
258    /// Note: the Metadata is optional, most operations set it themselves, but this is useful
259    /// for operations that do not, like some of the credential providers.
260    pub fn with_metadata(mut self, metadata: Metadata) -> Self {
261        self.metadata = Some(metadata);
262        self
263    }
264
265    /// Build a [MetricsRuntimePlugin]
266    pub fn build(
267        self,
268    ) -> Result<MetricsRuntimePlugin, aws_smithy_runtime_api::box_error::BoxError> {
269        if let Some(scope) = self.scope {
270            Ok(MetricsRuntimePlugin {
271                scope,
272                time_source: self.time_source.unwrap_or_default(),
273                metadata: self.metadata,
274            })
275        } else {
276            Err("Scope is required for MetricsRuntimePlugin.".into())
277        }
278    }
279}