aws_smithy_runtime/client/
defaults.rs1use crate::client::http::body::content_length_enforcement::EnforceContentLengthRuntimePlugin;
13use crate::client::identity::IdentityCache;
14use crate::client::retries::strategy::standard::TokenBucketProvider;
15use crate::client::retries::strategy::StandardRetryStrategy;
16use crate::client::retries::RetryPartition;
17use aws_smithy_async::rt::sleep::default_async_sleep;
18use aws_smithy_async::time::SystemTimeSource;
19use aws_smithy_runtime_api::box_error::BoxError;
20use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion;
21use aws_smithy_runtime_api::client::http::SharedHttpClient;
22use aws_smithy_runtime_api::client::runtime_components::{
23 RuntimeComponentsBuilder, SharedConfigValidator,
24};
25use aws_smithy_runtime_api::client::runtime_plugin::{
26 Order, SharedRuntimePlugin, StaticRuntimePlugin,
27};
28use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
29use aws_smithy_runtime_api::shared::IntoShared;
30use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
31use aws_smithy_types::retry::RetryConfig;
32use aws_smithy_types::timeout::TimeoutConfig;
33use std::borrow::Cow;
34use std::time::Duration;
35
36fn default_plugin<CompFn>(name: &'static str, components_fn: CompFn) -> StaticRuntimePlugin
37where
38 CompFn: FnOnce(RuntimeComponentsBuilder) -> RuntimeComponentsBuilder,
39{
40 StaticRuntimePlugin::new()
41 .with_order(Order::Defaults)
42 .with_runtime_components((components_fn)(RuntimeComponentsBuilder::new(name)))
43}
44
45fn layer<LayerFn>(name: &'static str, layer_fn: LayerFn) -> FrozenLayer
46where
47 LayerFn: FnOnce(&mut Layer),
48{
49 let mut layer = Layer::new(name);
50 (layer_fn)(&mut layer);
51 layer.freeze()
52}
53
54#[deprecated(
56 since = "1.8.0",
57 note = "This function wasn't intended to be public, and didn't take the behavior major version as an argument, so it couldn't be evolved over time."
58)]
59pub fn default_http_client_plugin() -> Option<SharedRuntimePlugin> {
60 #[allow(deprecated)]
61 default_http_client_plugin_v2(BehaviorVersion::v2024_03_28())
62}
63
64pub fn default_http_client_plugin_v2(
66 behavior_version: BehaviorVersion,
67) -> Option<SharedRuntimePlugin> {
68 let mut _default: Option<SharedHttpClient> = None;
69
70 if behavior_version.is_at_least(BehaviorVersion::v2025_01_17()) {
71 #[cfg(all(
75 feature = "connector-hyper-0-14-x",
76 not(feature = "default-https-client")
77 ))]
78 #[allow(deprecated)]
79 {
80 _default = crate::client::http::hyper_014::default_client();
81 }
82
83 #[cfg(feature = "default-https-client")]
85 {
86 let opts = crate::client::http::DefaultClientOptions::default()
87 .with_behavior_version(behavior_version);
88 _default = crate::client::http::default_https_client(opts);
89 }
90 } else {
91 #[cfg(feature = "connector-hyper-0-14-x")]
93 #[allow(deprecated)]
94 {
95 _default = crate::client::http::hyper_014::default_client();
96 }
97 }
98
99 _default.map(|default| {
100 default_plugin("default_http_client_plugin", |components| {
101 components.with_http_client(Some(default))
102 })
103 .into_shared()
104 })
105}
106
107pub fn default_sleep_impl_plugin() -> Option<SharedRuntimePlugin> {
109 default_async_sleep().map(|default| {
110 default_plugin("default_sleep_impl_plugin", |components| {
111 components.with_sleep_impl(Some(default))
112 })
113 .into_shared()
114 })
115}
116
117pub fn default_time_source_plugin() -> Option<SharedRuntimePlugin> {
119 Some(
120 default_plugin("default_time_source_plugin", |components| {
121 components.with_time_source(Some(SystemTimeSource::new()))
122 })
123 .into_shared(),
124 )
125}
126
127pub fn default_retry_config_plugin(
129 default_partition_name: impl Into<Cow<'static, str>>,
130) -> Option<SharedRuntimePlugin> {
131 let retry_partition = RetryPartition::new(default_partition_name);
132 Some(
133 default_plugin("default_retry_config_plugin", |components| {
134 components
135 .with_retry_strategy(Some(StandardRetryStrategy::new()))
136 .with_config_validator(SharedConfigValidator::base_client_config_fn(
137 validate_retry_config,
138 ))
139 .with_interceptor(TokenBucketProvider::new(retry_partition.clone()))
140 })
141 .with_config(layer("default_retry_config", |layer| {
142 layer.store_put(RetryConfig::disabled());
143 layer.store_put(retry_partition);
144 }))
145 .into_shared(),
146 )
147}
148
149fn validate_retry_config(
150 components: &RuntimeComponentsBuilder,
151 cfg: &ConfigBag,
152) -> Result<(), BoxError> {
153 if let Some(retry_config) = cfg.load::<RetryConfig>() {
154 if retry_config.has_retry() && components.sleep_impl().is_none() {
155 Err("An async sleep implementation is required for retry to work. Please provide a `sleep_impl` on \
156 the config, or disable timeouts.".into())
157 } else {
158 Ok(())
159 }
160 } else {
161 Err(
162 "The default retry config was removed, and no other config was put in its place."
163 .into(),
164 )
165 }
166}
167
168pub fn default_timeout_config_plugin() -> Option<SharedRuntimePlugin> {
170 Some(
171 default_plugin("default_timeout_config_plugin", |components| {
172 components.with_config_validator(SharedConfigValidator::base_client_config_fn(
173 validate_timeout_config,
174 ))
175 })
176 .with_config(layer("default_timeout_config", |layer| {
177 layer.store_put(TimeoutConfig::disabled());
178 }))
179 .into_shared(),
180 )
181}
182
183fn validate_timeout_config(
184 components: &RuntimeComponentsBuilder,
185 cfg: &ConfigBag,
186) -> Result<(), BoxError> {
187 if let Some(timeout_config) = cfg.load::<TimeoutConfig>() {
188 if timeout_config.has_timeouts() && components.sleep_impl().is_none() {
189 Err("An async sleep implementation is required for timeouts to work. Please provide a `sleep_impl` on \
190 the config, or disable timeouts.".into())
191 } else {
192 Ok(())
193 }
194 } else {
195 Err(
196 "The default timeout config was removed, and no other config was put in its place."
197 .into(),
198 )
199 }
200}
201
202pub fn default_identity_cache_plugin() -> Option<SharedRuntimePlugin> {
204 Some(
205 default_plugin("default_identity_cache_plugin", |components| {
206 components.with_identity_cache(Some(IdentityCache::lazy().build()))
207 })
208 .into_shared(),
209 )
210}
211
212#[deprecated(
217 since = "1.2.0",
218 note = "This function wasn't intended to be public, and didn't take the behavior major version as an argument, so it couldn't be evolved over time."
219)]
220pub fn default_stalled_stream_protection_config_plugin() -> Option<SharedRuntimePlugin> {
221 #[allow(deprecated)]
222 default_stalled_stream_protection_config_plugin_v2(BehaviorVersion::v2023_11_09())
223}
224fn default_stalled_stream_protection_config_plugin_v2(
225 behavior_version: BehaviorVersion,
226) -> Option<SharedRuntimePlugin> {
227 Some(
228 default_plugin(
229 "default_stalled_stream_protection_config_plugin",
230 |components| {
231 components.with_config_validator(SharedConfigValidator::base_client_config_fn(
232 validate_stalled_stream_protection_config,
233 ))
234 },
235 )
236 .with_config(layer("default_stalled_stream_protection_config", |layer| {
237 let mut config =
238 StalledStreamProtectionConfig::enabled().grace_period(Duration::from_secs(5));
239 #[allow(deprecated)]
241 if !behavior_version.is_at_least(BehaviorVersion::v2024_03_28()) {
242 config = config.upload_enabled(false);
243 }
244 layer.store_put(config.build());
245 }))
246 .into_shared(),
247 )
248}
249
250fn enforce_content_length_runtime_plugin() -> Option<SharedRuntimePlugin> {
251 Some(EnforceContentLengthRuntimePlugin::new().into_shared())
252}
253
254fn validate_stalled_stream_protection_config(
255 components: &RuntimeComponentsBuilder,
256 cfg: &ConfigBag,
257) -> Result<(), BoxError> {
258 if let Some(stalled_stream_protection_config) = cfg.load::<StalledStreamProtectionConfig>() {
259 if stalled_stream_protection_config.is_enabled() {
260 if components.sleep_impl().is_none() {
261 return Err(
262 "An async sleep implementation is required for stalled stream protection to work. \
263 Please provide a `sleep_impl` on the config, or disable stalled stream protection.".into());
264 }
265
266 if components.time_source().is_none() {
267 return Err(
268 "A time source is required for stalled stream protection to work.\
269 Please provide a `time_source` on the config, or disable stalled stream protection.".into());
270 }
271 }
272
273 Ok(())
274 } else {
275 Err(
276 "The default stalled stream protection config was removed, and no other config was put in its place."
277 .into(),
278 )
279 }
280}
281
282#[non_exhaustive]
286#[derive(Debug, Default)]
287pub struct DefaultPluginParams {
288 retry_partition_name: Option<Cow<'static, str>>,
289 behavior_version: Option<BehaviorVersion>,
290}
291
292impl DefaultPluginParams {
293 pub fn new() -> Self {
295 Default::default()
296 }
297
298 pub fn with_retry_partition_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
300 self.retry_partition_name = Some(name.into());
301 self
302 }
303
304 pub fn with_behavior_version(mut self, version: BehaviorVersion) -> Self {
306 self.behavior_version = Some(version);
307 self
308 }
309}
310
311pub fn default_plugins(
313 params: DefaultPluginParams,
314) -> impl IntoIterator<Item = SharedRuntimePlugin> {
315 let behavior_version = params
316 .behavior_version
317 .unwrap_or_else(BehaviorVersion::latest);
318
319 [
320 default_http_client_plugin_v2(behavior_version),
321 default_identity_cache_plugin(),
322 default_retry_config_plugin(
323 params
324 .retry_partition_name
325 .expect("retry_partition_name is required"),
326 ),
327 default_sleep_impl_plugin(),
328 default_time_source_plugin(),
329 default_timeout_config_plugin(),
330 enforce_content_length_runtime_plugin(),
331 default_stalled_stream_protection_config_plugin_v2(behavior_version),
332 ]
333 .into_iter()
334 .flatten()
335 .collect::<Vec<SharedRuntimePlugin>>()
336}
337
#[cfg(test)]
mod tests {
    use super::*;
    use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins;

    /// Plugin params pinned to `version`, with a placeholder retry partition name.
    fn test_plugin_params(version: BehaviorVersion) -> DefaultPluginParams {
        let params = DefaultPluginParams::new();
        params
            .with_behavior_version(version)
            .with_retry_partition_name("dontcare")
    }

    /// Applies `plugins` as client plugins to a fresh base config bag and returns the bag.
    fn config_for(plugins: impl IntoIterator<Item = SharedRuntimePlugin>) -> ConfigBag {
        let mut bag = ConfigBag::base();
        RuntimePlugins::new()
            .with_client_plugins(plugins)
            .apply_client_configuration(&mut bag)
            .unwrap();
        bag
    }

    #[test]
    #[allow(deprecated)]
    fn v2024_03_28_stalled_stream_protection_difference() {
        let latest_params = test_plugin_params(BehaviorVersion::latest());
        let latest = config_for(default_plugins(latest_params));

        let v2023_params = test_plugin_params(BehaviorVersion::v2023_11_09());
        let v2023 = config_for(default_plugins(v2023_params));

        let latest_uploads = latest
            .load::<StalledStreamProtectionConfig>()
            .unwrap()
            .upload_enabled();
        assert!(
            latest_uploads,
            "stalled stream protection on uploads MUST be enabled after v2024_03_28"
        );

        let v2023_uploads = v2023
            .load::<StalledStreamProtectionConfig>()
            .unwrap()
            .upload_enabled();
        assert!(
            !v2023_uploads,
            "stalled stream protection on uploads MUST NOT be enabled before v2024_03_28"
        );
    }
}