aws_smithy_runtime/client/
orchestrator.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use self::auth::orchestrate_auth;
7use crate::client::interceptors::Interceptors;
8use crate::client::orchestrator::http::{log_response_body, read_body};
9use crate::client::timeout::{MaybeTimeout, MaybeTimeoutConfig, TimeoutKind};
10use crate::client::{
11    http::body::minimum_throughput::MaybeUploadThroughputCheckFuture,
12    orchestrator::endpoints::orchestrate_endpoint,
13};
14use aws_smithy_async::rt::sleep::AsyncSleep;
15use aws_smithy_runtime_api::box_error::BoxError;
16use aws_smithy_runtime_api::client::http::{HttpClient, HttpConnector, HttpConnectorSettings};
17use aws_smithy_runtime_api::client::interceptors::context::{
18    Error, Input, InterceptorContext, Output, RewindResult,
19};
20use aws_smithy_runtime_api::client::orchestrator::{
21    HttpResponse, LoadedRequestBody, OrchestratorError,
22};
23use aws_smithy_runtime_api::client::result::SdkError;
24use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
25use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
26use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins;
27use aws_smithy_runtime_api::client::ser_de::{
28    DeserializeResponse, SerializeRequest, SharedRequestSerializer, SharedResponseDeserializer,
29};
30use aws_smithy_types::body::SdkBody;
31use aws_smithy_types::byte_stream::ByteStream;
32use aws_smithy_types::config_bag::ConfigBag;
33use aws_smithy_types::timeout::{MergeTimeoutConfig, TimeoutConfig};
34use std::mem;
35use tracing::{debug, debug_span, instrument, trace, Instrument};
36
37mod auth;
38
39/// Defines types that implement a trait for endpoint resolution
40pub mod endpoints;
41
42/// Defines types that work with HTTP types
43mod http;
44
45/// Utility for making one-off unmodeled requests with the orchestrator.
46pub mod operation;
47
/// Marks the interceptor context as failed with the given error and returns
/// from the enclosing function right away.
///
/// The error expression is converted with `.into()` before being handed to
/// the context's `fail` method.
macro_rules! halt {
    ([$context:ident] => $error:expr) => {{
        debug!("encountered orchestrator error; halting");
        $context.fail($error.into());
        return;
    }};
}
55
/// Evaluates a `Result` expression: yields the `Ok` value, or on `Err` hands
/// the error to `halt!`, which fails the context and returns from the
/// enclosing function.
macro_rules! halt_on_err {
    ([$context:ident] => $fallible:expr) => {
        match $fallible {
            Ok(value) => value,
            Err(failure) => halt!([$context] => failure),
        }
    };
}
64
/// Evaluates a `Result` expression and, if it is an `Err`, logs the error and
/// records it on the interceptor context. Unlike `halt_on_err!`, execution of
/// the enclosing function carries on afterwards.
macro_rules! continue_on_err {
    ([$context:ident] => $fallible:expr) => {
        if let Err(failure) = $fallible {
            debug!(err = ?failure, "encountered orchestrator error; continuing");
            $context.fail(failure.into());
        }
    };
}
73
/// Invokes one or more interceptor hooks on `Interceptors`, applying either the
/// `continue_on_err` or the `halt_on_err` failure policy to each call.
///
/// Both policies accept a single `hook(ctx, rc, cfg)` call or a braced list of
/// `;`-terminated calls, which are run in the order written.
macro_rules! run_interceptors {
    // A braced list of hooks, each run with the `continue_on_err!` policy.
    (continue_on_err: { $($hook:ident($context:ident, $components:ident, $bag:ident);)+ }) => {
        $(run_interceptors!(continue_on_err: $hook($context, $components, $bag));)+
    };
    // A single hook run with the `continue_on_err!` policy.
    (continue_on_err: $hook:ident($context:ident, $components:ident, $bag:ident)) => {
        continue_on_err!([$context] => run_interceptors!(__private $hook($context, $components, $bag)))
    };
    // A braced list of hooks, each run with the `halt_on_err!` policy.
    (halt_on_err: { $($hook:ident($context:ident, $components:ident, $bag:ident);)+ }) => {
        $(run_interceptors!(halt_on_err: $hook($context, $components, $bag));)+
    };
    // A single hook run with the `halt_on_err!` policy.
    (halt_on_err: $hook:ident($context:ident, $components:ident, $bag:ident)) => {
        halt_on_err!([$context] => run_interceptors!(__private $hook($context, $components, $bag)))
    };
    // Internal rule: the actual hook call, shared by both policies.
    (__private $hook:ident($context:ident, $components:ident, $bag:ident)) => {
        Interceptors::new($components.interceptors()).$hook($context, $components, $bag)
    };
}
91
92/// Orchestrates the execution of a request and handling of a response.
93///
94/// The given `runtime_plugins` will be used to generate a `ConfigBag` for this request,
95/// and then the given `input` will be serialized and transmitted. When a response is
96/// received, it will be deserialized and returned.
97///
98/// This orchestration handles retries, endpoint resolution, identity resolution, and signing.
99/// Each of these are configurable via the config and runtime components given by the runtime
100/// plugins.
101pub async fn invoke(
102    service_name: &str,
103    operation_name: &str,
104    input: Input,
105    runtime_plugins: &RuntimePlugins,
106) -> Result<Output, SdkError<Error, HttpResponse>> {
107    invoke_with_stop_point(
108        service_name,
109        operation_name,
110        input,
111        runtime_plugins,
112        StopPoint::None,
113    )
114    .await?
115    .finalize()
116}
117
/// Selects a point at which orchestration should return early.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopPoint {
    /// Run the orchestration to completion without stopping early.
    None,

    /// Halt the orchestrator just before the request would be transmitted.
    BeforeTransmit,
}
128
129/// Same as [`invoke`], but allows for returning early at different points during orchestration.
130///
131/// Orchestration will cease at the point specified by `stop_point`. This is useful for orchestrations
132/// that don't need to actually transmit requests, such as for generating presigned requests.
133///
134/// See the docs on [`invoke`] for more details.
135pub async fn invoke_with_stop_point(
136    service_name: &str,
137    operation_name: &str,
138    input: Input,
139    runtime_plugins: &RuntimePlugins,
140    stop_point: StopPoint,
141) -> Result<InterceptorContext, SdkError<Error, HttpResponse>> {
142    async move {
143        let mut cfg = ConfigBag::base();
144        let cfg = &mut cfg;
145
146        let mut ctx = InterceptorContext::new(input);
147
148        let runtime_components = apply_configuration(&mut ctx, cfg, runtime_plugins)
149            .map_err(SdkError::construction_failure)?;
150        trace!(runtime_components = ?runtime_components);
151
152        let operation_timeout_config =
153            MaybeTimeoutConfig::new(&runtime_components, cfg, TimeoutKind::Operation);
154        trace!(operation_timeout_config = ?operation_timeout_config);
155        async {
156            // If running the pre-execution interceptors failed, then we skip running the op and run the
157            // final interceptors instead.
158            if !ctx.is_failed() {
159                try_op(&mut ctx, cfg, &runtime_components, stop_point).await;
160            }
161            finally_op(&mut ctx, cfg, &runtime_components).await;
162            if ctx.is_failed() {
163                Err(ctx.finalize().expect_err("it is failed"))
164            } else {
165                Ok(ctx)
166            }
167        }
168        .maybe_timeout(operation_timeout_config)
169        .await
170    }
171    // Include a random, internal-only, seven-digit ID for the operation invocation so that it can be correlated in the logs.
172    .instrument(debug_span!("invoke", service = %service_name, operation = %operation_name, sdk_invocation_id = fastrand::u32(1_000_000..10_000_000)))
173    .await
174}
175
176/// Apply configuration is responsible for apply runtime plugins to the config bag, as well as running
177/// `read_before_execution` interceptors. If a failure occurs due to config construction, `invoke`
178/// will raise it to the user. If an interceptor fails, then `invoke`
179#[instrument(skip_all, level = "debug")]
180fn apply_configuration(
181    ctx: &mut InterceptorContext,
182    cfg: &mut ConfigBag,
183    runtime_plugins: &RuntimePlugins,
184) -> Result<RuntimeComponents, BoxError> {
185    let client_rc_builder = runtime_plugins.apply_client_configuration(cfg)?;
186    continue_on_err!([ctx] => Interceptors::new(client_rc_builder.interceptors()).read_before_execution(false, ctx, cfg));
187
188    let operation_rc_builder = runtime_plugins.apply_operation_configuration(cfg)?;
189    continue_on_err!([ctx] => Interceptors::new(operation_rc_builder.interceptors()).read_before_execution(true, ctx, cfg));
190
191    // The order below is important. Client interceptors must run before operation interceptors.
192    let components = RuntimeComponents::builder("merged orchestrator components")
193        .merge_from(&client_rc_builder)
194        .merge_from(&operation_rc_builder)
195        .build()?;
196
197    // In an ideal world, we'd simply update `cfg.load` to behave this way. Unfortunately, we can't
198    // do that without a breaking change. By overwriting the value in the config bag with a merged
199    // version, we can achieve a very similar behavior. `MergeTimeoutConfig`
200    let resolved_timeout_config = cfg.load::<MergeTimeoutConfig>();
201    debug!(
202        "timeout settings for this operation: {:?}",
203        resolved_timeout_config
204    );
205    cfg.interceptor_state().store_put(resolved_timeout_config);
206
207    components.validate_final_config(cfg)?;
208    Ok(components)
209}
210
211#[instrument(skip_all, level = "debug")]
212async fn try_op(
213    ctx: &mut InterceptorContext,
214    cfg: &mut ConfigBag,
215    runtime_components: &RuntimeComponents,
216    stop_point: StopPoint,
217) {
218    // Before serialization
219    run_interceptors!(halt_on_err: {
220        modify_before_serialization(ctx, runtime_components, cfg);
221        read_before_serialization(ctx, runtime_components, cfg);
222    });
223
224    // Serialization
225    ctx.enter_serialization_phase();
226    {
227        let _span = debug_span!("serialization").entered();
228        let request_serializer = cfg
229            .load::<SharedRequestSerializer>()
230            .expect("request serializer must be in the config bag")
231            .clone();
232        let input = ctx.take_input().expect("input set at this point");
233        let request = halt_on_err!([ctx] => request_serializer.serialize_input(input, cfg).map_err(OrchestratorError::other));
234        ctx.set_request(request);
235    }
236
237    // Load the request body into memory if configured to do so
238    if let Some(&LoadedRequestBody::Requested) = cfg.load::<LoadedRequestBody>() {
239        debug!("loading request body into memory");
240        let mut body = SdkBody::taken();
241        mem::swap(&mut body, ctx.request_mut().expect("set above").body_mut());
242        let loaded_body = halt_on_err!([ctx] =>
243            ByteStream::new(body).collect().await.map_err(OrchestratorError::other)
244        )
245        .into_bytes();
246        *ctx.request_mut().as_mut().expect("set above").body_mut() =
247            SdkBody::from(loaded_body.clone());
248        cfg.interceptor_state()
249            .store_put(LoadedRequestBody::Loaded(loaded_body));
250    }
251
252    // Before transmit
253    ctx.enter_before_transmit_phase();
254    run_interceptors!(halt_on_err: {
255        read_after_serialization(ctx, runtime_components, cfg);
256        modify_before_retry_loop(ctx, runtime_components, cfg);
257    });
258
259    // If we got a retry strategy from the bag, ask it what to do.
260    // Otherwise, assume we should attempt the initial request.
261    let should_attempt = runtime_components
262        .retry_strategy()
263        .should_attempt_initial_request(runtime_components, cfg);
264    match should_attempt {
265        // Yes, let's make a request
266        Ok(ShouldAttempt::Yes) => debug!("retry strategy has OKed initial request"),
267        // No, this request shouldn't be sent
268        Ok(ShouldAttempt::No) => {
269            let err: BoxError = "the retry strategy indicates that an initial request shouldn't be made, but it didn't specify why".into();
270            halt!([ctx] => OrchestratorError::other(err));
271        }
272        // No, we shouldn't make a request because...
273        Err(err) => halt!([ctx] => OrchestratorError::other(err)),
274        Ok(ShouldAttempt::YesAfterDelay(delay)) => {
275            let sleep_impl = halt_on_err!([ctx] => runtime_components.sleep_impl().ok_or_else(|| OrchestratorError::other(
276                "the retry strategy requested a delay before sending the initial request, but no 'async sleep' implementation was set"
277            )));
278            debug!("retry strategy has OKed initial request after a {delay:?} delay");
279            sleep_impl.sleep(delay).await;
280        }
281    }
282
283    // Save a request checkpoint before we make the request. This will allow us to "rewind"
284    // the request in the case of retry attempts.
285    ctx.save_checkpoint();
286    let mut retry_delay = None;
287    for i in 1u32.. {
288        // Break from the loop if we can't rewind the request's state. This will always succeed the
289        // first time, but will fail on subsequent iterations if the request body wasn't retryable.
290        trace!("checking if context can be rewound for attempt #{i}");
291        if let RewindResult::Impossible = ctx.rewind(cfg) {
292            debug!("request cannot be retried since the request body cannot be cloned");
293            break;
294        }
295        // Track which attempt we're currently on.
296        cfg.interceptor_state()
297            .store_put::<RequestAttempts>(i.into());
298        // Backoff time should not be included in the attempt timeout
299        if let Some((delay, sleep)) = retry_delay.take() {
300            debug!("delaying for {delay:?}");
301            sleep.await;
302        }
303        let attempt_timeout_config =
304            MaybeTimeoutConfig::new(runtime_components, cfg, TimeoutKind::OperationAttempt);
305        trace!(attempt_timeout_config = ?attempt_timeout_config);
306        let maybe_timeout = async {
307            debug!("beginning attempt #{i}");
308            try_attempt(ctx, cfg, runtime_components, stop_point).await;
309            finally_attempt(ctx, cfg, runtime_components).await;
310            Result::<_, SdkError<Error, HttpResponse>>::Ok(())
311        }
312        .maybe_timeout(attempt_timeout_config)
313        .await
314        .map_err(|err| OrchestratorError::timeout(err.into_source().unwrap()));
315
316        // We continue when encountering a timeout error. The retry classifier will decide what to do with it.
317        continue_on_err!([ctx] => maybe_timeout);
318
319        // If we got a retry strategy from the bag, ask it what to do.
320        // If no strategy was set, we won't retry.
321        let should_attempt = halt_on_err!([ctx] => runtime_components
322            .retry_strategy()
323            .should_attempt_retry(ctx, runtime_components, cfg)
324            .map_err(OrchestratorError::other));
325        match should_attempt {
326            // Yes, let's retry the request
327            ShouldAttempt::Yes => continue,
328            // No, this request shouldn't be retried
329            ShouldAttempt::No => {
330                debug!("a retry is either unnecessary or not possible, exiting attempt loop");
331                break;
332            }
333            ShouldAttempt::YesAfterDelay(delay) => {
334                let sleep_impl = halt_on_err!([ctx] => runtime_components.sleep_impl().ok_or_else(|| OrchestratorError::other(
335                    "the retry strategy requested a delay before sending the retry request, but no 'async sleep' implementation was set"
336                )));
337                retry_delay = Some((delay, sleep_impl.sleep(delay)));
338                continue;
339            }
340        }
341    }
342}
343
344#[instrument(skip_all, level = "debug")]
345async fn try_attempt(
346    ctx: &mut InterceptorContext,
347    cfg: &mut ConfigBag,
348    runtime_components: &RuntimeComponents,
349    stop_point: StopPoint,
350) {
351    run_interceptors!(halt_on_err: read_before_attempt(ctx, runtime_components, cfg));
352
353    halt_on_err!([ctx] => orchestrate_endpoint(ctx, runtime_components, cfg).await.map_err(OrchestratorError::other));
354
355    run_interceptors!(halt_on_err: {
356        modify_before_signing(ctx, runtime_components, cfg);
357        read_before_signing(ctx, runtime_components, cfg);
358    });
359
360    halt_on_err!([ctx] => orchestrate_auth(ctx, runtime_components, cfg).await.map_err(OrchestratorError::other));
361
362    run_interceptors!(halt_on_err: {
363        read_after_signing(ctx, runtime_components, cfg);
364        modify_before_transmit(ctx, runtime_components, cfg);
365        read_before_transmit(ctx, runtime_components, cfg);
366    });
367
368    // Return early if a stop point is set for before transmit
369    if let StopPoint::BeforeTransmit = stop_point {
370        debug!("ending orchestration early because the stop point is `BeforeTransmit`");
371        return;
372    }
373
374    // The connection consumes the request but we need to keep a copy of it
375    // within the interceptor context, so we clone it here.
376    ctx.enter_transmit_phase();
377    let response = halt_on_err!([ctx] => {
378        let request = ctx.take_request().expect("set during serialization");
379        trace!(request = ?request, "transmitting request");
380        let http_client = halt_on_err!([ctx] => runtime_components.http_client().ok_or_else(||
381            OrchestratorError::other("No HTTP client was available to send this request. \
382                Enable the `default-https-client` crate feature or configure an HTTP client to fix this.")
383        ));
384        let timeout_config = cfg.load::<TimeoutConfig>().expect("timeout config must be set");
385        let settings = {
386            let mut builder = HttpConnectorSettings::builder();
387            builder.set_connect_timeout(timeout_config.connect_timeout());
388            builder.set_read_timeout(timeout_config.read_timeout());
389            builder.build()
390        };
391        let connector = http_client.http_connector(&settings, runtime_components);
392        let response_future = MaybeUploadThroughputCheckFuture::new(
393            cfg,
394            runtime_components,
395            connector.call(request),
396        );
397        response_future.await.map_err(OrchestratorError::connector)
398    });
399    trace!(response = ?response, "received response from service");
400    ctx.set_response(response);
401    ctx.enter_before_deserialization_phase();
402
403    run_interceptors!(halt_on_err: {
404        read_after_transmit(ctx, runtime_components, cfg);
405        modify_before_deserialization(ctx, runtime_components, cfg);
406        read_before_deserialization(ctx, runtime_components, cfg);
407    });
408
409    ctx.enter_deserialization_phase();
410    let output_or_error = async {
411        let response = ctx.response_mut().expect("set during transmit");
412        let response_deserializer = cfg
413            .load::<SharedResponseDeserializer>()
414            .expect("a request deserializer must be in the config bag");
415        let maybe_deserialized = {
416            let _span = debug_span!("deserialize_streaming").entered();
417            response_deserializer.deserialize_streaming(response)
418        };
419        match maybe_deserialized {
420            Some(output_or_error) => output_or_error,
421            None => read_body(response)
422                .instrument(debug_span!("read_body"))
423                .await
424                .map_err(OrchestratorError::response)
425                .and_then(|_| {
426                    let _span = debug_span!("deserialize_nonstreaming").entered();
427                    log_response_body(response, cfg);
428                    response_deserializer.deserialize_nonstreaming(response)
429                }),
430        }
431    }
432    .instrument(debug_span!("deserialization"))
433    .await;
434    trace!(output_or_error = ?output_or_error);
435    ctx.set_output_or_error(output_or_error);
436
437    ctx.enter_after_deserialization_phase();
438    run_interceptors!(halt_on_err: read_after_deserialization(ctx, runtime_components, cfg));
439}
440
441#[instrument(skip_all, level = "debug")]
442async fn finally_attempt(
443    ctx: &mut InterceptorContext,
444    cfg: &mut ConfigBag,
445    runtime_components: &RuntimeComponents,
446) {
447    run_interceptors!(continue_on_err: {
448        modify_before_attempt_completion(ctx, runtime_components, cfg);
449        read_after_attempt(ctx, runtime_components, cfg);
450    });
451}
452
453#[instrument(skip_all, level = "debug")]
454async fn finally_op(
455    ctx: &mut InterceptorContext,
456    cfg: &mut ConfigBag,
457    runtime_components: &RuntimeComponents,
458) {
459    run_interceptors!(continue_on_err: {
460        modify_before_completion(ctx, runtime_components, cfg);
461        read_after_execution(ctx, runtime_components, cfg);
462    });
463}
464
465#[cfg(all(test, any(feature = "test-util", feature = "legacy-test-util")))]
466mod tests {
467    use crate::client::auth::no_auth::{NoAuthRuntimePlugin, NO_AUTH_SCHEME_ID};
468    use crate::client::orchestrator::endpoints::StaticUriEndpointResolver;
469    use crate::client::orchestrator::{invoke, invoke_with_stop_point, StopPoint};
470    use crate::client::retries::strategy::NeverRetryStrategy;
471    use crate::client::test_util::{
472        deserializer::CannedResponseDeserializer, serializer::CannedRequestSerializer,
473    };
474    use aws_smithy_http_client::test_util::NeverClient;
475    use aws_smithy_runtime_api::box_error::BoxError;
476    use aws_smithy_runtime_api::client::auth::static_resolver::StaticAuthSchemeOptionResolver;
477    use aws_smithy_runtime_api::client::auth::{
478        AuthSchemeOptionResolverParams, SharedAuthSchemeOptionResolver,
479    };
480    use aws_smithy_runtime_api::client::endpoint::{
481        EndpointResolverParams, SharedEndpointResolver,
482    };
483    use aws_smithy_runtime_api::client::http::{
484        http_client_fn, HttpConnector, HttpConnectorFuture,
485    };
486    use aws_smithy_runtime_api::client::interceptors::context::{
487        AfterDeserializationInterceptorContextRef, BeforeDeserializationInterceptorContextMut,
488        BeforeDeserializationInterceptorContextRef, BeforeSerializationInterceptorContextMut,
489        BeforeSerializationInterceptorContextRef, BeforeTransmitInterceptorContextMut,
490        BeforeTransmitInterceptorContextRef, FinalizerInterceptorContextMut,
491        FinalizerInterceptorContextRef, Input, Output,
492    };
493    use aws_smithy_runtime_api::client::interceptors::{Intercept, SharedInterceptor};
494    use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, OrchestratorError};
495    use aws_smithy_runtime_api::client::retries::SharedRetryStrategy;
496    use aws_smithy_runtime_api::client::runtime_components::{
497        RuntimeComponents, RuntimeComponentsBuilder,
498    };
499    use aws_smithy_runtime_api::client::runtime_plugin::{RuntimePlugin, RuntimePlugins};
500    use aws_smithy_runtime_api::client::ser_de::{
501        SharedRequestSerializer, SharedResponseDeserializer,
502    };
503    use aws_smithy_runtime_api::shared::IntoShared;
504    use aws_smithy_types::body::SdkBody;
505    use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
506    use aws_smithy_types::timeout::TimeoutConfig;
507    use http_02x::{Response, StatusCode};
508    use std::borrow::Cow;
509    use std::sync::atomic::{AtomicBool, Ordering};
510    use std::sync::Arc;
511    use tracing_test::traced_test;
512
513    fn new_request_serializer() -> CannedRequestSerializer {
514        CannedRequestSerializer::success(HttpRequest::empty())
515    }
516
517    fn new_response_deserializer() -> CannedResponseDeserializer {
518        CannedResponseDeserializer::new(
519            Response::builder()
520                .status(StatusCode::OK)
521                .body(SdkBody::empty())
522                .map_err(|err| OrchestratorError::other(Box::new(err)))
523                .map(Output::erase),
524        )
525    }
526
527    #[derive(Debug, Default)]
528    struct OkConnector {}
529
530    impl OkConnector {
531        fn new() -> Self {
532            Self::default()
533        }
534    }
535
536    impl HttpConnector for OkConnector {
537        fn call(&self, _request: HttpRequest) -> HttpConnectorFuture {
538            HttpConnectorFuture::ready(Ok(http_02x::Response::builder()
539                .status(200)
540                .body(SdkBody::empty())
541                .expect("OK response is valid")
542                .try_into()
543                .unwrap()))
544        }
545    }
546
547    #[derive(Debug)]
548    struct TestOperationRuntimePlugin {
549        builder: RuntimeComponentsBuilder,
550    }
551
552    impl TestOperationRuntimePlugin {
553        fn new() -> Self {
554            Self {
555                builder: RuntimeComponentsBuilder::for_tests()
556                    .with_retry_strategy(Some(SharedRetryStrategy::new(NeverRetryStrategy::new())))
557                    .with_endpoint_resolver(Some(SharedEndpointResolver::new(
558                        StaticUriEndpointResolver::http_localhost(8080),
559                    )))
560                    .with_http_client(Some(http_client_fn(|_, _| {
561                        OkConnector::new().into_shared()
562                    })))
563                    .with_auth_scheme_option_resolver(Some(SharedAuthSchemeOptionResolver::new(
564                        StaticAuthSchemeOptionResolver::new(vec![NO_AUTH_SCHEME_ID]),
565                    ))),
566            }
567        }
568    }
569
570    impl RuntimePlugin for TestOperationRuntimePlugin {
571        fn config(&self) -> Option<FrozenLayer> {
572            let mut layer = Layer::new("TestOperationRuntimePlugin");
573            layer.store_put(AuthSchemeOptionResolverParams::new("idontcare"));
574            layer.store_put(EndpointResolverParams::new("dontcare"));
575            layer.store_put(SharedRequestSerializer::new(new_request_serializer()));
576            layer.store_put(SharedResponseDeserializer::new(new_response_deserializer()));
577            layer.store_put(TimeoutConfig::builder().build());
578            Some(layer.freeze())
579        }
580
581        fn runtime_components(
582            &self,
583            _: &RuntimeComponentsBuilder,
584        ) -> Cow<'_, RuntimeComponentsBuilder> {
585            Cow::Borrowed(&self.builder)
586        }
587    }
588
589    macro_rules! interceptor_error_handling_test {
590        (read_before_execution, $ctx:ty, $expected:expr,) => {
591            interceptor_error_handling_test!(__private read_before_execution, $ctx, $expected,);
592        };
593        ($interceptor:ident, $ctx:ty, $expected:expr) => {
594            interceptor_error_handling_test!(__private $interceptor, $ctx, $expected, _rc: &RuntimeComponents,);
595        };
596        (__private $interceptor:ident, $ctx:ty, $expected:expr, $($rc_arg:tt)*) => {
597            #[derive(Debug)]
598            struct FailingInterceptorA;
599            impl Intercept for FailingInterceptorA {
600                fn name(&self) -> &'static str { "FailingInterceptorA" }
601
602                fn $interceptor(
603                    &self,
604                    _ctx: $ctx,
605                    $($rc_arg)*
606                    _cfg: &mut ConfigBag,
607                ) -> Result<(), BoxError> {
608                    tracing::debug!("FailingInterceptorA called!");
609                    Err("FailingInterceptorA".into())
610                }
611            }
612
613            #[derive(Debug)]
614            struct FailingInterceptorB;
615            impl Intercept for FailingInterceptorB {
616                fn name(&self) -> &'static str { "FailingInterceptorB" }
617
618                fn $interceptor(
619                    &self,
620                    _ctx: $ctx,
621                    $($rc_arg)*
622                    _cfg: &mut ConfigBag,
623                ) -> Result<(), BoxError> {
624                    tracing::debug!("FailingInterceptorB called!");
625                    Err("FailingInterceptorB".into())
626                }
627            }
628
629            #[derive(Debug)]
630            struct FailingInterceptorC;
631            impl Intercept for FailingInterceptorC {
632                fn name(&self) -> &'static str { "FailingInterceptorC" }
633
634                fn $interceptor(
635                    &self,
636                    _ctx: $ctx,
637                    $($rc_arg)*
638                    _cfg: &mut ConfigBag,
639                ) -> Result<(), BoxError> {
640                    tracing::debug!("FailingInterceptorC called!");
641                    Err("FailingInterceptorC".into())
642                }
643            }
644
645            #[derive(Debug)]
646            struct FailingInterceptorsClientRuntimePlugin(RuntimeComponentsBuilder);
647            impl FailingInterceptorsClientRuntimePlugin {
648                fn new() -> Self {
649                    Self(RuntimeComponentsBuilder::new("test").with_interceptor(SharedInterceptor::new(FailingInterceptorA)))
650                }
651            }
652            impl RuntimePlugin for FailingInterceptorsClientRuntimePlugin {
653                fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
654                    Cow::Borrowed(&self.0)
655                }
656            }
657
658            #[derive(Debug)]
659            struct FailingInterceptorsOperationRuntimePlugin(RuntimeComponentsBuilder);
660            impl FailingInterceptorsOperationRuntimePlugin {
661                fn new() -> Self {
662                    Self(
663                        RuntimeComponentsBuilder::new("test")
664                            .with_interceptor(SharedInterceptor::new(FailingInterceptorB))
665                            .with_interceptor(SharedInterceptor::new(FailingInterceptorC))
666                    )
667                }
668            }
669            impl RuntimePlugin for FailingInterceptorsOperationRuntimePlugin {
670                fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
671                    Cow::Borrowed(&self.0)
672                }
673            }
674
675            let input = Input::doesnt_matter();
676            let runtime_plugins = RuntimePlugins::new()
677                .with_client_plugin(FailingInterceptorsClientRuntimePlugin::new())
678                .with_operation_plugin(TestOperationRuntimePlugin::new())
679                .with_operation_plugin(NoAuthRuntimePlugin::new())
680                .with_operation_plugin(FailingInterceptorsOperationRuntimePlugin::new());
681            let actual = invoke("test", "test", input, &runtime_plugins)
682                .await
683                .expect_err("should error");
684            let actual = format!("{:?}", actual);
685            assert!(
686                actual.starts_with(&$expected),
687                "\nActual error:      {actual}\nShould start with: {}\n",
688                $expected
689            );
690
691            assert!(logs_contain("FailingInterceptorA called!"));
692            assert!(logs_contain("FailingInterceptorB called!"));
693            assert!(logs_contain("FailingInterceptorC called!"));
694        };
695    }
696
697    #[tokio::test]
698    #[traced_test]
699    async fn test_read_before_execution_error_handling() {
700        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ReadBeforeExecution, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
701        interceptor_error_handling_test!(
702            read_before_execution,
703            &BeforeSerializationInterceptorContextRef<'_>,
704            expected,
705        );
706    }
707
708    #[tokio::test]
709    #[traced_test]
710    async fn test_modify_before_serialization_error_handling() {
711        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeSerialization, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
712        interceptor_error_handling_test!(
713            modify_before_serialization,
714            &mut BeforeSerializationInterceptorContextMut<'_>,
715            expected
716        );
717    }
718
719    #[tokio::test]
720    #[traced_test]
721    async fn test_read_before_serialization_error_handling() {
722        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ReadBeforeSerialization, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
723        interceptor_error_handling_test!(
724            read_before_serialization,
725            &BeforeSerializationInterceptorContextRef<'_>,
726            expected
727        );
728    }
729
730    #[tokio::test]
731    #[traced_test]
732    async fn test_read_after_serialization_error_handling() {
733        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadAfterSerialization, interceptor_name: Some("FailingInterceptorC")"#.to_string();
734        interceptor_error_handling_test!(
735            read_after_serialization,
736            &BeforeTransmitInterceptorContextRef<'_>,
737            expected
738        );
739    }
740
741    #[tokio::test]
742    #[traced_test]
743    async fn test_modify_before_retry_loop_error_handling() {
744        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeRetryLoop, interceptor_name: Some("FailingInterceptorC")"#.to_string();
745        interceptor_error_handling_test!(
746            modify_before_retry_loop,
747            &mut BeforeTransmitInterceptorContextMut<'_>,
748            expected
749        );
750    }
751
752    #[tokio::test]
753    #[traced_test]
754    async fn test_read_before_attempt_error_handling() {
755        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeAttempt, interceptor_name: Some("FailingInterceptorC")"#;
756        interceptor_error_handling_test!(
757            read_before_attempt,
758            &BeforeTransmitInterceptorContextRef<'_>,
759            expected
760        );
761    }
762
763    #[tokio::test]
764    #[traced_test]
765    async fn test_modify_before_signing_error_handling() {
766        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeSigning, interceptor_name: Some("FailingInterceptorC")"#;
767        interceptor_error_handling_test!(
768            modify_before_signing,
769            &mut BeforeTransmitInterceptorContextMut<'_>,
770            expected
771        );
772    }
773
774    #[tokio::test]
775    #[traced_test]
776    async fn test_read_before_signing_error_handling() {
777        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeSigning, interceptor_name: Some("FailingInterceptorC")"#;
778        interceptor_error_handling_test!(
779            read_before_signing,
780            &BeforeTransmitInterceptorContextRef<'_>,
781            expected
782        );
783    }
784
785    #[tokio::test]
786    #[traced_test]
787    async fn test_read_after_signing_error_handling() {
788        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadAfterSigning, interceptor_name: Some("FailingInterceptorC")"#;
789        interceptor_error_handling_test!(
790            read_after_signing,
791            &BeforeTransmitInterceptorContextRef<'_>,
792            expected
793        );
794    }
795
796    #[tokio::test]
797    #[traced_test]
798    async fn test_modify_before_transmit_error_handling() {
799        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeTransmit, interceptor_name: Some("FailingInterceptorC")"#;
800        interceptor_error_handling_test!(
801            modify_before_transmit,
802            &mut BeforeTransmitInterceptorContextMut<'_>,
803            expected
804        );
805    }
806
807    #[tokio::test]
808    #[traced_test]
809    async fn test_read_before_transmit_error_handling() {
810        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeTransmit, interceptor_name: Some("FailingInterceptorC")"#;
811        interceptor_error_handling_test!(
812            read_before_transmit,
813            &BeforeTransmitInterceptorContextRef<'_>,
814            expected
815        );
816    }
817
818    #[tokio::test]
819    #[traced_test]
820    async fn test_read_after_transmit_error_handling() {
821        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterTransmit, interceptor_name: Some("FailingInterceptorC")"#;
822        interceptor_error_handling_test!(
823            read_after_transmit,
824            &BeforeDeserializationInterceptorContextRef<'_>,
825            expected
826        );
827    }
828
829    #[tokio::test]
830    #[traced_test]
831    async fn test_modify_before_deserialization_error_handling() {
832        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
833        interceptor_error_handling_test!(
834            modify_before_deserialization,
835            &mut BeforeDeserializationInterceptorContextMut<'_>,
836            expected
837        );
838    }
839
840    #[tokio::test]
841    #[traced_test]
842    async fn test_read_before_deserialization_error_handling() {
843        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadBeforeDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
844        interceptor_error_handling_test!(
845            read_before_deserialization,
846            &BeforeDeserializationInterceptorContextRef<'_>,
847            expected
848        );
849    }
850
851    #[tokio::test]
852    #[traced_test]
853    async fn test_read_after_deserialization_error_handling() {
854        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
855        interceptor_error_handling_test!(
856            read_after_deserialization,
857            &AfterDeserializationInterceptorContextRef<'_>,
858            expected
859        );
860    }
861
862    #[tokio::test]
863    #[traced_test]
864    async fn test_modify_before_attempt_completion_error_handling() {
865        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("FailingInterceptorC")"#;
866        interceptor_error_handling_test!(
867            modify_before_attempt_completion,
868            &mut FinalizerInterceptorContextMut<'_>,
869            expected
870        );
871    }
872
873    #[tokio::test]
874    #[traced_test]
875    async fn test_read_after_attempt_error_handling() {
876        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterAttempt, interceptor_name: Some("FailingInterceptorC")"#;
877        interceptor_error_handling_test!(
878            read_after_attempt,
879            &FinalizerInterceptorContextRef<'_>,
880            expected
881        );
882    }
883
884    #[tokio::test]
885    #[traced_test]
886    async fn test_modify_before_completion_error_handling() {
887        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("FailingInterceptorC")"#;
888        interceptor_error_handling_test!(
889            modify_before_completion,
890            &mut FinalizerInterceptorContextMut<'_>,
891            expected
892        );
893    }
894
895    #[tokio::test]
896    #[traced_test]
897    async fn test_read_after_execution_error_handling() {
898        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterExecution, interceptor_name: Some("FailingInterceptorC")"#;
899        interceptor_error_handling_test!(
900            read_after_execution,
901            &FinalizerInterceptorContextRef<'_>,
902            expected
903        );
904    }
905
906    macro_rules! interceptor_error_redirection_test {
907        (read_before_execution, $origin_ctx:ty, $destination_interceptor:ident, $destination_ctx:ty, $expected:expr) => {
908            interceptor_error_redirection_test!(__private read_before_execution, $origin_ctx, $destination_interceptor, $destination_ctx, $expected,);
909        };
910        ($origin_interceptor:ident, $origin_ctx:ty, $destination_interceptor:ident, $destination_ctx:ty, $expected:expr) => {
911            interceptor_error_redirection_test!(__private $origin_interceptor, $origin_ctx, $destination_interceptor, $destination_ctx, $expected, _rc: &RuntimeComponents,);
912        };
913        (__private $origin_interceptor:ident, $origin_ctx:ty, $destination_interceptor:ident, $destination_ctx:ty, $expected:expr, $($rc_arg:tt)*) => {
914            #[derive(Debug)]
915            struct OriginInterceptor;
916            impl Intercept for OriginInterceptor {
917                fn name(&self) -> &'static str { "OriginInterceptor" }
918
919                fn $origin_interceptor(
920                    &self,
921                    _ctx: $origin_ctx,
922                    $($rc_arg)*
923                    _cfg: &mut ConfigBag,
924                ) -> Result<(), BoxError> {
925                    tracing::debug!("OriginInterceptor called!");
926                    Err("OriginInterceptor".into())
927                }
928            }
929
930            #[derive(Debug)]
931            struct DestinationInterceptor;
932            impl Intercept for DestinationInterceptor {
933                fn name(&self) -> &'static str { "DestinationInterceptor" }
934
935                fn $destination_interceptor(
936                    &self,
937                    _ctx: $destination_ctx,
938                    _runtime_components: &RuntimeComponents,
939                    _cfg: &mut ConfigBag,
940                ) -> Result<(), BoxError> {
941                    tracing::debug!("DestinationInterceptor called!");
942                    Err("DestinationInterceptor".into())
943                }
944            }
945
946            #[derive(Debug)]
947            struct InterceptorsTestOperationRuntimePlugin(RuntimeComponentsBuilder);
948            impl InterceptorsTestOperationRuntimePlugin {
949                fn new() -> Self {
950                    Self(
951                        RuntimeComponentsBuilder::new("test")
952                            .with_interceptor(SharedInterceptor::new(OriginInterceptor))
953                            .with_interceptor(SharedInterceptor::new(DestinationInterceptor))
954                    )
955                }
956            }
957            impl RuntimePlugin for InterceptorsTestOperationRuntimePlugin {
958                fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
959                    Cow::Borrowed(&self.0)
960                }
961            }
962
963            let input = Input::doesnt_matter();
964            let runtime_plugins = RuntimePlugins::new()
965                .with_operation_plugin(TestOperationRuntimePlugin::new())
966                .with_operation_plugin(NoAuthRuntimePlugin::new())
967                .with_operation_plugin(InterceptorsTestOperationRuntimePlugin::new());
968            let actual = invoke("test", "test", input, &runtime_plugins)
969                .await
970                .expect_err("should error");
971            let actual = format!("{:?}", actual);
972            assert!(
973                actual.starts_with(&$expected),
974                "\nActual error:      {actual}\nShould start with: {}\n",
975                $expected
976            );
977
978            assert!(logs_contain("OriginInterceptor called!"));
979            assert!(logs_contain("DestinationInterceptor called!"));
980        };
981    }
982
983    #[tokio::test]
984    #[traced_test]
985    async fn test_read_before_execution_error_causes_jump_to_modify_before_completion() {
986        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
987        interceptor_error_redirection_test!(
988            read_before_execution,
989            &BeforeSerializationInterceptorContextRef<'_>,
990            modify_before_completion,
991            &mut FinalizerInterceptorContextMut<'_>,
992            expected
993        );
994    }
995
996    #[tokio::test]
997    #[traced_test]
998    async fn test_modify_before_serialization_error_causes_jump_to_modify_before_completion() {
999        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1000        interceptor_error_redirection_test!(
1001            modify_before_serialization,
1002            &mut BeforeSerializationInterceptorContextMut<'_>,
1003            modify_before_completion,
1004            &mut FinalizerInterceptorContextMut<'_>,
1005            expected
1006        );
1007    }
1008
1009    #[tokio::test]
1010    #[traced_test]
1011    async fn test_read_before_serialization_error_causes_jump_to_modify_before_completion() {
1012        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1013        interceptor_error_redirection_test!(
1014            read_before_serialization,
1015            &BeforeSerializationInterceptorContextRef<'_>,
1016            modify_before_completion,
1017            &mut FinalizerInterceptorContextMut<'_>,
1018            expected
1019        );
1020    }
1021
1022    #[tokio::test]
1023    #[traced_test]
1024    async fn test_read_after_serialization_error_causes_jump_to_modify_before_completion() {
1025        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1026        interceptor_error_redirection_test!(
1027            read_after_serialization,
1028            &BeforeTransmitInterceptorContextRef<'_>,
1029            modify_before_completion,
1030            &mut FinalizerInterceptorContextMut<'_>,
1031            expected
1032        );
1033    }
1034
1035    #[tokio::test]
1036    #[traced_test]
1037    async fn test_modify_before_retry_loop_error_causes_jump_to_modify_before_completion() {
1038        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1039        interceptor_error_redirection_test!(
1040            modify_before_retry_loop,
1041            &mut BeforeTransmitInterceptorContextMut<'_>,
1042            modify_before_completion,
1043            &mut FinalizerInterceptorContextMut<'_>,
1044            expected
1045        );
1046    }
1047
1048    #[tokio::test]
1049    #[traced_test]
1050    async fn test_read_before_attempt_error_causes_jump_to_modify_before_attempt_completion() {
1051        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1052        interceptor_error_redirection_test!(
1053            read_before_attempt,
1054            &BeforeTransmitInterceptorContextRef<'_>,
1055            modify_before_attempt_completion,
1056            &mut FinalizerInterceptorContextMut<'_>,
1057            expected
1058        );
1059    }
1060
1061    #[tokio::test]
1062    #[traced_test]
1063    async fn test_modify_before_signing_error_causes_jump_to_modify_before_attempt_completion() {
1064        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1065        interceptor_error_redirection_test!(
1066            modify_before_signing,
1067            &mut BeforeTransmitInterceptorContextMut<'_>,
1068            modify_before_attempt_completion,
1069            &mut FinalizerInterceptorContextMut<'_>,
1070            expected
1071        );
1072    }
1073
1074    #[tokio::test]
1075    #[traced_test]
1076    async fn test_read_before_signing_error_causes_jump_to_modify_before_attempt_completion() {
1077        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1078        interceptor_error_redirection_test!(
1079            read_before_signing,
1080            &BeforeTransmitInterceptorContextRef<'_>,
1081            modify_before_attempt_completion,
1082            &mut FinalizerInterceptorContextMut<'_>,
1083            expected
1084        );
1085    }
1086
1087    #[tokio::test]
1088    #[traced_test]
1089    async fn test_read_after_signing_error_causes_jump_to_modify_before_attempt_completion() {
1090        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1091        interceptor_error_redirection_test!(
1092            read_after_signing,
1093            &BeforeTransmitInterceptorContextRef<'_>,
1094            modify_before_attempt_completion,
1095            &mut FinalizerInterceptorContextMut<'_>,
1096            expected
1097        );
1098    }
1099
1100    #[tokio::test]
1101    #[traced_test]
1102    async fn test_modify_before_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1103        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1104        interceptor_error_redirection_test!(
1105            modify_before_transmit,
1106            &mut BeforeTransmitInterceptorContextMut<'_>,
1107            modify_before_attempt_completion,
1108            &mut FinalizerInterceptorContextMut<'_>,
1109            expected
1110        );
1111    }
1112
1113    #[tokio::test]
1114    #[traced_test]
1115    async fn test_read_before_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1116        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1117        interceptor_error_redirection_test!(
1118            read_before_transmit,
1119            &BeforeTransmitInterceptorContextRef<'_>,
1120            modify_before_attempt_completion,
1121            &mut FinalizerInterceptorContextMut<'_>,
1122            expected
1123        );
1124    }
1125
1126    #[tokio::test]
1127    #[traced_test]
1128    async fn test_read_after_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1129        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1130        interceptor_error_redirection_test!(
1131            read_after_transmit,
1132            &BeforeDeserializationInterceptorContextRef<'_>,
1133            modify_before_attempt_completion,
1134            &mut FinalizerInterceptorContextMut<'_>,
1135            expected
1136        );
1137    }
1138
1139    #[tokio::test]
1140    #[traced_test]
1141    async fn test_modify_before_deserialization_error_causes_jump_to_modify_before_attempt_completion(
1142    ) {
1143        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1144        interceptor_error_redirection_test!(
1145            modify_before_deserialization,
1146            &mut BeforeDeserializationInterceptorContextMut<'_>,
1147            modify_before_attempt_completion,
1148            &mut FinalizerInterceptorContextMut<'_>,
1149            expected
1150        );
1151    }
1152
1153    #[tokio::test]
1154    #[traced_test]
1155    async fn test_read_before_deserialization_error_causes_jump_to_modify_before_attempt_completion(
1156    ) {
1157        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1158        interceptor_error_redirection_test!(
1159            read_before_deserialization,
1160            &BeforeDeserializationInterceptorContextRef<'_>,
1161            modify_before_attempt_completion,
1162            &mut FinalizerInterceptorContextMut<'_>,
1163            expected
1164        );
1165    }
1166
1167    #[tokio::test]
1168    #[traced_test]
1169    async fn test_read_after_deserialization_error_causes_jump_to_modify_before_attempt_completion()
1170    {
1171        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1172        interceptor_error_redirection_test!(
1173            read_after_deserialization,
1174            &AfterDeserializationInterceptorContextRef<'_>,
1175            modify_before_attempt_completion,
1176            &mut FinalizerInterceptorContextMut<'_>,
1177            expected
1178        );
1179    }
1180
1181    #[tokio::test]
1182    #[traced_test]
1183    async fn test_modify_before_attempt_completion_error_causes_jump_to_read_after_attempt() {
1184        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterAttempt, interceptor_name: Some("DestinationInterceptor")"#;
1185        interceptor_error_redirection_test!(
1186            modify_before_attempt_completion,
1187            &mut FinalizerInterceptorContextMut<'_>,
1188            read_after_attempt,
1189            &FinalizerInterceptorContextRef<'_>,
1190            expected
1191        );
1192    }
1193
1194    #[tokio::test]
1195    #[traced_test]
1196    async fn test_modify_before_completion_error_causes_jump_to_read_after_execution() {
1197        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterExecution, interceptor_name: Some("DestinationInterceptor")"#;
1198        interceptor_error_redirection_test!(
1199            modify_before_completion,
1200            &mut FinalizerInterceptorContextMut<'_>,
1201            read_after_execution,
1202            &FinalizerInterceptorContextRef<'_>,
1203            expected
1204        );
1205    }
1206
1207    #[tokio::test]
1208    async fn test_stop_points() {
1209        let runtime_plugins = || {
1210            RuntimePlugins::new()
1211                .with_operation_plugin(TestOperationRuntimePlugin::new())
1212                .with_operation_plugin(NoAuthRuntimePlugin::new())
1213        };
1214
1215        // StopPoint::None should result in a response getting set since orchestration doesn't stop
1216        let context = invoke_with_stop_point(
1217            "test",
1218            "test",
1219            Input::doesnt_matter(),
1220            &runtime_plugins(),
1221            StopPoint::None,
1222        )
1223        .await
1224        .expect("success");
1225        assert!(context.response().is_some());
1226
1227        // StopPoint::BeforeTransmit will exit right before sending the request, so there should be no response
1228        let context = invoke_with_stop_point(
1229            "test",
1230            "test",
1231            Input::doesnt_matter(),
1232            &runtime_plugins(),
1233            StopPoint::BeforeTransmit,
1234        )
1235        .await
1236        .expect("success");
1237        assert!(context.response().is_none());
1238    }
1239
1240    /// The "finally" interceptors should run upon error when the StopPoint is set to BeforeTransmit
1241    #[tokio::test]
1242    async fn test_stop_points_error_handling() {
1243        #[derive(Debug, Default)]
1244        struct Inner {
1245            modify_before_retry_loop_called: AtomicBool,
1246            modify_before_completion_called: AtomicBool,
1247            read_after_execution_called: AtomicBool,
1248        }
1249        #[derive(Clone, Debug, Default)]
1250        struct TestInterceptor {
1251            inner: Arc<Inner>,
1252        }
1253
1254        impl Intercept for TestInterceptor {
1255            fn name(&self) -> &'static str {
1256                "TestInterceptor"
1257            }
1258
1259            fn modify_before_retry_loop(
1260                &self,
1261                _context: &mut BeforeTransmitInterceptorContextMut<'_>,
1262                _rc: &RuntimeComponents,
1263                _cfg: &mut ConfigBag,
1264            ) -> Result<(), BoxError> {
1265                self.inner
1266                    .modify_before_retry_loop_called
1267                    .store(true, Ordering::Relaxed);
1268                Err("test error".into())
1269            }
1270
1271            fn modify_before_completion(
1272                &self,
1273                _context: &mut FinalizerInterceptorContextMut<'_>,
1274                _rc: &RuntimeComponents,
1275                _cfg: &mut ConfigBag,
1276            ) -> Result<(), BoxError> {
1277                self.inner
1278                    .modify_before_completion_called
1279                    .store(true, Ordering::Relaxed);
1280                Ok(())
1281            }
1282
1283            fn read_after_execution(
1284                &self,
1285                _context: &FinalizerInterceptorContextRef<'_>,
1286                _rc: &RuntimeComponents,
1287                _cfg: &mut ConfigBag,
1288            ) -> Result<(), BoxError> {
1289                self.inner
1290                    .read_after_execution_called
1291                    .store(true, Ordering::Relaxed);
1292                Ok(())
1293            }
1294        }
1295
1296        #[derive(Debug)]
1297        struct TestInterceptorRuntimePlugin {
1298            builder: RuntimeComponentsBuilder,
1299        }
1300
1301        impl RuntimePlugin for TestInterceptorRuntimePlugin {
1302            fn runtime_components(
1303                &self,
1304                _: &RuntimeComponentsBuilder,
1305            ) -> Cow<'_, RuntimeComponentsBuilder> {
1306                Cow::Borrowed(&self.builder)
1307            }
1308        }
1309
1310        let interceptor = TestInterceptor::default();
1311        let client = NeverClient::new();
1312        let runtime_plugins = || {
1313            RuntimePlugins::new()
1314                .with_operation_plugin(TestOperationRuntimePlugin::new())
1315                .with_operation_plugin(NoAuthRuntimePlugin::new())
1316                .with_operation_plugin(TestInterceptorRuntimePlugin {
1317                    builder: RuntimeComponentsBuilder::new("test")
1318                        .with_interceptor(SharedInterceptor::new(interceptor.clone()))
1319                        .with_http_client(Some(client.clone())),
1320                })
1321        };
1322
1323        // StopPoint::BeforeTransmit will exit right before sending the request, so there should be no response
1324        let _err = invoke_with_stop_point(
1325            "test",
1326            "test",
1327            Input::doesnt_matter(),
1328            &runtime_plugins(),
1329            StopPoint::BeforeTransmit,
1330        )
1331        .await
1332        .expect_err("an error was returned");
1333        assert_eq!(client.num_calls(), 0);
1334
1335        assert!(interceptor
1336            .inner
1337            .modify_before_retry_loop_called
1338            .load(Ordering::Relaxed));
1339        assert!(interceptor
1340            .inner
1341            .modify_before_completion_called
1342            .load(Ordering::Relaxed));
1343        assert!(interceptor
1344            .inner
1345            .read_after_execution_called
1346            .load(Ordering::Relaxed));
1347    }
1348}