1use self::auth::orchestrate_auth;
7use crate::client::interceptors::Interceptors;
8use crate::client::orchestrator::http::{log_response_body, read_body};
9use crate::client::timeout::{MaybeTimeout, MaybeTimeoutConfig, TimeoutKind};
10use crate::client::{
11 http::body::minimum_throughput::MaybeUploadThroughputCheckFuture,
12 orchestrator::endpoints::orchestrate_endpoint,
13};
14use aws_smithy_async::rt::sleep::AsyncSleep;
15use aws_smithy_runtime_api::box_error::BoxError;
16use aws_smithy_runtime_api::client::http::{HttpClient, HttpConnector, HttpConnectorSettings};
17use aws_smithy_runtime_api::client::interceptors::context::{
18 Error, Input, InterceptorContext, Output, RewindResult,
19};
20use aws_smithy_runtime_api::client::orchestrator::{
21 HttpResponse, LoadedRequestBody, OrchestratorError,
22};
23use aws_smithy_runtime_api::client::result::SdkError;
24use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
25use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
26use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins;
27use aws_smithy_runtime_api::client::ser_de::{
28 DeserializeResponse, SerializeRequest, SharedRequestSerializer, SharedResponseDeserializer,
29};
30use aws_smithy_types::body::SdkBody;
31use aws_smithy_types::byte_stream::ByteStream;
32use aws_smithy_types::config_bag::ConfigBag;
33use aws_smithy_types::timeout::{MergeTimeoutConfig, TimeoutConfig};
34use std::mem;
35use tracing::{debug, debug_span, instrument, trace, Instrument};
36
37mod auth;
38
39pub mod endpoints;
41
42mod http;
44
45pub mod operation;
47
/// Marks the interceptor context as failed and leaves the enclosing function.
///
/// Usage: `halt!([ctx] => error_expr)`. The error expression is converted with
/// `.into()` before being passed to `ctx.fail(..)`. The expansion ends in a bare
/// `return;`, so it can only be used inside a body that returns `()`.
macro_rules! halt {
    ([$context:ident] => $failure:expr) => {{
        debug!("encountered orchestrator error; halting");
        $context.fail($failure.into());
        return;
    }};
}
55
/// Unwraps a `Result`, halting the enclosing function on `Err`.
///
/// Usage: `halt_on_err!([ctx] => fallible_expr)`. On `Ok` the macro evaluates to
/// the contained value; on `Err` the error is forwarded to `halt!`, which fails
/// the context and returns.
macro_rules! halt_on_err {
    ([$context:ident] => $fallible:expr) => {
        match $fallible {
            Ok(value) => value,
            Err(failure) => halt!([$context] => failure),
        }
    };
}
64
/// Records an error on the interceptor context without interrupting control flow.
///
/// Usage: `continue_on_err!([ctx] => fallible_expr)`. When the expression is an
/// `Err`, the error is logged at debug level and passed (via `.into()`) to
/// `ctx.fail(..)`; execution then carries on with the next statement.
macro_rules! continue_on_err {
    ([$context:ident] => $fallible:expr) => {
        if let Err(failure) = $fallible {
            debug!(err = ?failure, "encountered orchestrator error; continuing");
            $context.fail(failure.into());
        }
    };
}
73
/// Calls one or more interceptor hooks and routes any error through either
/// `continue_on_err!` or `halt_on_err!`.
///
/// Accepted forms:
/// - `run_interceptors!(continue_on_err: hook(ctx, rc, cfg))`
/// - `run_interceptors!(continue_on_err: { hook_a(ctx, rc, cfg); hook_b(ctx, rc, cfg); })`
/// - the same two shapes with `halt_on_err:` as the prefix
///
/// The braced forms expand to one single-hook invocation per entry, in the order
/// they are written.
macro_rules! run_interceptors {
    (continue_on_err: { $($hook:ident($context:ident, $components:ident, $config:ident);)+ }) => {
        $(run_interceptors!(continue_on_err: $hook($context, $components, $config));)+
    };
    (continue_on_err: $hook:ident($context:ident, $components:ident, $config:ident)) => {
        continue_on_err!([$context] => run_interceptors!(__private $hook($context, $components, $config)))
    };
    (halt_on_err: { $($hook:ident($context:ident, $components:ident, $config:ident);)+ }) => {
        $(run_interceptors!(halt_on_err: $hook($context, $components, $config));)+
    };
    (halt_on_err: $hook:ident($context:ident, $components:ident, $config:ident)) => {
        halt_on_err!([$context] => run_interceptors!(__private $hook($context, $components, $config)))
    };
    // Internal arm: performs the actual hook call on the interceptors registered
    // in the runtime components and evaluates to the hook's result.
    (__private $hook:ident($context:ident, $components:ident, $config:ident)) => {
        Interceptors::new($components.interceptors()).$hook($context, $components, $config)
    };
}
91
92pub async fn invoke(
102 service_name: &str,
103 operation_name: &str,
104 input: Input,
105 runtime_plugins: &RuntimePlugins,
106) -> Result<Output, SdkError<Error, HttpResponse>> {
107 invoke_with_stop_point(
108 service_name,
109 operation_name,
110 input,
111 runtime_plugins,
112 StopPoint::None,
113 )
114 .await?
115 .finalize()
116}
117
/// Selects how far orchestration proceeds before it returns.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopPoint {
    /// Do not stop early. This is the value `invoke` passes.
    None,

    /// Stop after the `read_before_transmit` interceptors have run, before the
    /// transmit phase is entered.
    BeforeTransmit,
}
128
129pub async fn invoke_with_stop_point(
136 service_name: &str,
137 operation_name: &str,
138 input: Input,
139 runtime_plugins: &RuntimePlugins,
140 stop_point: StopPoint,
141) -> Result<InterceptorContext, SdkError<Error, HttpResponse>> {
142 async move {
143 let mut cfg = ConfigBag::base();
144 let cfg = &mut cfg;
145
146 let mut ctx = InterceptorContext::new(input);
147
148 let runtime_components = apply_configuration(&mut ctx, cfg, runtime_plugins)
149 .map_err(SdkError::construction_failure)?;
150 trace!(runtime_components = ?runtime_components);
151
152 let operation_timeout_config =
153 MaybeTimeoutConfig::new(&runtime_components, cfg, TimeoutKind::Operation);
154 trace!(operation_timeout_config = ?operation_timeout_config);
155 async {
156 if !ctx.is_failed() {
159 try_op(&mut ctx, cfg, &runtime_components, stop_point).await;
160 }
161 finally_op(&mut ctx, cfg, &runtime_components).await;
162 if ctx.is_failed() {
163 Err(ctx.finalize().expect_err("it is failed"))
164 } else {
165 Ok(ctx)
166 }
167 }
168 .maybe_timeout(operation_timeout_config)
169 .await
170 }
171 .instrument(debug_span!("invoke", service = %service_name, operation = %operation_name, sdk_invocation_id = fastrand::u32(1_000_000..10_000_000)))
173 .await
174}
175
176#[instrument(skip_all, level = "debug")]
180fn apply_configuration(
181 ctx: &mut InterceptorContext,
182 cfg: &mut ConfigBag,
183 runtime_plugins: &RuntimePlugins,
184) -> Result<RuntimeComponents, BoxError> {
185 let client_rc_builder = runtime_plugins.apply_client_configuration(cfg)?;
186 continue_on_err!([ctx] => Interceptors::new(client_rc_builder.interceptors()).read_before_execution(false, ctx, cfg));
187
188 let operation_rc_builder = runtime_plugins.apply_operation_configuration(cfg)?;
189 continue_on_err!([ctx] => Interceptors::new(operation_rc_builder.interceptors()).read_before_execution(true, ctx, cfg));
190
191 let components = RuntimeComponents::builder("merged orchestrator components")
193 .merge_from(&client_rc_builder)
194 .merge_from(&operation_rc_builder)
195 .build()?;
196
197 let resolved_timeout_config = cfg.load::<MergeTimeoutConfig>();
201 debug!(
202 "timeout settings for this operation: {:?}",
203 resolved_timeout_config
204 );
205 cfg.interceptor_state().store_put(resolved_timeout_config);
206
207 components.validate_final_config(cfg)?;
208 Ok(components)
209}
210
/// Drives an operation from serialization through the retry loop.
///
/// Steps, in order: before-serialization interceptors, request serialization,
/// optional in-memory loading of the request body, before-transmit interceptors,
/// the retry strategy's initial-request check, and then a loop of attempts
/// (`try_attempt` + `finally_attempt`) governed by the retry strategy.
///
/// Nothing is returned: failures are recorded on `ctx`, and most of them end the
/// function early through `halt!`/`halt_on_err!`.
#[instrument(skip_all, level = "debug")]
async fn try_op(
    ctx: &mut InterceptorContext,
    cfg: &mut ConfigBag,
    runtime_components: &RuntimeComponents,
    stop_point: StopPoint,
) {
    // Before serialization
    run_interceptors!(halt_on_err: {
        modify_before_serialization(ctx, runtime_components, cfg);
        read_before_serialization(ctx, runtime_components, cfg);
    });

    // Serialization: turn the input into a request and store it on the context.
    ctx.enter_serialization_phase();
    {
        let _span = debug_span!("serialization").entered();
        let request_serializer = cfg
            .load::<SharedRequestSerializer>()
            .expect("request serializer must be in the config bag")
            .clone();
        let input = ctx.take_input().expect("input set at this point");
        let request = halt_on_err!([ctx] => request_serializer.serialize_input(input, cfg).map_err(OrchestratorError::other));
        ctx.set_request(request);
    }

    // If the config bag asks for it, read the whole request body into memory,
    // put an in-memory body back on the request, and publish the bytes in the bag.
    if let Some(&LoadedRequestBody::Requested) = cfg.load::<LoadedRequestBody>() {
        debug!("loading request body into memory");
        // Swap the real body out of the request so it can be consumed by `collect()`.
        let mut body = SdkBody::taken();
        mem::swap(&mut body, ctx.request_mut().expect("set above").body_mut());
        let loaded_body = halt_on_err!([ctx] =>
            ByteStream::new(body).collect().await.map_err(OrchestratorError::other)
        )
        .into_bytes();
        *ctx.request_mut().as_mut().expect("set above").body_mut() =
            SdkBody::from(loaded_body.clone());
        cfg.interceptor_state()
            .store_put(LoadedRequestBody::Loaded(loaded_body));
    }

    // Before transmit
    ctx.enter_before_transmit_phase();
    run_interceptors!(halt_on_err: {
        read_after_serialization(ctx, runtime_components, cfg);
        modify_before_retry_loop(ctx, runtime_components, cfg);
    });

    // Ask the retry strategy whether the initial request may be sent at all.
    let should_attempt = runtime_components
        .retry_strategy()
        .should_attempt_initial_request(runtime_components, cfg);
    match should_attempt {
        // Go ahead immediately.
        Ok(ShouldAttempt::Yes) => debug!("retry strategy has OKed initial request"),
        // Refused without a reason: fail the operation with a generic message.
        Ok(ShouldAttempt::No) => {
            let err: BoxError = "the retry strategy indicates that an initial request shouldn't be made, but it didn't specify why".into();
            halt!([ctx] => OrchestratorError::other(err));
        }
        // Refused with a reason: fail the operation with that error.
        Err(err) => halt!([ctx] => OrchestratorError::other(err)),
        // Go ahead after sleeping; this requires a sleep implementation.
        Ok(ShouldAttempt::YesAfterDelay(delay)) => {
            let sleep_impl = halt_on_err!([ctx] => runtime_components.sleep_impl().ok_or_else(|| OrchestratorError::other(
                "the retry strategy requested a delay before sending the initial request, but no 'async sleep' implementation was set"
            )));
            debug!("retry strategy has OKed initial request after a {delay:?} delay");
            sleep_impl.sleep(delay).await;
        }
    }

    // Checkpoint the context so each attempt can start from the same state via `rewind`.
    ctx.save_checkpoint();
    // Pending backoff (duration + sleep future) to await before the next attempt.
    let mut retry_delay = None;
    for i in 1u32.. {
        trace!("checking if context can be rewound for attempt #{i}");
        // Stop looping when the context cannot be rewound to the checkpoint.
        if let RewindResult::Impossible = ctx.rewind(cfg) {
            debug!("request cannot be retried since the request body cannot be cloned");
            break;
        }
        // Record the current attempt number in the config bag.
        cfg.interceptor_state()
            .store_put::<RequestAttempts>(i.into());
        // The backoff sleep is awaited here, before the attempt future is wrapped
        // in its timeout below, so it is not covered by the attempt timeout.
        if let Some((delay, sleep)) = retry_delay.take() {
            debug!("delaying for {delay:?}");
            sleep.await;
        }
        let attempt_timeout_config =
            MaybeTimeoutConfig::new(runtime_components, cfg, TimeoutKind::OperationAttempt);
        trace!(attempt_timeout_config = ?attempt_timeout_config);
        let maybe_timeout = async {
            debug!("beginning attempt #{i}");
            try_attempt(ctx, cfg, runtime_components, stop_point).await;
            finally_attempt(ctx, cfg, runtime_components).await;
            Result::<_, SdkError<Error, HttpResponse>>::Ok(())
        }
        .maybe_timeout(attempt_timeout_config)
        .await
        // NOTE(review): the `unwrap` assumes the error here always carries a source — confirm.
        .map_err(|err| OrchestratorError::timeout(err.into_source().unwrap()));

        // A timeout is recorded on the context but does not end the loop by itself;
        // the retry strategy is still consulted below.
        continue_on_err!([ctx] => maybe_timeout);

        // Ask the retry strategy whether another attempt should be made.
        let should_attempt = halt_on_err!([ctx] => runtime_components
            .retry_strategy()
            .should_attempt_retry(ctx, runtime_components, cfg)
            .map_err(OrchestratorError::other));
        match should_attempt {
            // Retry immediately.
            ShouldAttempt::Yes => continue,
            // Done: leave the attempt loop.
            ShouldAttempt::No => {
                debug!("a retry is either unnecessary or not possible, exiting attempt loop");
                break;
            }
            // Retry after a delay; the sleep future is created now and awaited at
            // the top of the next iteration.
            ShouldAttempt::YesAfterDelay(delay) => {
                let sleep_impl = halt_on_err!([ctx] => runtime_components.sleep_impl().ok_or_else(|| OrchestratorError::other(
                    "the retry strategy requested a delay before sending the retry request, but no 'async sleep' implementation was set"
                )));
                retry_delay = Some((delay, sleep_impl.sleep(delay)));
                continue;
            }
        }
    }
}
343
/// Performs a single request attempt: endpoint resolution, auth, transmit, and
/// response deserialization, with the matching interceptor hooks in between.
///
/// Nothing is returned. The outcome (output or error) is stored on `ctx`, and any
/// step that fails records the error on `ctx` and returns early via `halt_on_err!`.
/// When `stop_point` is `StopPoint::BeforeTransmit`, the function returns right
/// after the `read_before_transmit` interceptors without sending anything.
#[instrument(skip_all, level = "debug")]
async fn try_attempt(
    ctx: &mut InterceptorContext,
    cfg: &mut ConfigBag,
    runtime_components: &RuntimeComponents,
    stop_point: StopPoint,
) {
    run_interceptors!(halt_on_err: read_before_attempt(ctx, runtime_components, cfg));

    // Endpoint resolution
    halt_on_err!([ctx] => orchestrate_endpoint(ctx, runtime_components, cfg).await.map_err(OrchestratorError::other));

    run_interceptors!(halt_on_err: {
        modify_before_signing(ctx, runtime_components, cfg);
        read_before_signing(ctx, runtime_components, cfg);
    });

    // Auth
    halt_on_err!([ctx] => orchestrate_auth(ctx, runtime_components, cfg).await.map_err(OrchestratorError::other));

    run_interceptors!(halt_on_err: {
        read_after_signing(ctx, runtime_components, cfg);
        modify_before_transmit(ctx, runtime_components, cfg);
        read_before_transmit(ctx, runtime_components, cfg);
    });

    // Return early if the caller asked to stop before transmitting.
    if let StopPoint::BeforeTransmit = stop_point {
        debug!("ending orchestration early because the stop point is `BeforeTransmit`");
        return;
    }

    // Transmit: the request is taken out of the context and handed to the connector.
    ctx.enter_transmit_phase();
    let response = halt_on_err!([ctx] => {
        let request = ctx.take_request().expect("set during serialization");
        trace!(request = ?request, "transmitting request");
        let http_client = halt_on_err!([ctx] => runtime_components.http_client().ok_or_else(||
            OrchestratorError::other("No HTTP client was available to send this request. \
                Enable the `default-https-client` crate feature or configure an HTTP client to fix this.")
        ));
        let timeout_config = cfg.load::<TimeoutConfig>().expect("timeout config must be set");
        // Pass the connect/read timeouts from the timeout config to the connector.
        let settings = {
            let mut builder = HttpConnectorSettings::builder();
            builder.set_connect_timeout(timeout_config.connect_timeout());
            builder.set_read_timeout(timeout_config.read_timeout());
            builder.build()
        };
        let connector = http_client.http_connector(&settings, runtime_components);
        let response_future = MaybeUploadThroughputCheckFuture::new(
            cfg,
            runtime_components,
            connector.call(request),
        );
        response_future.await.map_err(OrchestratorError::connector)
    });
    trace!(response = ?response, "received response from service");
    ctx.set_response(response);
    ctx.enter_before_deserialization_phase();

    run_interceptors!(halt_on_err: {
        read_after_transmit(ctx, runtime_components, cfg);
        modify_before_deserialization(ctx, runtime_components, cfg);
        read_before_deserialization(ctx, runtime_components, cfg);
    });

    // Deserialization: try the streaming deserializer first; if it returns `None`,
    // read the body and use the non-streaming deserializer instead.
    ctx.enter_deserialization_phase();
    let output_or_error = async {
        let response = ctx.response_mut().expect("set during transmit");
        let response_deserializer = cfg
            .load::<SharedResponseDeserializer>()
            .expect("a request deserializer must be in the config bag");
        let maybe_deserialized = {
            let _span = debug_span!("deserialize_streaming").entered();
            response_deserializer.deserialize_streaming(response)
        };
        match maybe_deserialized {
            Some(output_or_error) => output_or_error,
            None => read_body(response)
                .instrument(debug_span!("read_body"))
                .await
                .map_err(OrchestratorError::response)
                .and_then(|_| {
                    let _span = debug_span!("deserialize_nonstreaming").entered();
                    log_response_body(response, cfg);
                    response_deserializer.deserialize_nonstreaming(response)
                }),
        }
    }
    .instrument(debug_span!("deserialization"))
    .await;
    trace!(output_or_error = ?output_or_error);
    ctx.set_output_or_error(output_or_error);

    ctx.enter_after_deserialization_phase();
    run_interceptors!(halt_on_err: read_after_deserialization(ctx, runtime_components, cfg));
}
440
441#[instrument(skip_all, level = "debug")]
442async fn finally_attempt(
443 ctx: &mut InterceptorContext,
444 cfg: &mut ConfigBag,
445 runtime_components: &RuntimeComponents,
446) {
447 run_interceptors!(continue_on_err: {
448 modify_before_attempt_completion(ctx, runtime_components, cfg);
449 read_after_attempt(ctx, runtime_components, cfg);
450 });
451}
452
453#[instrument(skip_all, level = "debug")]
454async fn finally_op(
455 ctx: &mut InterceptorContext,
456 cfg: &mut ConfigBag,
457 runtime_components: &RuntimeComponents,
458) {
459 run_interceptors!(continue_on_err: {
460 modify_before_completion(ctx, runtime_components, cfg);
461 read_after_execution(ctx, runtime_components, cfg);
462 });
463}
464
465#[cfg(all(test, any(feature = "test-util", feature = "legacy-test-util")))]
466mod tests {
467 use crate::client::auth::no_auth::{NoAuthRuntimePlugin, NO_AUTH_SCHEME_ID};
468 use crate::client::orchestrator::endpoints::StaticUriEndpointResolver;
469 use crate::client::orchestrator::{invoke, invoke_with_stop_point, StopPoint};
470 use crate::client::retries::strategy::NeverRetryStrategy;
471 use crate::client::test_util::{
472 deserializer::CannedResponseDeserializer, serializer::CannedRequestSerializer,
473 };
474 use aws_smithy_http_client::test_util::NeverClient;
475 use aws_smithy_runtime_api::box_error::BoxError;
476 use aws_smithy_runtime_api::client::auth::static_resolver::StaticAuthSchemeOptionResolver;
477 use aws_smithy_runtime_api::client::auth::{
478 AuthSchemeOptionResolverParams, SharedAuthSchemeOptionResolver,
479 };
480 use aws_smithy_runtime_api::client::endpoint::{
481 EndpointResolverParams, SharedEndpointResolver,
482 };
483 use aws_smithy_runtime_api::client::http::{
484 http_client_fn, HttpConnector, HttpConnectorFuture,
485 };
486 use aws_smithy_runtime_api::client::interceptors::context::{
487 AfterDeserializationInterceptorContextRef, BeforeDeserializationInterceptorContextMut,
488 BeforeDeserializationInterceptorContextRef, BeforeSerializationInterceptorContextMut,
489 BeforeSerializationInterceptorContextRef, BeforeTransmitInterceptorContextMut,
490 BeforeTransmitInterceptorContextRef, FinalizerInterceptorContextMut,
491 FinalizerInterceptorContextRef, Input, Output,
492 };
493 use aws_smithy_runtime_api::client::interceptors::{Intercept, SharedInterceptor};
494 use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, OrchestratorError};
495 use aws_smithy_runtime_api::client::retries::SharedRetryStrategy;
496 use aws_smithy_runtime_api::client::runtime_components::{
497 RuntimeComponents, RuntimeComponentsBuilder,
498 };
499 use aws_smithy_runtime_api::client::runtime_plugin::{RuntimePlugin, RuntimePlugins};
500 use aws_smithy_runtime_api::client::ser_de::{
501 SharedRequestSerializer, SharedResponseDeserializer,
502 };
503 use aws_smithy_runtime_api::shared::IntoShared;
504 use aws_smithy_types::body::SdkBody;
505 use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
506 use aws_smithy_types::timeout::TimeoutConfig;
507 use http_02x::{Response, StatusCode};
508 use std::borrow::Cow;
509 use std::sync::atomic::{AtomicBool, Ordering};
510 use std::sync::Arc;
511 use tracing_test::traced_test;
512
513 fn new_request_serializer() -> CannedRequestSerializer {
514 CannedRequestSerializer::success(HttpRequest::empty())
515 }
516
517 fn new_response_deserializer() -> CannedResponseDeserializer {
518 CannedResponseDeserializer::new(
519 Response::builder()
520 .status(StatusCode::OK)
521 .body(SdkBody::empty())
522 .map_err(|err| OrchestratorError::other(Box::new(err)))
523 .map(Output::erase),
524 )
525 }
526
527 #[derive(Debug, Default)]
528 struct OkConnector {}
529
530 impl OkConnector {
531 fn new() -> Self {
532 Self::default()
533 }
534 }
535
536 impl HttpConnector for OkConnector {
537 fn call(&self, _request: HttpRequest) -> HttpConnectorFuture {
538 HttpConnectorFuture::ready(Ok(http_02x::Response::builder()
539 .status(200)
540 .body(SdkBody::empty())
541 .expect("OK response is valid")
542 .try_into()
543 .unwrap()))
544 }
545 }
546
547 #[derive(Debug)]
548 struct TestOperationRuntimePlugin {
549 builder: RuntimeComponentsBuilder,
550 }
551
552 impl TestOperationRuntimePlugin {
553 fn new() -> Self {
554 Self {
555 builder: RuntimeComponentsBuilder::for_tests()
556 .with_retry_strategy(Some(SharedRetryStrategy::new(NeverRetryStrategy::new())))
557 .with_endpoint_resolver(Some(SharedEndpointResolver::new(
558 StaticUriEndpointResolver::http_localhost(8080),
559 )))
560 .with_http_client(Some(http_client_fn(|_, _| {
561 OkConnector::new().into_shared()
562 })))
563 .with_auth_scheme_option_resolver(Some(SharedAuthSchemeOptionResolver::new(
564 StaticAuthSchemeOptionResolver::new(vec![NO_AUTH_SCHEME_ID]),
565 ))),
566 }
567 }
568 }
569
570 impl RuntimePlugin for TestOperationRuntimePlugin {
571 fn config(&self) -> Option<FrozenLayer> {
572 let mut layer = Layer::new("TestOperationRuntimePlugin");
573 layer.store_put(AuthSchemeOptionResolverParams::new("idontcare"));
574 layer.store_put(EndpointResolverParams::new("dontcare"));
575 layer.store_put(SharedRequestSerializer::new(new_request_serializer()));
576 layer.store_put(SharedResponseDeserializer::new(new_response_deserializer()));
577 layer.store_put(TimeoutConfig::builder().build());
578 Some(layer.freeze())
579 }
580
581 fn runtime_components(
582 &self,
583 _: &RuntimeComponentsBuilder,
584 ) -> Cow<'_, RuntimeComponentsBuilder> {
585 Cow::Borrowed(&self.builder)
586 }
587 }
588
    /// Generates the body of an interceptor error-handling test.
    ///
    /// Arguments: the hook name, the hook's context parameter type, and the
    /// expected prefix of the `Debug` rendering of the resulting `SdkError`.
    ///
    /// The expansion defines three interceptors (`FailingInterceptorA`/`B`/`C`)
    /// that all fail in the given hook, registers A through a client plugin and
    /// B and C through an operation plugin, invokes the orchestrator, and asserts
    /// that (1) the error's debug output starts with the expected text and
    /// (2) all three interceptors were called.
    ///
    /// `logs_contain` is not defined here; presumably it is brought into scope by
    /// `#[traced_test]` on the calling test function — confirm.
    macro_rules! interceptor_error_handling_test {
        // `read_before_execution` takes no runtime-components parameter, so no
        // extra argument tokens are forwarded. Note the required trailing comma.
        (read_before_execution, $ctx:ty, $expected:expr,) => {
            interceptor_error_handling_test!(__private read_before_execution, $ctx, $expected,);
        };
        // Every other hook also receives `&RuntimeComponents`.
        ($interceptor:ident, $ctx:ty, $expected:expr) => {
            interceptor_error_handling_test!(__private $interceptor, $ctx, $expected, _rc: &RuntimeComponents,);
        };
        // Shared implementation; `$rc_arg` is spliced into each hook's parameter list.
        (__private $interceptor:ident, $ctx:ty, $expected:expr, $($rc_arg:tt)*) => {
            #[derive(Debug)]
            struct FailingInterceptorA;
            impl Intercept for FailingInterceptorA {
                fn name(&self) -> &'static str { "FailingInterceptorA" }

                fn $interceptor(
                    &self,
                    _ctx: $ctx,
                    $($rc_arg)*
                    _cfg: &mut ConfigBag,
                ) -> Result<(), BoxError> {
                    tracing::debug!("FailingInterceptorA called!");
                    Err("FailingInterceptorA".into())
                }
            }

            #[derive(Debug)]
            struct FailingInterceptorB;
            impl Intercept for FailingInterceptorB {
                fn name(&self) -> &'static str { "FailingInterceptorB" }

                fn $interceptor(
                    &self,
                    _ctx: $ctx,
                    $($rc_arg)*
                    _cfg: &mut ConfigBag,
                ) -> Result<(), BoxError> {
                    tracing::debug!("FailingInterceptorB called!");
                    Err("FailingInterceptorB".into())
                }
            }

            #[derive(Debug)]
            struct FailingInterceptorC;
            impl Intercept for FailingInterceptorC {
                fn name(&self) -> &'static str { "FailingInterceptorC" }

                fn $interceptor(
                    &self,
                    _ctx: $ctx,
                    $($rc_arg)*
                    _cfg: &mut ConfigBag,
                ) -> Result<(), BoxError> {
                    tracing::debug!("FailingInterceptorC called!");
                    Err("FailingInterceptorC".into())
                }
            }

            // Client-level plugin: contributes interceptor A.
            #[derive(Debug)]
            struct FailingInterceptorsClientRuntimePlugin(RuntimeComponentsBuilder);
            impl FailingInterceptorsClientRuntimePlugin {
                fn new() -> Self {
                    Self(RuntimeComponentsBuilder::new("test").with_interceptor(SharedInterceptor::new(FailingInterceptorA)))
                }
            }
            impl RuntimePlugin for FailingInterceptorsClientRuntimePlugin {
                fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
                    Cow::Borrowed(&self.0)
                }
            }

            // Operation-level plugin: contributes interceptors B and C, in that order.
            #[derive(Debug)]
            struct FailingInterceptorsOperationRuntimePlugin(RuntimeComponentsBuilder);
            impl FailingInterceptorsOperationRuntimePlugin {
                fn new() -> Self {
                    Self(
                        RuntimeComponentsBuilder::new("test")
                            .with_interceptor(SharedInterceptor::new(FailingInterceptorB))
                            .with_interceptor(SharedInterceptor::new(FailingInterceptorC))
                    )
                }
            }
            impl RuntimePlugin for FailingInterceptorsOperationRuntimePlugin {
                fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
                    Cow::Borrowed(&self.0)
                }
            }

            let input = Input::doesnt_matter();
            let runtime_plugins = RuntimePlugins::new()
                .with_client_plugin(FailingInterceptorsClientRuntimePlugin::new())
                .with_operation_plugin(TestOperationRuntimePlugin::new())
                .with_operation_plugin(NoAuthRuntimePlugin::new())
                .with_operation_plugin(FailingInterceptorsOperationRuntimePlugin::new());
            let actual = invoke("test", "test", input, &runtime_plugins)
                .await
                .expect_err("should error");
            // Compare on the `Debug` rendering; only the leading part must match.
            let actual = format!("{:?}", actual);
            assert!(
                actual.starts_with(&$expected),
                "\nActual error: {actual}\nShould start with: {}\n",
                $expected
            );

            // Every failing interceptor must still have been invoked.
            assert!(logs_contain("FailingInterceptorA called!"));
            assert!(logs_contain("FailingInterceptorB called!"));
            assert!(logs_contain("FailingInterceptorC called!"));
        };
    }
696
697 #[tokio::test]
698 #[traced_test]
699 async fn test_read_before_execution_error_handling() {
700 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ReadBeforeExecution, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
701 interceptor_error_handling_test!(
702 read_before_execution,
703 &BeforeSerializationInterceptorContextRef<'_>,
704 expected,
705 );
706 }
707
708 #[tokio::test]
709 #[traced_test]
710 async fn test_modify_before_serialization_error_handling() {
711 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeSerialization, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
712 interceptor_error_handling_test!(
713 modify_before_serialization,
714 &mut BeforeSerializationInterceptorContextMut<'_>,
715 expected
716 );
717 }
718
719 #[tokio::test]
720 #[traced_test]
721 async fn test_read_before_serialization_error_handling() {
722 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ReadBeforeSerialization, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
723 interceptor_error_handling_test!(
724 read_before_serialization,
725 &BeforeSerializationInterceptorContextRef<'_>,
726 expected
727 );
728 }
729
730 #[tokio::test]
731 #[traced_test]
732 async fn test_read_after_serialization_error_handling() {
733 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadAfterSerialization, interceptor_name: Some("FailingInterceptorC")"#.to_string();
734 interceptor_error_handling_test!(
735 read_after_serialization,
736 &BeforeTransmitInterceptorContextRef<'_>,
737 expected
738 );
739 }
740
741 #[tokio::test]
742 #[traced_test]
743 async fn test_modify_before_retry_loop_error_handling() {
744 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeRetryLoop, interceptor_name: Some("FailingInterceptorC")"#.to_string();
745 interceptor_error_handling_test!(
746 modify_before_retry_loop,
747 &mut BeforeTransmitInterceptorContextMut<'_>,
748 expected
749 );
750 }
751
752 #[tokio::test]
753 #[traced_test]
754 async fn test_read_before_attempt_error_handling() {
755 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeAttempt, interceptor_name: Some("FailingInterceptorC")"#;
756 interceptor_error_handling_test!(
757 read_before_attempt,
758 &BeforeTransmitInterceptorContextRef<'_>,
759 expected
760 );
761 }
762
763 #[tokio::test]
764 #[traced_test]
765 async fn test_modify_before_signing_error_handling() {
766 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeSigning, interceptor_name: Some("FailingInterceptorC")"#;
767 interceptor_error_handling_test!(
768 modify_before_signing,
769 &mut BeforeTransmitInterceptorContextMut<'_>,
770 expected
771 );
772 }
773
774 #[tokio::test]
775 #[traced_test]
776 async fn test_read_before_signing_error_handling() {
777 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeSigning, interceptor_name: Some("FailingInterceptorC")"#;
778 interceptor_error_handling_test!(
779 read_before_signing,
780 &BeforeTransmitInterceptorContextRef<'_>,
781 expected
782 );
783 }
784
785 #[tokio::test]
786 #[traced_test]
787 async fn test_read_after_signing_error_handling() {
788 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadAfterSigning, interceptor_name: Some("FailingInterceptorC")"#;
789 interceptor_error_handling_test!(
790 read_after_signing,
791 &BeforeTransmitInterceptorContextRef<'_>,
792 expected
793 );
794 }
795
796 #[tokio::test]
797 #[traced_test]
798 async fn test_modify_before_transmit_error_handling() {
799 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeTransmit, interceptor_name: Some("FailingInterceptorC")"#;
800 interceptor_error_handling_test!(
801 modify_before_transmit,
802 &mut BeforeTransmitInterceptorContextMut<'_>,
803 expected
804 );
805 }
806
807 #[tokio::test]
808 #[traced_test]
809 async fn test_read_before_transmit_error_handling() {
810 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeTransmit, interceptor_name: Some("FailingInterceptorC")"#;
811 interceptor_error_handling_test!(
812 read_before_transmit,
813 &BeforeTransmitInterceptorContextRef<'_>,
814 expected
815 );
816 }
817
818 #[tokio::test]
819 #[traced_test]
820 async fn test_read_after_transmit_error_handling() {
821 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterTransmit, interceptor_name: Some("FailingInterceptorC")"#;
822 interceptor_error_handling_test!(
823 read_after_transmit,
824 &BeforeDeserializationInterceptorContextRef<'_>,
825 expected
826 );
827 }
828
829 #[tokio::test]
830 #[traced_test]
831 async fn test_modify_before_deserialization_error_handling() {
832 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
833 interceptor_error_handling_test!(
834 modify_before_deserialization,
835 &mut BeforeDeserializationInterceptorContextMut<'_>,
836 expected
837 );
838 }
839
840 #[tokio::test]
841 #[traced_test]
842 async fn test_read_before_deserialization_error_handling() {
843 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadBeforeDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
844 interceptor_error_handling_test!(
845 read_before_deserialization,
846 &BeforeDeserializationInterceptorContextRef<'_>,
847 expected
848 );
849 }
850
851 #[tokio::test]
852 #[traced_test]
853 async fn test_read_after_deserialization_error_handling() {
854 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
855 interceptor_error_handling_test!(
856 read_after_deserialization,
857 &AfterDeserializationInterceptorContextRef<'_>,
858 expected
859 );
860 }
861
862 #[tokio::test]
863 #[traced_test]
864 async fn test_modify_before_attempt_completion_error_handling() {
865 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("FailingInterceptorC")"#;
866 interceptor_error_handling_test!(
867 modify_before_attempt_completion,
868 &mut FinalizerInterceptorContextMut<'_>,
869 expected
870 );
871 }
872
873 #[tokio::test]
874 #[traced_test]
875 async fn test_read_after_attempt_error_handling() {
876 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterAttempt, interceptor_name: Some("FailingInterceptorC")"#;
877 interceptor_error_handling_test!(
878 read_after_attempt,
879 &FinalizerInterceptorContextRef<'_>,
880 expected
881 );
882 }
883
884 #[tokio::test]
885 #[traced_test]
886 async fn test_modify_before_completion_error_handling() {
887 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("FailingInterceptorC")"#;
888 interceptor_error_handling_test!(
889 modify_before_completion,
890 &mut FinalizerInterceptorContextMut<'_>,
891 expected
892 );
893 }
894
895 #[tokio::test]
896 #[traced_test]
897 async fn test_read_after_execution_error_handling() {
898 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterExecution, interceptor_name: Some("FailingInterceptorC")"#;
899 interceptor_error_handling_test!(
900 read_after_execution,
901 &FinalizerInterceptorContextRef<'_>,
902 expected
903 );
904 }
905
/// Expands to the body of a test that checks where orchestration continues after an
/// interceptor hook fails.
///
/// Invocation form:
/// `(origin_hook, origin_ctx_type, destination_hook, destination_ctx_type, expected)`.
///
/// The expansion defines two interceptors:
/// - `OriginInterceptor`, whose `origin_hook` logs a message and returns an error, and
/// - `DestinationInterceptor`, whose `destination_hook` logs a message and returns an error.
///
/// Both are registered (origin first) through a dedicated runtime plugin, `invoke` is
/// called, and the expansion asserts that:
/// 1. the call fails,
/// 2. the `{:?}` rendering of the failure starts with `expected`, and
/// 3. the captured logs contain the "called!" message of both interceptors.
///
/// `logs_contain` must be in scope at the call site, so the enclosing test needs
/// `#[traced_test]`.
macro_rules! interceptor_error_redirection_test {
    // `read_before_execution` is matched by name first: its generated signature receives no
    // extra argument between the context and the config bag.
    (read_before_execution, $origin_ctx:ty, $dest_hook:ident, $dest_ctx:ty, $expected:expr) => {
        interceptor_error_redirection_test!(
            __private read_before_execution, $origin_ctx, $dest_hook, $dest_ctx, $expected,
        );
    };
    // Every other origin hook additionally takes a `&RuntimeComponents` argument.
    ($origin_hook:ident, $origin_ctx:ty, $dest_hook:ident, $dest_ctx:ty, $expected:expr) => {
        interceptor_error_redirection_test!(
            __private $origin_hook, $origin_ctx, $dest_hook, $dest_ctx, $expected,
            _rc: &RuntimeComponents,
        );
    };
    // Shared implementation; `$extra_arg` is spliced verbatim into the origin hook's
    // parameter list (it is either empty or `_rc: &RuntimeComponents,`).
    (__private $origin_hook:ident, $origin_ctx:ty, $dest_hook:ident, $dest_ctx:ty, $expected:expr, $($extra_arg:tt)*) => {
        #[derive(Debug)]
        struct OriginInterceptor;
        impl Intercept for OriginInterceptor {
            fn name(&self) -> &'static str {
                "OriginInterceptor"
            }

            fn $origin_hook(
                &self,
                _ctx: $origin_ctx,
                $($extra_arg)*
                _cfg: &mut ConfigBag,
            ) -> Result<(), BoxError> {
                // Logged so the test can prove this hook actually ran.
                tracing::debug!("OriginInterceptor called!");
                Err("OriginInterceptor".into())
            }
        }

        #[derive(Debug)]
        struct DestinationInterceptor;
        impl Intercept for DestinationInterceptor {
            fn name(&self) -> &'static str {
                "DestinationInterceptor"
            }

            fn $dest_hook(
                &self,
                _ctx: $dest_ctx,
                _runtime_components: &RuntimeComponents,
                _cfg: &mut ConfigBag,
            ) -> Result<(), BoxError> {
                // Logged so the test can prove this hook ran despite the earlier failure.
                tracing::debug!("DestinationInterceptor called!");
                Err("DestinationInterceptor".into())
            }
        }

        // Runtime plugin whose only job is to register the two interceptors above.
        #[derive(Debug)]
        struct InterceptorsTestOperationRuntimePlugin(RuntimeComponentsBuilder);
        impl InterceptorsTestOperationRuntimePlugin {
            fn new() -> Self {
                let builder = RuntimeComponentsBuilder::new("test");
                let builder = builder.with_interceptor(SharedInterceptor::new(OriginInterceptor));
                let builder =
                    builder.with_interceptor(SharedInterceptor::new(DestinationInterceptor));
                Self(builder)
            }
        }
        impl RuntimePlugin for InterceptorsTestOperationRuntimePlugin {
            fn runtime_components(
                &self,
                _: &RuntimeComponentsBuilder,
            ) -> Cow<'_, RuntimeComponentsBuilder> {
                Cow::Borrowed(&self.0)
            }
        }

        let request_input = Input::doesnt_matter();
        let runtime_plugins = RuntimePlugins::new()
            .with_operation_plugin(TestOperationRuntimePlugin::new())
            .with_operation_plugin(NoAuthRuntimePlugin::new())
            .with_operation_plugin(InterceptorsTestOperationRuntimePlugin::new());
        let err = invoke("test", "test", request_input, &runtime_plugins)
            .await
            .expect_err("should error");

        // Only the leading part of the Debug rendering is compared.
        let actual = format!("{err:?}");
        assert!(
            actual.starts_with(&$expected),
            "\nActual error: {actual}\nShould start with: {}\n",
            $expected
        );

        assert!(logs_contain("OriginInterceptor called!"));
        assert!(logs_contain("DestinationInterceptor called!"));
    };
}
982
983 #[tokio::test]
984 #[traced_test]
985 async fn test_read_before_execution_error_causes_jump_to_modify_before_completion() {
986 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
987 interceptor_error_redirection_test!(
988 read_before_execution,
989 &BeforeSerializationInterceptorContextRef<'_>,
990 modify_before_completion,
991 &mut FinalizerInterceptorContextMut<'_>,
992 expected
993 );
994 }
995
996 #[tokio::test]
997 #[traced_test]
998 async fn test_modify_before_serialization_error_causes_jump_to_modify_before_completion() {
999 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1000 interceptor_error_redirection_test!(
1001 modify_before_serialization,
1002 &mut BeforeSerializationInterceptorContextMut<'_>,
1003 modify_before_completion,
1004 &mut FinalizerInterceptorContextMut<'_>,
1005 expected
1006 );
1007 }
1008
1009 #[tokio::test]
1010 #[traced_test]
1011 async fn test_read_before_serialization_error_causes_jump_to_modify_before_completion() {
1012 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1013 interceptor_error_redirection_test!(
1014 read_before_serialization,
1015 &BeforeSerializationInterceptorContextRef<'_>,
1016 modify_before_completion,
1017 &mut FinalizerInterceptorContextMut<'_>,
1018 expected
1019 );
1020 }
1021
1022 #[tokio::test]
1023 #[traced_test]
1024 async fn test_read_after_serialization_error_causes_jump_to_modify_before_completion() {
1025 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1026 interceptor_error_redirection_test!(
1027 read_after_serialization,
1028 &BeforeTransmitInterceptorContextRef<'_>,
1029 modify_before_completion,
1030 &mut FinalizerInterceptorContextMut<'_>,
1031 expected
1032 );
1033 }
1034
1035 #[tokio::test]
1036 #[traced_test]
1037 async fn test_modify_before_retry_loop_error_causes_jump_to_modify_before_completion() {
1038 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1039 interceptor_error_redirection_test!(
1040 modify_before_retry_loop,
1041 &mut BeforeTransmitInterceptorContextMut<'_>,
1042 modify_before_completion,
1043 &mut FinalizerInterceptorContextMut<'_>,
1044 expected
1045 );
1046 }
1047
1048 #[tokio::test]
1049 #[traced_test]
1050 async fn test_read_before_attempt_error_causes_jump_to_modify_before_attempt_completion() {
1051 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1052 interceptor_error_redirection_test!(
1053 read_before_attempt,
1054 &BeforeTransmitInterceptorContextRef<'_>,
1055 modify_before_attempt_completion,
1056 &mut FinalizerInterceptorContextMut<'_>,
1057 expected
1058 );
1059 }
1060
1061 #[tokio::test]
1062 #[traced_test]
1063 async fn test_modify_before_signing_error_causes_jump_to_modify_before_attempt_completion() {
1064 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1065 interceptor_error_redirection_test!(
1066 modify_before_signing,
1067 &mut BeforeTransmitInterceptorContextMut<'_>,
1068 modify_before_attempt_completion,
1069 &mut FinalizerInterceptorContextMut<'_>,
1070 expected
1071 );
1072 }
1073
1074 #[tokio::test]
1075 #[traced_test]
1076 async fn test_read_before_signing_error_causes_jump_to_modify_before_attempt_completion() {
1077 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1078 interceptor_error_redirection_test!(
1079 read_before_signing,
1080 &BeforeTransmitInterceptorContextRef<'_>,
1081 modify_before_attempt_completion,
1082 &mut FinalizerInterceptorContextMut<'_>,
1083 expected
1084 );
1085 }
1086
1087 #[tokio::test]
1088 #[traced_test]
1089 async fn test_read_after_signing_error_causes_jump_to_modify_before_attempt_completion() {
1090 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1091 interceptor_error_redirection_test!(
1092 read_after_signing,
1093 &BeforeTransmitInterceptorContextRef<'_>,
1094 modify_before_attempt_completion,
1095 &mut FinalizerInterceptorContextMut<'_>,
1096 expected
1097 );
1098 }
1099
1100 #[tokio::test]
1101 #[traced_test]
1102 async fn test_modify_before_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1103 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1104 interceptor_error_redirection_test!(
1105 modify_before_transmit,
1106 &mut BeforeTransmitInterceptorContextMut<'_>,
1107 modify_before_attempt_completion,
1108 &mut FinalizerInterceptorContextMut<'_>,
1109 expected
1110 );
1111 }
1112
1113 #[tokio::test]
1114 #[traced_test]
1115 async fn test_read_before_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1116 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1117 interceptor_error_redirection_test!(
1118 read_before_transmit,
1119 &BeforeTransmitInterceptorContextRef<'_>,
1120 modify_before_attempt_completion,
1121 &mut FinalizerInterceptorContextMut<'_>,
1122 expected
1123 );
1124 }
1125
1126 #[tokio::test]
1127 #[traced_test]
1128 async fn test_read_after_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1129 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1130 interceptor_error_redirection_test!(
1131 read_after_transmit,
1132 &BeforeDeserializationInterceptorContextRef<'_>,
1133 modify_before_attempt_completion,
1134 &mut FinalizerInterceptorContextMut<'_>,
1135 expected
1136 );
1137 }
1138
1139 #[tokio::test]
1140 #[traced_test]
1141 async fn test_modify_before_deserialization_error_causes_jump_to_modify_before_attempt_completion(
1142 ) {
1143 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1144 interceptor_error_redirection_test!(
1145 modify_before_deserialization,
1146 &mut BeforeDeserializationInterceptorContextMut<'_>,
1147 modify_before_attempt_completion,
1148 &mut FinalizerInterceptorContextMut<'_>,
1149 expected
1150 );
1151 }
1152
1153 #[tokio::test]
1154 #[traced_test]
1155 async fn test_read_before_deserialization_error_causes_jump_to_modify_before_attempt_completion(
1156 ) {
1157 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1158 interceptor_error_redirection_test!(
1159 read_before_deserialization,
1160 &BeforeDeserializationInterceptorContextRef<'_>,
1161 modify_before_attempt_completion,
1162 &mut FinalizerInterceptorContextMut<'_>,
1163 expected
1164 );
1165 }
1166
1167 #[tokio::test]
1168 #[traced_test]
1169 async fn test_read_after_deserialization_error_causes_jump_to_modify_before_attempt_completion()
1170 {
1171 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1172 interceptor_error_redirection_test!(
1173 read_after_deserialization,
1174 &AfterDeserializationInterceptorContextRef<'_>,
1175 modify_before_attempt_completion,
1176 &mut FinalizerInterceptorContextMut<'_>,
1177 expected
1178 );
1179 }
1180
1181 #[tokio::test]
1182 #[traced_test]
1183 async fn test_modify_before_attempt_completion_error_causes_jump_to_read_after_attempt() {
1184 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterAttempt, interceptor_name: Some("DestinationInterceptor")"#;
1185 interceptor_error_redirection_test!(
1186 modify_before_attempt_completion,
1187 &mut FinalizerInterceptorContextMut<'_>,
1188 read_after_attempt,
1189 &FinalizerInterceptorContextRef<'_>,
1190 expected
1191 );
1192 }
1193
1194 #[tokio::test]
1195 #[traced_test]
1196 async fn test_modify_before_completion_error_causes_jump_to_read_after_execution() {
1197 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterExecution, interceptor_name: Some("DestinationInterceptor")"#;
1198 interceptor_error_redirection_test!(
1199 modify_before_completion,
1200 &mut FinalizerInterceptorContextMut<'_>,
1201 read_after_execution,
1202 &FinalizerInterceptorContextRef<'_>,
1203 expected
1204 );
1205 }
1206
1207 #[tokio::test]
1208 async fn test_stop_points() {
1209 let runtime_plugins = || {
1210 RuntimePlugins::new()
1211 .with_operation_plugin(TestOperationRuntimePlugin::new())
1212 .with_operation_plugin(NoAuthRuntimePlugin::new())
1213 };
1214
1215 let context = invoke_with_stop_point(
1217 "test",
1218 "test",
1219 Input::doesnt_matter(),
1220 &runtime_plugins(),
1221 StopPoint::None,
1222 )
1223 .await
1224 .expect("success");
1225 assert!(context.response().is_some());
1226
1227 let context = invoke_with_stop_point(
1229 "test",
1230 "test",
1231 Input::doesnt_matter(),
1232 &runtime_plugins(),
1233 StopPoint::BeforeTransmit,
1234 )
1235 .await
1236 .expect("success");
1237 assert!(context.response().is_none());
1238 }
1239
1240 #[tokio::test]
1242 async fn test_stop_points_error_handling() {
1243 #[derive(Debug, Default)]
1244 struct Inner {
1245 modify_before_retry_loop_called: AtomicBool,
1246 modify_before_completion_called: AtomicBool,
1247 read_after_execution_called: AtomicBool,
1248 }
1249 #[derive(Clone, Debug, Default)]
1250 struct TestInterceptor {
1251 inner: Arc<Inner>,
1252 }
1253
1254 impl Intercept for TestInterceptor {
1255 fn name(&self) -> &'static str {
1256 "TestInterceptor"
1257 }
1258
1259 fn modify_before_retry_loop(
1260 &self,
1261 _context: &mut BeforeTransmitInterceptorContextMut<'_>,
1262 _rc: &RuntimeComponents,
1263 _cfg: &mut ConfigBag,
1264 ) -> Result<(), BoxError> {
1265 self.inner
1266 .modify_before_retry_loop_called
1267 .store(true, Ordering::Relaxed);
1268 Err("test error".into())
1269 }
1270
1271 fn modify_before_completion(
1272 &self,
1273 _context: &mut FinalizerInterceptorContextMut<'_>,
1274 _rc: &RuntimeComponents,
1275 _cfg: &mut ConfigBag,
1276 ) -> Result<(), BoxError> {
1277 self.inner
1278 .modify_before_completion_called
1279 .store(true, Ordering::Relaxed);
1280 Ok(())
1281 }
1282
1283 fn read_after_execution(
1284 &self,
1285 _context: &FinalizerInterceptorContextRef<'_>,
1286 _rc: &RuntimeComponents,
1287 _cfg: &mut ConfigBag,
1288 ) -> Result<(), BoxError> {
1289 self.inner
1290 .read_after_execution_called
1291 .store(true, Ordering::Relaxed);
1292 Ok(())
1293 }
1294 }
1295
1296 #[derive(Debug)]
1297 struct TestInterceptorRuntimePlugin {
1298 builder: RuntimeComponentsBuilder,
1299 }
1300
1301 impl RuntimePlugin for TestInterceptorRuntimePlugin {
1302 fn runtime_components(
1303 &self,
1304 _: &RuntimeComponentsBuilder,
1305 ) -> Cow<'_, RuntimeComponentsBuilder> {
1306 Cow::Borrowed(&self.builder)
1307 }
1308 }
1309
1310 let interceptor = TestInterceptor::default();
1311 let client = NeverClient::new();
1312 let runtime_plugins = || {
1313 RuntimePlugins::new()
1314 .with_operation_plugin(TestOperationRuntimePlugin::new())
1315 .with_operation_plugin(NoAuthRuntimePlugin::new())
1316 .with_operation_plugin(TestInterceptorRuntimePlugin {
1317 builder: RuntimeComponentsBuilder::new("test")
1318 .with_interceptor(SharedInterceptor::new(interceptor.clone()))
1319 .with_http_client(Some(client.clone())),
1320 })
1321 };
1322
1323 let _err = invoke_with_stop_point(
1325 "test",
1326 "test",
1327 Input::doesnt_matter(),
1328 &runtime_plugins(),
1329 StopPoint::BeforeTransmit,
1330 )
1331 .await
1332 .expect_err("an error was returned");
1333 assert_eq!(client.num_calls(), 0);
1334
1335 assert!(interceptor
1336 .inner
1337 .modify_before_retry_loop_called
1338 .load(Ordering::Relaxed));
1339 assert!(interceptor
1340 .inner
1341 .modify_before_completion_called
1342 .load(Ordering::Relaxed));
1343 assert!(interceptor
1344 .inner
1345 .read_after_execution_called
1346 .load(Ordering::Relaxed));
1347 }
1348}