aws_sdk_s3/
s3_express.rs

1// Code generated by software.amazon.smithy.rust.codegen.smithy-rs. DO NOT EDIT.
2/*
3 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7/// Supporting code for S3 Express auth
8pub(crate) mod auth {
9    use aws_runtime::auth::sigv4::SigV4Signer;
10    use aws_smithy_runtime_api::client::auth::{AuthScheme, AuthSchemeId, Sign};
11    use aws_smithy_runtime_api::client::identity::SharedIdentityResolver;
12    use aws_smithy_runtime_api::client::runtime_components::GetIdentityResolver;
13
14    /// Auth scheme ID for S3 Express.
15    pub(crate) const SCHEME_ID: AuthSchemeId = AuthSchemeId::new("sigv4-s3express");
16
17    /// S3 Express auth scheme.
18    #[derive(Debug, Default)]
19    pub(crate) struct S3ExpressAuthScheme {
20        signer: SigV4Signer,
21    }
22
23    impl S3ExpressAuthScheme {
24        /// Creates a new `S3ExpressAuthScheme`.
25        pub(crate) fn new() -> Self {
26            Default::default()
27        }
28    }
29
30    impl AuthScheme for S3ExpressAuthScheme {
31        fn scheme_id(&self) -> AuthSchemeId {
32            SCHEME_ID
33        }
34
35        fn identity_resolver(&self, identity_resolvers: &dyn GetIdentityResolver) -> Option<SharedIdentityResolver> {
36            identity_resolvers.identity_resolver(self.scheme_id())
37        }
38
39        fn signer(&self) -> &dyn Sign {
40            &self.signer
41        }
42    }
43}
44
45/// Supporting code for S3 Express identity cache
46pub(crate) mod identity_cache {
47    use aws_credential_types::Credentials;
48    use aws_smithy_async::time::SharedTimeSource;
49    use aws_smithy_runtime::expiring_cache::ExpiringCache;
50    use aws_smithy_runtime_api::box_error::BoxError;
51    use aws_smithy_runtime_api::client::identity::Identity;
52    use aws_smithy_types::DateTime;
53    use fastrand::Rng;
54    use hmac::{digest::FixedOutput, Hmac, Mac};
55    use lru::LruCache;
56    use sha2::Sha256;
57    use std::fmt;
58    use std::future::Future;
59    use std::hash::Hash;
60    use std::num::NonZeroUsize;
61    use std::sync::Mutex;
62    use std::time::{Duration, SystemTime};
63
64    pub(crate) const DEFAULT_MAX_CACHE_CAPACITY: usize = 100;
65    pub(crate) const DEFAULT_BUFFER_TIME: Duration = Duration::from_secs(10);
66
67    #[derive(Clone, Eq, PartialEq, Hash)]
68    pub(crate) struct CacheKey(String);
69
70    /// The caching implementation for S3 Express identity.
71    ///
72    /// While customers can either disable S3 Express itself or provide a custom S3 Express identity
73    /// provider, configuring S3 Express identity cache is not supported. Thus, this is _the_
74    /// implementation of S3 Express identity cache.
75    pub(crate) struct S3ExpressIdentityCache {
76        inner: Mutex<LruCache<CacheKey, ExpiringCache<Identity, BoxError>>>,
77        time_source: SharedTimeSource,
78        buffer_time: Duration,
79        random_bytes: [u8; 64],
80    }
81
82    impl fmt::Debug for S3ExpressIdentityCache {
83        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84            let (size, capacity) = {
85                let cache = self.inner.lock().unwrap();
86                (cache.len(), cache.cap())
87            };
88            write!(
89                f,
90                "S3ExpressIdentityCache {{ time_source: {:?}, buffer_time: {:?} }}, with size/capacity: {}/{}",
91                self.time_source, &self.buffer_time, size, capacity,
92            )
93        }
94    }
95
96    impl S3ExpressIdentityCache {
97        pub(crate) fn new(capacity: usize, time_source: SharedTimeSource, buffer_time: Duration) -> Self {
98            // It'd be nice to use a cryptographically secure random generator but not necessary.
99            // The cache is memory only and randomization here is mostly to obfuscate the key and
100            // make it reasonable length.
101            let mut rng = Rng::default();
102            let mut random_bytes = [0u8; 64];
103            rng.fill(&mut random_bytes);
104            Self {
105                inner: Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
106                time_source,
107                buffer_time,
108                random_bytes,
109            }
110        }
111
112        pub(crate) fn key(&self, bucket_name: &str, creds: &Credentials) -> CacheKey {
113            CacheKey({
114                let mut mac = Hmac::<Sha256>::new_from_slice(self.random_bytes.as_slice()).expect("should be created from random 64 bytes");
115                let input = format!("{}{}", creds.access_key_id(), creds.secret_access_key());
116                mac.update(input.as_ref());
117                let mut inner = hex::encode(mac.finalize_fixed());
118                inner.push_str(bucket_name);
119                inner
120            })
121        }
122
123        pub(crate) async fn get_or_load<F, Fut>(&self, key: CacheKey, loader: F) -> Result<Identity, BoxError>
124        where
125            F: FnOnce() -> Fut,
126            Fut: Future<Output = Result<(Identity, SystemTime), BoxError>>,
127        {
128            let expiring_cache = {
129                let mut inner = self.inner.lock().unwrap();
130                inner.get_or_insert_mut(key, || ExpiringCache::new(self.buffer_time)).clone()
131            };
132
133            let now = self.time_source.now();
134
135            match expiring_cache.yield_or_clear_if_expired(now).await {
136                Some(identity) => {
137                    tracing::debug!(
138                        buffer_time=?self.buffer_time,
139                        cached_expiration=?identity.expiration(),
140                        now=?now,
141                        "loaded identity from cache"
142                    );
143                    Ok(identity)
144                }
145                None => {
146                    let start_time = self.time_source.now();
147                    let identity = expiring_cache.get_or_load(loader).await?;
148                    let expiration = identity.expiration().ok_or("SessionCredentials` always has expiration")?;
149                    let printable = DateTime::from(expiration);
150                    tracing::info!(
151                        new_expiration=%printable,
152                        valid_for=?expiration.duration_since(self.time_source.now()).unwrap_or_default(),
153                        "identity cache miss occurred; added new identity (took {:?})",
154                        self.time_source.now().duration_since(start_time).unwrap_or_default()
155                    );
156                    Ok(identity)
157                }
158            }
159        }
160    }
161
162    #[cfg(test)]
163    mod tests {
164        use super::*;
165        use aws_smithy_async::rt::sleep::TokioSleep;
166        use aws_smithy_async::test_util::ManualTimeSource;
167        use aws_smithy_runtime_api::client::identity::http::Token;
168        use aws_smithy_runtime_api::client::identity::{IdentityFuture, ResolveIdentity, SharedIdentityResolver};
169        use aws_smithy_runtime_api::client::runtime_components::{RuntimeComponents, RuntimeComponentsBuilder};
170        use aws_smithy_runtime_api::shared::IntoShared;
171        use aws_smithy_types::config_bag::ConfigBag;
172        use futures_util::stream::FuturesUnordered;
173        use std::sync::Arc;
174        use std::time::{Duration, SystemTime, UNIX_EPOCH};
175        use tracing::info;
176
177        fn epoch_secs(secs: u64) -> SystemTime {
178            SystemTime::UNIX_EPOCH + Duration::from_secs(secs)
179        }
180
181        fn identity_expiring_in(expired_secs: u64) -> Identity {
182            let expiration = Some(epoch_secs(expired_secs));
183            Identity::new(Token::new("test", expiration), expiration)
184        }
185
186        fn test_identity_resolver(load_list: Vec<Result<Identity, BoxError>>) -> SharedIdentityResolver {
187            #[derive(Debug)]
188            struct Resolver(Mutex<Vec<Result<Identity, BoxError>>>);
189            impl ResolveIdentity for Resolver {
190                fn resolve_identity<'a>(&'a self, _: &'a RuntimeComponents, _config_bag: &'a ConfigBag) -> IdentityFuture<'a> {
191                    let mut list = self.0.lock().unwrap();
192                    if list.len() > 0 {
193                        let next = list.remove(0);
194                        info!("refreshing the identity to {:?}", next);
195                        IdentityFuture::ready(next)
196                    } else {
197                        drop(list);
198                        panic!("no more identities")
199                    }
200                }
201            }
202
203            SharedIdentityResolver::new(Resolver(Mutex::new(load_list)))
204        }
205
206        async fn load(identity_resolver: SharedIdentityResolver, runtime_components: &RuntimeComponents) -> Result<(Identity, SystemTime), BoxError> {
207            let identity = identity_resolver.resolve_identity(&runtime_components, &ConfigBag::base()).await.unwrap();
208            Ok((identity.clone(), identity.expiration().unwrap()))
209        }
210
211        async fn expect_identity<F, Fut>(expired_secs: u64, sut: &S3ExpressIdentityCache, key: CacheKey, loader: F)
212        where
213            F: FnOnce() -> Fut,
214            Fut: Future<Output = Result<(Identity, SystemTime), BoxError>>,
215        {
216            let identity = sut.get_or_load(key, loader).await.unwrap();
217            assert_eq!(Some(epoch_secs(expired_secs)), identity.expiration());
218        }
219
220        #[tokio::test]
221        async fn reload_expired_test_identity() {
222            let time = ManualTimeSource::new(UNIX_EPOCH);
223            let runtime_components = RuntimeComponentsBuilder::for_tests()
224                .with_time_source(Some(time.clone()))
225                .with_sleep_impl(Some(TokioSleep::new()))
226                .build()
227                .unwrap();
228
229            let sut = S3ExpressIdentityCache::new(1, time.clone().into_shared(), DEFAULT_BUFFER_TIME);
230
231            let identity_resolver = test_identity_resolver(vec![Ok(identity_expiring_in(1000)), Ok(identity_expiring_in(2000))]);
232
233            let key = sut.key("test-bucket--usw2-az1--x-s3", &Credentials::for_tests_with_session_token());
234
235            // First call to the cache, populating a cache entry.
236            expect_identity(1000, &sut, key.clone(), || {
237                let identity_resolver = identity_resolver.clone();
238                let runtime_components = runtime_components.clone();
239                async move { load(identity_resolver, &runtime_components).await }
240            })
241            .await;
242
243            // Testing for a cache hit by advancing time such that the updated time is before the expiration of the first identity
244            // i.e. 500 < 1000.
245            time.set_time(epoch_secs(500));
246
247            expect_identity(1000, &sut, key.clone(), || async move { panic!("new identity should not be loaded") }).await;
248
249            // Testing for a cache miss by advancing time such that the updated time is now after the expiration of the first identity
250            // and before the expiration of the second identity i.e. 1000 < 1500 && 1500 < 2000.
251            time.set_time(epoch_secs(1500));
252
253            expect_identity(2000, &sut, key, || async move { load(identity_resolver, &runtime_components).await }).await;
254        }
255
256        #[test]
257        fn load_contention() {
258            let rt = tokio::runtime::Builder::new_multi_thread()
259                .enable_time()
260                .worker_threads(16)
261                .build()
262                .unwrap();
263
264            let time = ManualTimeSource::new(epoch_secs(0));
265            let runtime_components = RuntimeComponentsBuilder::for_tests()
266                .with_time_source(Some(time.clone()))
267                .with_sleep_impl(Some(TokioSleep::new()))
268                .build()
269                .unwrap();
270
271            let number_of_buckets = 4;
272            let sut = Arc::new(S3ExpressIdentityCache::new(
273                number_of_buckets,
274                time.clone().into_shared(),
275                DEFAULT_BUFFER_TIME,
276            ));
277
278            // Nested for loops below advance time by 200 in total, and each identity has the expiration
279            // such that no matter what order async tasks are executed, it never expires.
280            let safe_expiration = number_of_buckets as u64 * 50 + DEFAULT_BUFFER_TIME.as_secs() + 1;
281            let identity_resolver = test_identity_resolver(vec![
282                Ok(identity_expiring_in(safe_expiration)),
283                Ok(identity_expiring_in(safe_expiration)),
284                Ok(identity_expiring_in(safe_expiration)),
285                Ok(identity_expiring_in(safe_expiration)),
286            ]);
287
288            let mut tasks = Vec::new();
289            for i in 0..number_of_buckets {
290                let key = sut.key(&format!("test-bucket-{i}-usw2-az1--x-s3"), &Credentials::for_tests_with_session_token());
291                for _ in 0..50 {
292                    let sut = sut.clone();
293                    let key = key.clone();
294                    let identity_resolver = identity_resolver.clone();
295                    let time = time.clone();
296                    let runtime_components = runtime_components.clone();
297                    tasks.push(rt.spawn(async move {
298                        let now = time.advance(Duration::from_secs(1));
299                        let identity: Identity = sut
300                            .get_or_load(key, || async move { load(identity_resolver, &runtime_components).await })
301                            .await
302                            .unwrap();
303
304                        assert!(identity.expiration().unwrap() >= now, "{:?} >= {:?}", identity.expiration(), now);
305                    }));
306                }
307            }
308            let tasks = tasks.into_iter().collect::<FuturesUnordered<_>>();
309            for task in tasks {
310                rt.block_on(task).unwrap();
311            }
312        }
313
314        #[tokio::test]
315        async fn identity_fetch_triggered_by_lru_eviction() {
316            let time = ManualTimeSource::new(UNIX_EPOCH);
317            let runtime_components = RuntimeComponentsBuilder::for_tests()
318                .with_time_source(Some(time.clone()))
319                .with_sleep_impl(Some(TokioSleep::new()))
320                .build()
321                .unwrap();
322
323            // Create a cache of size 2.
324            let sut = S3ExpressIdentityCache::new(2, time.into_shared(), DEFAULT_BUFFER_TIME);
325
326            let identity_resolver = test_identity_resolver(vec![
327                Ok(identity_expiring_in(1000)),
328                Ok(identity_expiring_in(2000)),
329                Ok(identity_expiring_in(3000)),
330                Ok(identity_expiring_in(4000)),
331            ]);
332
333            let [key1, key2, key3] =
334                [1, 2, 3].map(|i| sut.key(&format!("test-bucket-{i}--usw2-az1--x-s3"), &Credentials::for_tests_with_session_token()));
335
336            // This should pupulate a cache entry for `key1`.
337            expect_identity(1000, &sut, key1.clone(), || {
338                let identity_resolver = identity_resolver.clone();
339                let runtime_components = runtime_components.clone();
340                async move { load(identity_resolver, &runtime_components).await }
341            })
342            .await;
343            // This immediate next call for `key1` should be a cache hit.
344            expect_identity(1000, &sut, key1.clone(), || async move { panic!("new identity should not be loaded") }).await;
345
346            // This should pupulate a cache entry for `key2`.
347            expect_identity(2000, &sut, key2, || {
348                let identity_resolver = identity_resolver.clone();
349                let runtime_components = runtime_components.clone();
350                async move { load(identity_resolver, &runtime_components).await }
351            })
352            .await;
353
354            // This should pupulate a cache entry for `key3`, but evicting a cache entry for `key1` because the cache is full.
355            expect_identity(3000, &sut, key3.clone(), || {
356                let identity_resolver = identity_resolver.clone();
357                let runtime_components = runtime_components.clone();
358                async move { load(identity_resolver, &runtime_components).await }
359            })
360            .await;
361
362            // Attempt to get an identity for `key1` should end up fetching a new one since its cache entry has been evicted.
363            // This fetch should now evict a cache entry for `key2`.
364            expect_identity(4000, &sut, key1, || async move { load(identity_resolver, &runtime_components).await }).await;
365
366            // A cache entry for `key3` should still exist in the cache.
367            expect_identity(3000, &sut, key3, || async move { panic!("new identity should not be loaded") }).await;
368        }
369    }
370}
371/// Supporting code for S3 Express identity provider
372pub(crate) mod identity_provider {
373    use std::time::{Duration, SystemTime};
374
375    use crate::s3_express::identity_cache::S3ExpressIdentityCache;
376    use crate::types::SessionCredentials;
377    use aws_credential_types::provider::error::CredentialsError;
378    use aws_credential_types::Credentials;
379    use aws_smithy_async::time::{SharedTimeSource, TimeSource};
380    use aws_smithy_runtime_api::box_error::BoxError;
381    use aws_smithy_runtime_api::client::endpoint::EndpointResolverParams;
382    use aws_smithy_runtime_api::client::identity::{Identity, IdentityCacheLocation, IdentityFuture, ResolveCachedIdentity, ResolveIdentity};
383    use aws_smithy_runtime_api::client::interceptors::SharedInterceptor;
384    use aws_smithy_runtime_api::client::runtime_components::{GetIdentityResolver, RuntimeComponents};
385    use aws_smithy_runtime_api::shared::IntoShared;
386    use aws_smithy_types::config_bag::ConfigBag;
387
388    use super::identity_cache::{DEFAULT_BUFFER_TIME, DEFAULT_MAX_CACHE_CAPACITY};
389
390    #[derive(Debug)]
391    pub(crate) struct DefaultS3ExpressIdentityProvider {
392        behavior_version: crate::config::BehaviorVersion,
393        cache: S3ExpressIdentityCache,
394    }
395
396    impl TryFrom<SessionCredentials> for Credentials {
397        type Error = BoxError;
398
399        fn try_from(session_creds: SessionCredentials) -> Result<Self, Self::Error> {
400            Ok(Credentials::new(
401                session_creds.access_key_id,
402                session_creds.secret_access_key,
403                Some(session_creds.session_token),
404                Some(
405                    SystemTime::try_from(session_creds.expiration)
406                        .map_err(|_| CredentialsError::unhandled("credential expiration time cannot be represented by a SystemTime"))?,
407                ),
408                "s3express",
409            ))
410        }
411    }
412
413    impl DefaultS3ExpressIdentityProvider {
414        pub(crate) fn builder() -> Builder {
415            Builder::default()
416        }
417
418        async fn identity<'a>(&'a self, runtime_components: &'a RuntimeComponents, config_bag: &'a ConfigBag) -> Result<Identity, BoxError> {
419            let bucket_name = self.bucket_name(config_bag)?;
420
421            let sigv4_identity_resolver = runtime_components
422                .identity_resolver(aws_runtime::auth::sigv4::SCHEME_ID)
423                .ok_or("identity resolver for sigv4 should be set for S3")?;
424            let aws_identity = runtime_components
425                .identity_cache()
426                .resolve_cached_identity(sigv4_identity_resolver, runtime_components, config_bag)
427                .await?;
428
429            let credentials = aws_identity
430                .data::<Credentials>()
431                .ok_or("wrong identity type for SigV4. Expected AWS credentials but got `{identity:?}")?;
432
433            let key = self.cache.key(bucket_name, credentials);
434            self.cache
435                .get_or_load(key, || async move {
436                    let creds = self.express_session_credentials(bucket_name, runtime_components, config_bag).await?;
437                    let data = Credentials::try_from(creds)?;
438                    Ok((Identity::new(data.clone(), data.expiry()), data.expiry().unwrap()))
439                })
440                .await
441        }
442
443        fn bucket_name<'a>(&'a self, config_bag: &'a ConfigBag) -> Result<&'a str, BoxError> {
444            let params = config_bag.load::<EndpointResolverParams>().expect("endpoint resolver params must be set");
445            let params = params
446                .get::<crate::config::endpoint::Params>()
447                .expect("`Params` should be wrapped in `EndpointResolverParams`");
448            params.bucket().ok_or("A bucket was not set in endpoint params".into())
449        }
450
451        async fn express_session_credentials<'a>(
452            &'a self,
453            bucket_name: &'a str,
454            runtime_components: &'a RuntimeComponents,
455            config_bag: &'a ConfigBag,
456        ) -> Result<SessionCredentials, BoxError> {
457            let mut config_builder = crate::config::Builder::from_config_bag(config_bag).behavior_version(self.behavior_version);
458
459            // inherits all runtime components from a current S3 operation but clears out
460            // out interceptors configured for that operation
461            let mut rc_builder = runtime_components.to_builder();
462            rc_builder.set_interceptors(std::iter::empty::<SharedInterceptor>());
463            config_builder.runtime_components = rc_builder;
464
465            let client = crate::Client::from_conf(config_builder.build());
466            let response = client.create_session().bucket(bucket_name).send().await?;
467
468            response.credentials.ok_or("no session credentials in response".into())
469        }
470    }
471
472    #[derive(Default)]
473    pub(crate) struct Builder {
474        behavior_version: Option<crate::config::BehaviorVersion>,
475        time_source: Option<SharedTimeSource>,
476        buffer_time: Option<Duration>,
477    }
478
479    impl Builder {
480        pub(crate) fn behavior_version(mut self, behavior_version: crate::config::BehaviorVersion) -> Self {
481            self.set_behavior_version(Some(behavior_version));
482            self
483        }
484        pub(crate) fn set_behavior_version(&mut self, behavior_version: Option<crate::config::BehaviorVersion>) -> &mut Self {
485            self.behavior_version = behavior_version;
486            self
487        }
488        pub(crate) fn time_source(mut self, time_source: impl TimeSource + 'static) -> Self {
489            self.set_time_source(time_source.into_shared());
490            self
491        }
492        pub(crate) fn set_time_source(&mut self, time_source: SharedTimeSource) -> &mut Self {
493            self.time_source = Some(time_source.into_shared());
494            self
495        }
496        #[allow(dead_code)]
497        pub(crate) fn buffer_time(mut self, buffer_time: Duration) -> Self {
498            self.set_buffer_time(Some(buffer_time));
499            self
500        }
501        #[allow(dead_code)]
502        pub(crate) fn set_buffer_time(&mut self, buffer_time: Option<Duration>) -> &mut Self {
503            self.buffer_time = buffer_time;
504            self
505        }
506        pub(crate) fn build(self) -> DefaultS3ExpressIdentityProvider {
507            DefaultS3ExpressIdentityProvider {
508                behavior_version: self.behavior_version.expect("required field `behavior_version` should be set"),
509                cache: S3ExpressIdentityCache::new(
510                    DEFAULT_MAX_CACHE_CAPACITY,
511                    self.time_source.unwrap_or_default(),
512                    self.buffer_time.unwrap_or(DEFAULT_BUFFER_TIME),
513                ),
514            }
515        }
516    }
517
518    impl ResolveIdentity for DefaultS3ExpressIdentityProvider {
519        fn resolve_identity<'a>(&'a self, runtime_components: &'a RuntimeComponents, config_bag: &'a ConfigBag) -> IdentityFuture<'a> {
520            IdentityFuture::new(async move { self.identity(runtime_components, config_bag).await })
521        }
522
523        fn cache_location(&self) -> IdentityCacheLocation {
524            IdentityCacheLocation::IdentityResolver
525        }
526    }
527}
528
529/// Supporting code for S3 Express runtime plugin
530pub(crate) mod runtime_plugin {
531    use aws_runtime::auth::SigV4SessionTokenNameOverride;
532    use aws_sigv4::http_request::{SignatureLocation, SigningSettings};
533    use aws_smithy_runtime_api::{box_error::BoxError, client::runtime_plugin::RuntimePlugin};
534    use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
535    use aws_types::os_shim_internal::Env;
536
537    mod env {
538        pub(super) const S3_DISABLE_EXPRESS_SESSION_AUTH: &str = "AWS_S3_DISABLE_EXPRESS_SESSION_AUTH";
539    }
540
541    #[derive(Debug)]
542    pub(crate) struct S3ExpressRuntimePlugin {
543        config: FrozenLayer,
544    }
545
546    impl S3ExpressRuntimePlugin {
547        pub(crate) fn new(disable_s3_express_session_token: Option<crate::config::DisableS3ExpressSessionAuth>) -> Self {
548            Self::new_with(disable_s3_express_session_token, Env::real())
549        }
550
551        fn new_with(disable_s3_express_session_token: Option<crate::config::DisableS3ExpressSessionAuth>, env: Env) -> Self {
552            let mut layer = Layer::new("S3ExpressRuntimePlugin");
553            if disable_s3_express_session_token.is_none() {
554                match env.get(env::S3_DISABLE_EXPRESS_SESSION_AUTH) {
555                    Ok(value) if value.eq_ignore_ascii_case("true") || value.eq_ignore_ascii_case("false") => {
556                        let value = value.to_lowercase().parse::<bool>().expect("just checked to be a bool-valued string");
557                        layer.store_or_unset(Some(crate::config::DisableS3ExpressSessionAuth(value)));
558                    }
559                    Ok(value) => {
560                        tracing::warn!(
561                            "environment variable `{}` ignored since it only accepts either `true` or `false` (case-insensitive), but got `{}`.",
562                            env::S3_DISABLE_EXPRESS_SESSION_AUTH,
563                            value
564                        )
565                    }
566                    _ => {
567                        // TODO(aws-sdk-rust#1073): Transfer a value of
568                        //  `s3_disable_express_session_auth` from a profile file to `layer`
569                    }
570                }
571            }
572
573            let session_token_name_override = SigV4SessionTokenNameOverride::new(|settings: &SigningSettings, cfg: &ConfigBag| {
574                // Not configured for S3 express, use the original session token name override
575                if !crate::s3_express::utils::for_s3_express(cfg) {
576                    return Ok(settings.session_token_name_override);
577                }
578
579                let session_token_name_override = Some(match settings.signature_location {
580                    SignatureLocation::Headers => "x-amz-s3session-token",
581                    SignatureLocation::QueryParams => "X-Amz-S3session-Token",
582                    _ => {
583                        return Err(BoxError::from(
584                            "`SignatureLocation` adds a new variant, which needs to be handled in a separate match arm",
585                        ))
586                    }
587                });
588                Ok(session_token_name_override)
589            });
590            layer.store_or_unset(Some(session_token_name_override));
591
592            Self { config: layer.freeze() }
593        }
594    }
595
596    impl RuntimePlugin for S3ExpressRuntimePlugin {
597        fn config(&self) -> Option<FrozenLayer> {
598            Some(self.config.clone())
599        }
600    }
601
602    #[cfg(test)]
603    mod tests {
604        use super::*;
605
606        #[test]
607        fn disable_option_set_from_service_client_should_take_the_highest_precedence() {
608            // Disable option is set from service client.
609            let disable_s3_express_session_token = crate::config::DisableS3ExpressSessionAuth(true);
610
611            // An environment variable says the session auth is _not_ disabled, but it will be
612            // overruled by what is in `layer`.
613            let sut = S3ExpressRuntimePlugin::new_with(
614                Some(disable_s3_express_session_token),
615                Env::from_slice(&[(super::env::S3_DISABLE_EXPRESS_SESSION_AUTH, "false")]),
616            );
617
618            // While this runtime plugin does not contain the config value, `ServiceRuntimePlugin`
619            // will eventually provide it when a config bag is fully set up in the orchestrator.
620            assert!(sut
621                .config()
622                .is_some_and(|cfg| cfg.load::<crate::config::DisableS3ExpressSessionAuth>().is_none()));
623        }
624
625        #[test]
626        fn disable_option_set_from_env_should_take_the_second_highest_precedence() {
627            // An environment variable says session auth is disabled
628            let sut = S3ExpressRuntimePlugin::new_with(None, Env::from_slice(&[(super::env::S3_DISABLE_EXPRESS_SESSION_AUTH, "true")]));
629
630            let cfg = sut.config().unwrap();
631            assert!(cfg.load::<crate::config::DisableS3ExpressSessionAuth>().unwrap().0);
632        }
633
634        #[should_panic]
635        #[test]
636        fn disable_option_set_from_profile_file_should_take_the_lowest_precedence() {
637            // TODO(aws-sdk-rust#1073): Implement a test that mimics only setting
638            //  `s3_disable_express_session_auth` in a profile file
639            todo!()
640        }
641
642        #[test]
643        fn disable_option_should_be_unspecified_if_unset() {
644            // An environment variable says session auth is disabled
645            let sut = S3ExpressRuntimePlugin::new_with(None, Env::from_slice(&[]));
646
647            let cfg = sut.config().unwrap();
648            assert!(cfg.load::<crate::config::DisableS3ExpressSessionAuth>().is_none());
649        }
650    }
651}
652
653pub(crate) mod checksum {
654    use crate::http_request_checksum::DefaultRequestChecksumOverride;
655    use aws_smithy_checksums::ChecksumAlgorithm;
656    use aws_smithy_types::config_bag::ConfigBag;
657
658    pub(crate) fn provide_default_checksum_algorithm() -> crate::http_request_checksum::DefaultRequestChecksumOverride {
659        fn _provide_default_checksum_algorithm(original_checksum: Option<ChecksumAlgorithm>, cfg: &ConfigBag) -> Option<ChecksumAlgorithm> {
660            // S3 does not have the `ChecksumAlgorithm::Md5`, therefore customers cannot set it
661            // from outside.
662            if original_checksum != Some(ChecksumAlgorithm::Md5) {
663                return original_checksum;
664            }
665
666            if crate::s3_express::utils::for_s3_express(cfg) {
667                // S3 Express requires setting the default checksum algorithm to CRC-32
668                Some(ChecksumAlgorithm::Crc32)
669            } else {
670                original_checksum
671            }
672        }
673        DefaultRequestChecksumOverride::new(_provide_default_checksum_algorithm)
674    }
675}
676
677pub(crate) mod utils {
678    use aws_smithy_types::{config_bag::ConfigBag, Document};
679
680    pub(crate) fn for_s3_express(cfg: &ConfigBag) -> bool {
681        // logic borrowed from aws_smithy_runtime::client::orchestrator::auth::extract_endpoint_auth_scheme_config
682        let endpoint = cfg
683            .load::<crate::config::endpoint::Endpoint>()
684            .expect("endpoint added to config bag by endpoint orchestrator");
685
686        let auth_schemes = match endpoint.properties().get("authSchemes") {
687            Some(Document::Array(schemes)) => schemes,
688            _ => return false,
689        };
690        auth_schemes.iter().any(|doc| {
691            let config_scheme_id = doc.as_object().and_then(|object| object.get("name")).and_then(Document::as_string);
692            config_scheme_id == Some(crate::s3_express::auth::SCHEME_ID.as_str())
693        })
694    }
695}