aws_smithy_runtime/client/
metrics.rs1use aws_smithy_async::time::{SharedTimeSource, TimeSource};
7use aws_smithy_observability::{
8 global::get_telemetry_provider, instruments::Histogram, AttributeValue, Attributes,
9 ObservabilityError,
10};
11use aws_smithy_runtime_api::client::{
12 interceptors::Intercept, orchestrator::Metadata, runtime_components::RuntimeComponentsBuilder,
13 runtime_plugin::RuntimePlugin,
14};
15use aws_smithy_types::config_bag::{FrozenLayer, Layer, Storable, StoreReplace};
16use std::{borrow::Cow, sync::Arc, time::SystemTime};
17
18#[derive(Debug, Clone)]
20pub(crate) struct MeasurementsContainer {
21 call_start: SystemTime,
22 attempts: u32,
23 attempt_start: SystemTime,
24}
25
26impl Storable for MeasurementsContainer {
27 type Storer = StoreReplace<Self>;
28}
29
30#[derive(Debug, Clone)]
32pub(crate) struct OperationTelemetry {
33 pub(crate) operation_duration: Arc<dyn Histogram>,
34 pub(crate) attempt_duration: Arc<dyn Histogram>,
35}
36
37impl OperationTelemetry {
38 pub(crate) fn new(scope: &'static str) -> Result<Self, ObservabilityError> {
39 let meter = get_telemetry_provider()?
40 .meter_provider()
41 .get_meter(scope, None);
42
43 Ok(Self{
44 operation_duration: meter
45 .create_histogram("smithy.client.call.duration")
46 .set_units("s")
47 .set_description("Overall call duration (including retries and time to send or receive request and response body)")
48 .build(),
49 attempt_duration: meter
50 .create_histogram("smithy.client.call.attempt.duration")
51 .set_units("s")
52 .set_description("The time it takes to connect to the service, send the request, and get back HTTP status code and headers (including time queued waiting to be sent)")
53 .build(),
54 })
55 }
56}
57
58impl Storable for OperationTelemetry {
59 type Storer = StoreReplace<Self>;
60}
61
62#[derive(Debug)]
63pub(crate) struct MetricsInterceptor {
64 time_source: SharedTimeSource,
68}
69
70impl MetricsInterceptor {
71 pub(crate) fn new(time_source: SharedTimeSource) -> Result<Self, ObservabilityError> {
72 Ok(MetricsInterceptor { time_source })
73 }
74
75 pub(crate) fn get_attrs_from_cfg(
76 &self,
77 cfg: &aws_smithy_types::config_bag::ConfigBag,
78 ) -> Option<Attributes> {
79 let operation_metadata = cfg.load::<Metadata>();
80
81 if let Some(md) = operation_metadata {
82 let mut attributes = Attributes::new();
83 attributes.set("rpc.service", AttributeValue::String(md.service().into()));
84 attributes.set("rpc.method", AttributeValue::String(md.name().into()));
85
86 Some(attributes)
87 } else {
88 None
89 }
90 }
91
92 pub(crate) fn get_measurements_and_instruments<'a>(
93 &self,
94 cfg: &'a aws_smithy_types::config_bag::ConfigBag,
95 ) -> (&'a MeasurementsContainer, &'a OperationTelemetry) {
96 let measurements = cfg
97 .load::<MeasurementsContainer>()
98 .expect("set in `read_before_execution`");
99
100 let instruments = cfg
101 .load::<OperationTelemetry>()
102 .expect("set in RuntimePlugin");
103
104 (measurements, instruments)
105 }
106}
107
108impl Intercept for MetricsInterceptor {
109 fn name(&self) -> &'static str {
110 "MetricsInterceptor"
111 }
112
113 fn read_before_execution(
114 &self,
115 _context: &aws_smithy_runtime_api::client::interceptors::context::BeforeSerializationInterceptorContextRef<'_>,
116 cfg: &mut aws_smithy_types::config_bag::ConfigBag,
117 ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
118 cfg.interceptor_state().store_put(MeasurementsContainer {
119 call_start: self.time_source.now(),
120 attempts: 0,
121 attempt_start: SystemTime::UNIX_EPOCH,
122 });
123
124 Ok(())
125 }
126
127 fn read_after_execution(
128 &self,
129 _context: &aws_smithy_runtime_api::client::interceptors::context::FinalizerInterceptorContextRef<'_>,
130 _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents,
131 cfg: &mut aws_smithy_types::config_bag::ConfigBag,
132 ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
133 let (measurements, instruments) = self.get_measurements_and_instruments(cfg);
134
135 let attributes = self.get_attrs_from_cfg(cfg);
136
137 if let Some(attrs) = attributes {
138 let call_end = self.time_source.now();
139 let call_duration = call_end.duration_since(measurements.call_start);
140 if let Ok(elapsed) = call_duration {
141 instruments
142 .operation_duration
143 .record(elapsed.as_secs_f64(), Some(&attrs), None);
144 }
145 }
146
147 Ok(())
148 }
149
150 fn read_before_attempt(
151 &self,
152 _context: &aws_smithy_runtime_api::client::interceptors::context::BeforeTransmitInterceptorContextRef<'_>,
153 _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents,
154 cfg: &mut aws_smithy_types::config_bag::ConfigBag,
155 ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
156 let measurements = cfg
157 .get_mut::<MeasurementsContainer>()
158 .expect("set in `read_before_execution`");
159
160 measurements.attempts += 1;
161 measurements.attempt_start = self.time_source.now();
162
163 Ok(())
164 }
165
166 fn read_after_attempt(
167 &self,
168 _context: &aws_smithy_runtime_api::client::interceptors::context::FinalizerInterceptorContextRef<'_>,
169 _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents,
170 cfg: &mut aws_smithy_types::config_bag::ConfigBag,
171 ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
172 let (measurements, instruments) = self.get_measurements_and_instruments(cfg);
173
174 let attempt_end = self.time_source.now();
175 let attempt_duration = attempt_end.duration_since(measurements.attempt_start);
176 let attributes = self.get_attrs_from_cfg(cfg);
177
178 if let (Ok(elapsed), Some(mut attrs)) = (attempt_duration, attributes) {
179 attrs.set("attempt", AttributeValue::I64(measurements.attempts.into()));
180
181 instruments
182 .attempt_duration
183 .record(elapsed.as_secs_f64(), Some(&attrs), None);
184 }
185 Ok(())
186 }
187}
188
189#[derive(Debug, Default)]
191pub struct MetricsRuntimePlugin {
192 scope: &'static str,
193 time_source: SharedTimeSource,
194 metadata: Option<Metadata>,
195}
196
197impl MetricsRuntimePlugin {
198 pub fn builder() -> MetricsRuntimePluginBuilder {
200 MetricsRuntimePluginBuilder::default()
201 }
202}
203
204impl RuntimePlugin for MetricsRuntimePlugin {
205 fn runtime_components(
206 &self,
207 _current_components: &RuntimeComponentsBuilder,
208 ) -> Cow<'_, RuntimeComponentsBuilder> {
209 let interceptor = MetricsInterceptor::new(self.time_source.clone());
210 if let Ok(interceptor) = interceptor {
211 Cow::Owned(RuntimeComponentsBuilder::new("Metrics").with_interceptor(interceptor))
212 } else {
213 Cow::Owned(RuntimeComponentsBuilder::new("Metrics"))
214 }
215 }
216
217 fn config(&self) -> Option<FrozenLayer> {
218 let instruments = OperationTelemetry::new(self.scope);
219
220 if let Ok(instruments) = instruments {
221 let mut cfg = Layer::new("Metrics");
222 cfg.store_put(instruments);
223
224 if let Some(metadata) = &self.metadata {
225 cfg.store_put(metadata.clone());
226 }
227
228 Some(cfg.freeze())
229 } else {
230 None
231 }
232 }
233}
234
235#[derive(Debug, Default)]
237pub struct MetricsRuntimePluginBuilder {
238 scope: Option<&'static str>,
239 time_source: Option<SharedTimeSource>,
240 metadata: Option<Metadata>,
241}
242
243impl MetricsRuntimePluginBuilder {
244 pub fn with_scope(mut self, scope: &'static str) -> Self {
246 self.scope = Some(scope);
247 self
248 }
249
250 pub fn with_time_source(mut self, time_source: impl TimeSource + 'static) -> Self {
252 self.time_source = Some(SharedTimeSource::new(time_source));
253 self
254 }
255
256 pub fn with_metadata(mut self, metadata: Metadata) -> Self {
261 self.metadata = Some(metadata);
262 self
263 }
264
265 pub fn build(
267 self,
268 ) -> Result<MetricsRuntimePlugin, aws_smithy_runtime_api::box_error::BoxError> {
269 if let Some(scope) = self.scope {
270 Ok(MetricsRuntimePlugin {
271 scope,
272 time_source: self.time_source.unwrap_or_default(),
273 metadata: self.metadata,
274 })
275 } else {
276 Err("Scope is required for MetricsRuntimePlugin.".into())
277 }
278 }
279}