aws_smithy_runtime/client/
defaults.rs

/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

//! Runtime plugins that provide defaults for clients.
//!
//! Note: these are the absolute base-level defaults. They may not be the defaults
//! for _your_ client, since many things can change these defaults on the way to
//! code generating and constructing a full client.
11
12use crate::client::http::body::content_length_enforcement::EnforceContentLengthRuntimePlugin;
13use crate::client::identity::IdentityCache;
14use crate::client::retries::strategy::standard::TokenBucketProvider;
15use crate::client::retries::strategy::StandardRetryStrategy;
16use crate::client::retries::RetryPartition;
17use aws_smithy_async::rt::sleep::default_async_sleep;
18use aws_smithy_async::time::SystemTimeSource;
19use aws_smithy_runtime_api::box_error::BoxError;
20use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion;
21use aws_smithy_runtime_api::client::http::SharedHttpClient;
22use aws_smithy_runtime_api::client::runtime_components::{
23    RuntimeComponentsBuilder, SharedConfigValidator,
24};
25use aws_smithy_runtime_api::client::runtime_plugin::{
26    Order, SharedRuntimePlugin, StaticRuntimePlugin,
27};
28use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
29use aws_smithy_runtime_api::shared::IntoShared;
30use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
31use aws_smithy_types::retry::RetryConfig;
32use aws_smithy_types::timeout::TimeoutConfig;
33use std::borrow::Cow;
34use std::time::Duration;
35
36fn default_plugin<CompFn>(name: &'static str, components_fn: CompFn) -> StaticRuntimePlugin
37where
38    CompFn: FnOnce(RuntimeComponentsBuilder) -> RuntimeComponentsBuilder,
39{
40    StaticRuntimePlugin::new()
41        .with_order(Order::Defaults)
42        .with_runtime_components((components_fn)(RuntimeComponentsBuilder::new(name)))
43}
44
45fn layer<LayerFn>(name: &'static str, layer_fn: LayerFn) -> FrozenLayer
46where
47    LayerFn: FnOnce(&mut Layer),
48{
49    let mut layer = Layer::new(name);
50    (layer_fn)(&mut layer);
51    layer.freeze()
52}
53
54/// Runtime plugin that provides a default connector.
55#[deprecated(
56    since = "1.8.0",
57    note = "This function wasn't intended to be public, and didn't take the behavior major version as an argument, so it couldn't be evolved over time."
58)]
59pub fn default_http_client_plugin() -> Option<SharedRuntimePlugin> {
60    #[allow(deprecated)]
61    default_http_client_plugin_v2(BehaviorVersion::v2024_03_28())
62}
63
64/// Runtime plugin that provides a default HTTPS connector.
65pub fn default_http_client_plugin_v2(
66    behavior_version: BehaviorVersion,
67) -> Option<SharedRuntimePlugin> {
68    let mut _default: Option<SharedHttpClient> = None;
69
70    if behavior_version.is_at_least(BehaviorVersion::v2025_01_17()) {
71        // the latest https stack takes precedence if the config flag
72        // is enabled otherwise try to fall back to the legacy connector
73        // if that feature flag is available.
74        #[cfg(all(
75            feature = "connector-hyper-0-14-x",
76            not(feature = "default-https-client")
77        ))]
78        #[allow(deprecated)]
79        {
80            _default = crate::client::http::hyper_014::default_client();
81        }
82
83        // takes precedence over legacy connector if enabled
84        #[cfg(feature = "default-https-client")]
85        {
86            let opts = crate::client::http::DefaultClientOptions::default()
87                .with_behavior_version(behavior_version);
88            _default = crate::client::http::default_https_client(opts);
89        }
90    } else {
91        // fallback to legacy hyper client for given behavior version
92        #[cfg(feature = "connector-hyper-0-14-x")]
93        #[allow(deprecated)]
94        {
95            _default = crate::client::http::hyper_014::default_client();
96        }
97    }
98
99    _default.map(|default| {
100        default_plugin("default_http_client_plugin", |components| {
101            components.with_http_client(Some(default))
102        })
103        .into_shared()
104    })
105}
106
107/// Runtime plugin that provides a default async sleep implementation.
108pub fn default_sleep_impl_plugin() -> Option<SharedRuntimePlugin> {
109    default_async_sleep().map(|default| {
110        default_plugin("default_sleep_impl_plugin", |components| {
111            components.with_sleep_impl(Some(default))
112        })
113        .into_shared()
114    })
115}
116
117/// Runtime plugin that provides a default time source.
118pub fn default_time_source_plugin() -> Option<SharedRuntimePlugin> {
119    Some(
120        default_plugin("default_time_source_plugin", |components| {
121            components.with_time_source(Some(SystemTimeSource::new()))
122        })
123        .into_shared(),
124    )
125}
126
127/// Runtime plugin that sets the default retry strategy, config (disabled), and partition.
128pub fn default_retry_config_plugin(
129    default_partition_name: impl Into<Cow<'static, str>>,
130) -> Option<SharedRuntimePlugin> {
131    let retry_partition = RetryPartition::new(default_partition_name);
132    Some(
133        default_plugin("default_retry_config_plugin", |components| {
134            components
135                .with_retry_strategy(Some(StandardRetryStrategy::new()))
136                .with_config_validator(SharedConfigValidator::base_client_config_fn(
137                    validate_retry_config,
138                ))
139                .with_interceptor(TokenBucketProvider::new(retry_partition.clone()))
140        })
141        .with_config(layer("default_retry_config", |layer| {
142            layer.store_put(RetryConfig::disabled());
143            layer.store_put(retry_partition);
144        }))
145        .into_shared(),
146    )
147}
148
149fn validate_retry_config(
150    components: &RuntimeComponentsBuilder,
151    cfg: &ConfigBag,
152) -> Result<(), BoxError> {
153    if let Some(retry_config) = cfg.load::<RetryConfig>() {
154        if retry_config.has_retry() && components.sleep_impl().is_none() {
155            Err("An async sleep implementation is required for retry to work. Please provide a `sleep_impl` on \
156                 the config, or disable timeouts.".into())
157        } else {
158            Ok(())
159        }
160    } else {
161        Err(
162            "The default retry config was removed, and no other config was put in its place."
163                .into(),
164        )
165    }
166}
167
168/// Runtime plugin that sets the default timeout config (no timeouts).
169pub fn default_timeout_config_plugin() -> Option<SharedRuntimePlugin> {
170    Some(
171        default_plugin("default_timeout_config_plugin", |components| {
172            components.with_config_validator(SharedConfigValidator::base_client_config_fn(
173                validate_timeout_config,
174            ))
175        })
176        .with_config(layer("default_timeout_config", |layer| {
177            layer.store_put(TimeoutConfig::disabled());
178        }))
179        .into_shared(),
180    )
181}
182
183fn validate_timeout_config(
184    components: &RuntimeComponentsBuilder,
185    cfg: &ConfigBag,
186) -> Result<(), BoxError> {
187    if let Some(timeout_config) = cfg.load::<TimeoutConfig>() {
188        if timeout_config.has_timeouts() && components.sleep_impl().is_none() {
189            Err("An async sleep implementation is required for timeouts to work. Please provide a `sleep_impl` on \
190                 the config, or disable timeouts.".into())
191        } else {
192            Ok(())
193        }
194    } else {
195        Err(
196            "The default timeout config was removed, and no other config was put in its place."
197                .into(),
198        )
199    }
200}
201
202/// Runtime plugin that registers the default identity cache implementation.
203pub fn default_identity_cache_plugin() -> Option<SharedRuntimePlugin> {
204    Some(
205        default_plugin("default_identity_cache_plugin", |components| {
206            components.with_identity_cache(Some(IdentityCache::lazy().build()))
207        })
208        .into_shared(),
209    )
210}
211
212/// Runtime plugin that sets the default stalled stream protection config.
213///
214/// By default, when throughput falls below 1/Bs for more than 5 seconds, the
215/// stream is cancelled.
216#[deprecated(
217    since = "1.2.0",
218    note = "This function wasn't intended to be public, and didn't take the behavior major version as an argument, so it couldn't be evolved over time."
219)]
220pub fn default_stalled_stream_protection_config_plugin() -> Option<SharedRuntimePlugin> {
221    #[allow(deprecated)]
222    default_stalled_stream_protection_config_plugin_v2(BehaviorVersion::v2023_11_09())
223}
224fn default_stalled_stream_protection_config_plugin_v2(
225    behavior_version: BehaviorVersion,
226) -> Option<SharedRuntimePlugin> {
227    Some(
228        default_plugin(
229            "default_stalled_stream_protection_config_plugin",
230            |components| {
231                components.with_config_validator(SharedConfigValidator::base_client_config_fn(
232                    validate_stalled_stream_protection_config,
233                ))
234            },
235        )
236        .with_config(layer("default_stalled_stream_protection_config", |layer| {
237            let mut config =
238                StalledStreamProtectionConfig::enabled().grace_period(Duration::from_secs(5));
239            // Before v2024_03_28, upload streams did not have stalled stream protection by default
240            #[allow(deprecated)]
241            if !behavior_version.is_at_least(BehaviorVersion::v2024_03_28()) {
242                config = config.upload_enabled(false);
243            }
244            layer.store_put(config.build());
245        }))
246        .into_shared(),
247    )
248}
249
250fn enforce_content_length_runtime_plugin() -> Option<SharedRuntimePlugin> {
251    Some(EnforceContentLengthRuntimePlugin::new().into_shared())
252}
253
254fn validate_stalled_stream_protection_config(
255    components: &RuntimeComponentsBuilder,
256    cfg: &ConfigBag,
257) -> Result<(), BoxError> {
258    if let Some(stalled_stream_protection_config) = cfg.load::<StalledStreamProtectionConfig>() {
259        if stalled_stream_protection_config.is_enabled() {
260            if components.sleep_impl().is_none() {
261                return Err(
262                    "An async sleep implementation is required for stalled stream protection to work. \
263                     Please provide a `sleep_impl` on the config, or disable stalled stream protection.".into());
264            }
265
266            if components.time_source().is_none() {
267                return Err(
268                    "A time source is required for stalled stream protection to work.\
269                     Please provide a `time_source` on the config, or disable stalled stream protection.".into());
270            }
271        }
272
273        Ok(())
274    } else {
275        Err(
276            "The default stalled stream protection config was removed, and no other config was put in its place."
277                .into(),
278        )
279    }
280}
281
282/// Arguments for the [`default_plugins`] method.
283///
284/// This is a struct to enable adding new parameters in the future without breaking the API.
285#[non_exhaustive]
286#[derive(Debug, Default)]
287pub struct DefaultPluginParams {
288    retry_partition_name: Option<Cow<'static, str>>,
289    behavior_version: Option<BehaviorVersion>,
290}
291
292impl DefaultPluginParams {
293    /// Creates a new [`DefaultPluginParams`].
294    pub fn new() -> Self {
295        Default::default()
296    }
297
298    /// Sets the retry partition name.
299    pub fn with_retry_partition_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
300        self.retry_partition_name = Some(name.into());
301        self
302    }
303
304    /// Sets the behavior major version.
305    pub fn with_behavior_version(mut self, version: BehaviorVersion) -> Self {
306        self.behavior_version = Some(version);
307        self
308    }
309}
310
311/// All default plugins.
312pub fn default_plugins(
313    params: DefaultPluginParams,
314) -> impl IntoIterator<Item = SharedRuntimePlugin> {
315    let behavior_version = params
316        .behavior_version
317        .unwrap_or_else(BehaviorVersion::latest);
318
319    [
320        default_http_client_plugin_v2(behavior_version),
321        default_identity_cache_plugin(),
322        default_retry_config_plugin(
323            params
324                .retry_partition_name
325                .expect("retry_partition_name is required"),
326        ),
327        default_sleep_impl_plugin(),
328        default_time_source_plugin(),
329        default_timeout_config_plugin(),
330        enforce_content_length_runtime_plugin(),
331        default_stalled_stream_protection_config_plugin_v2(behavior_version),
332    ]
333    .into_iter()
334    .flatten()
335    .collect::<Vec<SharedRuntimePlugin>>()
336}
337
#[cfg(test)]
mod tests {
    use super::*;
    use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins;

    /// Plugin params pinned to `version`; the partition name is irrelevant to these tests.
    fn test_plugin_params(version: BehaviorVersion) -> DefaultPluginParams {
        DefaultPluginParams::new()
            .with_behavior_version(version)
            .with_retry_partition_name("dontcare")
    }

    /// Applies `plugins` as client plugins to a base [`ConfigBag`] and returns the bag.
    fn config_for(plugins: impl IntoIterator<Item = SharedRuntimePlugin>) -> ConfigBag {
        let mut bag = ConfigBag::base();
        RuntimePlugins::new()
            .with_client_plugins(plugins)
            .apply_client_configuration(&mut bag)
            .unwrap();
        bag
    }

    #[test]
    #[allow(deprecated)]
    fn v2024_03_28_stalled_stream_protection_difference() {
        // Resolves the default plugins for a behavior version and reports whether the
        // resulting stalled stream protection config has uploads enabled.
        let upload_protection_for = |version: BehaviorVersion| {
            config_for(default_plugins(test_plugin_params(version)))
                .load::<StalledStreamProtectionConfig>()
                .unwrap()
                .upload_enabled()
        };

        let latest_uploads = upload_protection_for(BehaviorVersion::latest());
        let v2023_uploads = upload_protection_for(BehaviorVersion::v2023_11_09());

        assert!(
            latest_uploads,
            "stalled stream protection on uploads MUST be enabled after v2024_03_28"
        );
        assert!(
            !v2023_uploads,
            "stalled stream protection on uploads MUST NOT be enabled before v2024_03_28"
        );
    }
}