tower_http/trace/
mod.rs

1//! Middleware that adds high level [tracing] to a [`Service`].
2//!
3//! # Example
4//!
5//! Adding tracing to your service can be as simple as:
6//!
7//! ```rust
8//! use http::{Request, Response};
9//! use tower::{ServiceBuilder, ServiceExt, Service};
10//! use tower_http::trace::TraceLayer;
11//! use std::convert::Infallible;
12//! use http_body_util::Full;
13//! use bytes::Bytes;
14//!
15//! async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
16//!     Ok(Response::new(Full::default()))
17//! }
18//!
19//! # #[tokio::main]
20//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
21//! // Setup tracing
22//! tracing_subscriber::fmt::init();
23//!
24//! let mut service = ServiceBuilder::new()
25//!     .layer(TraceLayer::new_for_http())
26//!     .service_fn(handle);
27//!
28//! let request = Request::new(Full::from("foo"));
29//!
30//! let response = service
31//!     .ready()
32//!     .await?
33//!     .call(request)
34//!     .await?;
35//! # Ok(())
36//! # }
37//! ```
38//!
39//! If you run this application with `RUST_LOG=tower_http=trace cargo run` you should see logs like:
40//!
41//! ```text
42//! Mar 05 20:50:28.523 DEBUG request{method=GET path="/foo"}: tower_http::trace::on_request: started processing request
43//! Mar 05 20:50:28.524 DEBUG request{method=GET path="/foo"}: tower_http::trace::on_response: finished processing request latency=1 ms status=200
44//! ```
45//!
46//! # Customization
47//!
48//! [`Trace`] comes with good defaults but also supports customizing many aspects of the output.
49//!
50//! The default behaviour supports some customization:
51//!
52//! ```rust
53//! use http::{Request, Response, HeaderMap, StatusCode};
54//! use http_body_util::Full;
55//! use bytes::Bytes;
56//! use tower::ServiceBuilder;
57//! use tracing::Level;
58//! use tower_http::{
59//!     LatencyUnit,
60//!     trace::{TraceLayer, DefaultMakeSpan, DefaultOnRequest, DefaultOnResponse},
61//! };
62//! use std::time::Duration;
63//! # use tower::{ServiceExt, Service};
64//! # use std::convert::Infallible;
65//!
66//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
67//! #     Ok(Response::new(Full::from("foo")))
68//! # }
69//! # #[tokio::main]
70//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
71//! # tracing_subscriber::fmt::init();
72//! #
73//! let service = ServiceBuilder::new()
74//!     .layer(
75//!         TraceLayer::new_for_http()
76//!             .make_span_with(
77//!                 DefaultMakeSpan::new().include_headers(true)
78//!             )
79//!             .on_request(
80//!                 DefaultOnRequest::new().level(Level::INFO)
81//!             )
82//!             .on_response(
83//!                 DefaultOnResponse::new()
84//!                     .level(Level::INFO)
85//!                     .latency_unit(LatencyUnit::Micros)
86//!             )
87//!             // and so on for `on_eos`, `on_body_chunk`, and `on_failure`
88//!     )
89//!     .service_fn(handle);
90//! # let mut service = service;
91//! # let response = service
92//! #     .ready()
93//! #     .await?
94//! #     .call(Request::new(Full::from("foo")))
95//! #     .await?;
96//! # Ok(())
97//! # }
98//! ```
99//!
100//! However for maximum control you can provide callbacks:
101//!
102//! ```rust
103//! use http::{Request, Response, HeaderMap, StatusCode};
104//! use http_body_util::Full;
105//! use bytes::Bytes;
106//! use tower::ServiceBuilder;
107//! use tower_http::{classify::ServerErrorsFailureClass, trace::TraceLayer};
108//! use std::time::Duration;
109//! use tracing::Span;
110//! # use tower::{ServiceExt, Service};
111//! # use std::convert::Infallible;
112//!
113//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
114//! #     Ok(Response::new(Full::from("foo")))
115//! # }
116//! # #[tokio::main]
117//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
118//! # tracing_subscriber::fmt::init();
119//! #
120//! let service = ServiceBuilder::new()
121//!     .layer(
122//!         TraceLayer::new_for_http()
123//!             .make_span_with(|request: &Request<Full<Bytes>>| {
124//!                 tracing::debug_span!("http-request")
125//!             })
126//!             .on_request(|request: &Request<Full<Bytes>>, _span: &Span| {
127//!                 tracing::debug!("started {} {}", request.method(), request.uri().path())
128//!             })
129//!             .on_response(|response: &Response<Full<Bytes>>, latency: Duration, _span: &Span| {
130//!                 tracing::debug!("response generated in {:?}", latency)
131//!             })
132//!             .on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
133//!                 tracing::debug!("sending {} bytes", chunk.len())
134//!             })
135//!             .on_eos(|trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
136//!                 tracing::debug!("stream closed after {:?}", stream_duration)
137//!             })
138//!             .on_failure(|error: ServerErrorsFailureClass, latency: Duration, _span: &Span| {
139//!                 tracing::debug!("something went wrong")
140//!             })
141//!     )
142//!     .service_fn(handle);
143//! # let mut service = service;
144//! # let response = service
145//! #     .ready()
146//! #     .await?
147//! #     .call(Request::new(Full::from("foo")))
148//! #     .await?;
149//! # Ok(())
150//! # }
151//! ```
152//!
153//! ## Disabling something
154//!
155//! Setting the behaviour to `()` will disable that particular step:
156//!
157//! ```rust
158//! use http::StatusCode;
159//! use tower::ServiceBuilder;
160//! use tower_http::{classify::ServerErrorsFailureClass, trace::TraceLayer};
161//! use std::time::Duration;
162//! use tracing::Span;
163//! # use tower::{ServiceExt, Service};
164//! # use http_body_util::Full;
165//! # use bytes::Bytes;
166//! # use http::{Response, Request};
167//! # use std::convert::Infallible;
168//!
169//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
170//! #     Ok(Response::new(Full::from("foo")))
171//! # }
172//! # #[tokio::main]
173//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
174//! # tracing_subscriber::fmt::init();
175//! #
176//! let service = ServiceBuilder::new()
177//!     .layer(
178//!         // This configuration will only emit events on failures
179//!         TraceLayer::new_for_http()
180//!             .on_request(())
181//!             .on_response(())
182//!             .on_body_chunk(())
183//!             .on_eos(())
184//!             .on_failure(|error: ServerErrorsFailureClass, latency: Duration, _span: &Span| {
185//!                 tracing::debug!("something went wrong")
186//!             })
187//!     )
188//!     .service_fn(handle);
189//! # let mut service = service;
190//! # let response = service
191//! #     .ready()
192//! #     .await?
193//! #     .call(Request::new(Full::from("foo")))
194//! #     .await?;
195//! # Ok(())
196//! # }
197//! ```
198//!
199//! # When the callbacks are called
200//!
201//! ### `on_request`
202//!
203//! The `on_request` callback is called when the request arrives at the
204//! middleware in [`Service::call`] just prior to passing the request to the
205//! inner service.
206//!
207//! ### `on_response`
208//!
209//! The `on_response` callback is called when the inner service's response
210//! future completes with `Ok(response)` regardless of whether the response is
211//! classified as a success or a failure.
212//!
213//! For example if you're using [`ServerErrorsAsFailures`] as your classifier
214//! and the inner service responds with `500 Internal Server Error` then the
215//! `on_response` callback is still called. `on_failure` would _also_ be called
216//! in this case since the response was classified as a failure.
217//!
218//! ### `on_body_chunk`
219//!
220//! The `on_body_chunk` callback is called when the response body produces a new
221//! chunk, that is when [`Body::poll_frame`] returns a data frame.
222//!
223//! `on_body_chunk` is called even if the chunk is empty.
224//!
225//! ### `on_eos`
226//!
227//! The `on_eos` callback is called when a streaming response body ends, that is
228//! when [`Body::poll_frame`] returns a trailers frame.
229//!
230//! `on_eos` is called even if the trailers produced are `None`.
231//!
232//! ### `on_failure`
233//!
234//! The `on_failure` callback is called when:
235//!
236//! - The inner [`Service`]'s response future resolves to an error.
237//! - A response is classified as a failure.
238//! - [`Body::poll_frame`] returns an error.
239//! - An end-of-stream is classified as a failure.
240//!
241//! # Recording fields on the span
242//!
243//! All callbacks receive a reference to the [tracing] [`Span`], corresponding to this request,
244//! produced by the closure passed to [`TraceLayer::make_span_with`]. It can be used to [record
245//! field values][record] that weren't known when the span was created.
246//!
247//! ```rust
248//! use http::{Request, Response, HeaderMap, StatusCode};
249//! use http_body_util::Full;
250//! use bytes::Bytes;
251//! use tower::ServiceBuilder;
252//! use tower_http::trace::TraceLayer;
253//! use tracing::Span;
254//! use std::time::Duration;
255//! # use std::convert::Infallible;
256//!
257//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
258//! #     Ok(Response::new(Full::from("foo")))
259//! # }
260//! # #[tokio::main]
261//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
262//! # tracing_subscriber::fmt::init();
263//! #
264//! let service = ServiceBuilder::new()
265//!     .layer(
266//!         TraceLayer::new_for_http()
267//!             .make_span_with(|request: &Request<Full<Bytes>>| {
268//!                 tracing::debug_span!(
269//!                     "http-request",
270//!                     status_code = tracing::field::Empty,
271//!                 )
272//!             })
273//!             .on_response(|response: &Response<Full<Bytes>>, _latency: Duration, span: &Span| {
274//!                 span.record("status_code", &tracing::field::display(response.status()));
275//!
276//!                 tracing::debug!("response generated")
277//!             })
278//!     )
279//!     .service_fn(handle);
280//! # Ok(())
281//! # }
282//! ```
283//!
284//! # Providing classifiers
285//!
286//! Tracing requires determining if a response is a success or failure. [`MakeClassifier`] is used
287//! to create a classifier for the incoming request. See the docs for [`MakeClassifier`] and
288//! [`ClassifyResponse`] for more details on classification.
289//!
290//! A [`MakeClassifier`] can be provided when creating a [`TraceLayer`]:
291//!
292//! ```rust
293//! use http::{Request, Response};
294//! use http_body_util::Full;
295//! use bytes::Bytes;
296//! use tower::ServiceBuilder;
297//! use tower_http::{
298//!     trace::TraceLayer,
299//!     classify::{
300//!         MakeClassifier, ClassifyResponse, ClassifiedResponse, NeverClassifyEos,
301//!         SharedClassifier,
302//!     },
303//! };
304//! use std::convert::Infallible;
305//!
306//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
307//! #     Ok(Response::new(Full::from("foo")))
308//! # }
309//! # #[tokio::main]
310//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
311//! # tracing_subscriber::fmt::init();
312//! #
313//! // Our `MakeClassifier` that always creates `MyClassifier` classifiers.
314//! #[derive(Copy, Clone)]
315//! struct MyMakeClassify;
316//!
317//! impl MakeClassifier for MyMakeClassify {
318//!     type Classifier = MyClassifier;
319//!     type FailureClass = &'static str;
320//!     type ClassifyEos = NeverClassifyEos<&'static str>;
321//!
322//!     fn make_classifier<B>(&self, req: &Request<B>) -> Self::Classifier {
323//!         MyClassifier
324//!     }
325//! }
326//!
327//! // A classifier that classifies failures as `"something went wrong..."`.
328//! #[derive(Copy, Clone)]
329//! struct MyClassifier;
330//!
331//! impl ClassifyResponse for MyClassifier {
332//!     type FailureClass = &'static str;
333//!     type ClassifyEos = NeverClassifyEos<&'static str>;
334//!
335//!     fn classify_response<B>(
336//!         self,
337//!         res: &Response<B>
338//!     ) -> ClassifiedResponse<Self::FailureClass, Self::ClassifyEos> {
339//!         // Classify based on the status code.
340//!         if res.status().is_server_error() {
341//!             ClassifiedResponse::Ready(Err("something went wrong..."))
342//!         } else {
343//!             ClassifiedResponse::Ready(Ok(()))
344//!         }
345//!     }
346//!
347//!     fn classify_error<E>(self, error: &E) -> Self::FailureClass
348//!     where
349//!         E: std::fmt::Display + 'static,
350//!     {
351//!         "something went wrong..."
352//!     }
353//! }
354//!
355//! let service = ServiceBuilder::new()
356//!     // Create a trace layer that uses our classifier.
357//!     .layer(TraceLayer::new(MyMakeClassify))
358//!     .service_fn(handle);
359//!
360//! // Since `MyClassifier` is `Clone` we can also use `SharedClassifier`
361//! // to avoid having to define a separate `MakeClassifier`.
362//! let service = ServiceBuilder::new()
363//!     .layer(TraceLayer::new(SharedClassifier::new(MyClassifier)))
364//!     .service_fn(handle);
365//! # Ok(())
366//! # }
367//! ```
368//!
369//! [`TraceLayer`] comes with convenience methods for using common classifiers:
370//!
371//! - [`TraceLayer::new_for_http`] classifies based on the status code. It doesn't consider
372//!   streaming responses.
373//! - [`TraceLayer::new_for_grpc`] classifies based on the gRPC protocol and supports streaming
374//!   responses.
375//!
376//! [tracing]: https://crates.io/crates/tracing
377//! [`Service`]: tower_service::Service
378//! [`Service::call`]: tower_service::Service::call
379//! [`MakeClassifier`]: crate::classify::MakeClassifier
380//! [`ClassifyResponse`]: crate::classify::ClassifyResponse
381//! [record]: https://docs.rs/tracing/latest/tracing/span/struct.Span.html#method.record
382//! [`TraceLayer::make_span_with`]: crate::trace::TraceLayer::make_span_with
383//! [`Span`]: tracing::Span
384//! [`ServerErrorsAsFailures`]: crate::classify::ServerErrorsAsFailures
385//! [`Body::poll_frame`]: http_body::Body::poll_frame
386
387use std::{fmt, time::Duration};
388
389use tracing::Level;
390
391pub use self::{
392    body::ResponseBody,
393    future::ResponseFuture,
394    layer::TraceLayer,
395    make_span::{DefaultMakeSpan, MakeSpan},
396    on_body_chunk::{DefaultOnBodyChunk, OnBodyChunk},
397    on_eos::{DefaultOnEos, OnEos},
398    on_failure::{DefaultOnFailure, OnFailure},
399    on_request::{DefaultOnRequest, OnRequest},
400    on_response::{DefaultOnResponse, OnResponse},
401    service::Trace,
402};
403use crate::{
404    classify::{GrpcErrorsAsFailures, ServerErrorsAsFailures, SharedClassifier},
405    LatencyUnit,
406};
407
/// MakeClassifier for HTTP requests.
///
/// This is a [`SharedClassifier`] wrapping [`ServerErrorsAsFailures`], i.e. the same
/// classifier value is handed out for every request.
///
/// See [`MakeClassifier`](crate::classify::MakeClassifier) for details on classification.
pub type HttpMakeClassifier = SharedClassifier<ServerErrorsAsFailures>;
410
/// MakeClassifier for gRPC requests.
///
/// This is a [`SharedClassifier`] wrapping [`GrpcErrorsAsFailures`], i.e. the same
/// classifier value is handed out for every request.
///
/// See [`MakeClassifier`](crate::classify::MakeClassifier) for details on classification.
pub type GrpcMakeClassifier = SharedClassifier<GrpcErrorsAsFailures>;
413
// Emits a `tracing` event whose level is only known at runtime.
//
// Accepted input: an optional `target: <expr>,`, an optional `parent: <expr>,`, the level
// expression, and then whatever field/message tokens `tracing::event!` takes. The level
// expression is evaluated once and matched against the five `tracing::Level` constants; each
// arm forwards everything to `tracing::event!` with that level spelled out literally.
//
// NOTE(review): the per-level expansion is presumably needed because `tracing::event!` wants
// its level as a constant — confirm against the `tracing` documentation.
//
// The macro is defined ahead of the `mod` declarations below so that it is textually in scope
// inside those submodules.
macro_rules! event_dynamic_lvl {
    ( $(target: $tgt:expr,)? $(parent: $par:expr,)? $level:expr, $($fields:tt)* ) => {
        match $level {
            tracing::Level::TRACE => {
                tracing::event!(
                    $(target: $tgt,)? $(parent: $par,)?
                    tracing::Level::TRACE,
                    $($fields)*
                );
            }
            tracing::Level::DEBUG => {
                tracing::event!(
                    $(target: $tgt,)? $(parent: $par,)?
                    tracing::Level::DEBUG,
                    $($fields)*
                );
            }
            tracing::Level::INFO => {
                tracing::event!(
                    $(target: $tgt,)? $(parent: $par,)?
                    tracing::Level::INFO,
                    $($fields)*
                );
            }
            tracing::Level::WARN => {
                tracing::event!(
                    $(target: $tgt,)? $(parent: $par,)?
                    tracing::Level::WARN,
                    $($fields)*
                );
            }
            tracing::Level::ERROR => {
                tracing::event!(
                    $(target: $tgt,)? $(parent: $par,)?
                    tracing::Level::ERROR,
                    $($fields)*
                );
            }
        }
    };
}
460
461mod body;
462mod future;
463mod layer;
464mod make_span;
465mod on_body_chunk;
466mod on_eos;
467mod on_failure;
468mod on_request;
469mod on_response;
470mod service;
471
// Module-wide default levels. Their uses live in the submodules and are not visible from
// this file — presumably the `Default*` callbacks fall back to them when no level is set
// explicitly; confirm against the `on_*` modules.

/// Default level for ordinary events: [`Level::DEBUG`].
const DEFAULT_MESSAGE_LEVEL: Level = Level::DEBUG;
/// Default level for error events: [`Level::ERROR`].
const DEFAULT_ERROR_LEVEL: Level = Level::ERROR;
474
475struct Latency {
476    unit: LatencyUnit,
477    duration: Duration,
478}
479
480impl fmt::Display for Latency {
481    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
482        match self.unit {
483            LatencyUnit::Seconds => write!(f, "{} s", self.duration.as_secs_f64()),
484            LatencyUnit::Millis => write!(f, "{} ms", self.duration.as_millis()),
485            LatencyUnit::Micros => write!(f, "{} μs", self.duration.as_micros()),
486            LatencyUnit::Nanos => write!(f, "{} ns", self.duration.as_nanos()),
487        }
488    }
489}
490
#[cfg(test)]
mod tests {
    use super::*;
    use crate::classify::ServerErrorsFailureClass;
    use crate::test_helpers::Body;
    use bytes::Bytes;
    use http::{HeaderMap, Request, Response};
    use std::{
        sync::atomic::{AtomicU32, Ordering},
        time::Duration,
    };
    use tower::{BoxError, Service, ServiceBuilder, ServiceExt};
    use tracing::Span;

    /// Sends one request through a traced `echo` service and checks how often each callback
    /// has fired, first right after the response arrives and then after the body is read.
    #[tokio::test]
    async fn unary_request() {
        // `AtomicU32::new` is a `const fn`, so the counters can be plain statics. They are
        // declared inside the test function so that each test owns its own set.
        static ON_REQUEST_COUNT: AtomicU32 = AtomicU32::new(0);
        static ON_RESPONSE_COUNT: AtomicU32 = AtomicU32::new(0);
        static ON_BODY_CHUNK_COUNT: AtomicU32 = AtomicU32::new(0);
        static ON_EOS: AtomicU32 = AtomicU32::new(0);
        static ON_FAILURE: AtomicU32 = AtomicU32::new(0);

        let trace_layer = TraceLayer::new_for_http()
            .make_span_with(|_req: &Request<Body>| {
                // `foo` starts out empty and is filled in by `on_request` below.
                tracing::info_span!("test-span", foo = tracing::field::Empty)
            })
            .on_request(|_req: &Request<Body>, span: &Span| {
                span.record("foo", &42);
                ON_REQUEST_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_response(|_res: &Response<Body>, _latency: Duration, _span: &Span| {
                ON_RESPONSE_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_body_chunk(|_chunk: &Bytes, _latency: Duration, _span: &Span| {
                ON_BODY_CHUNK_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_eos(
                |_trailers: Option<&HeaderMap>, _latency: Duration, _span: &Span| {
                    ON_EOS.fetch_add(1, Ordering::SeqCst);
                },
            )
            .on_failure(
                |_class: ServerErrorsFailureClass, _latency: Duration, _span: &Span| {
                    ON_FAILURE.fetch_add(1, Ordering::SeqCst);
                },
            );

        let mut svc = ServiceBuilder::new().layer(trace_layer).service_fn(echo);

        let res = svc
            .ready()
            .await
            .unwrap()
            .call(Request::new(Body::from("foobar")))
            .await
            .unwrap();

        // The response future has resolved but the body has not been read yet.
        assert_eq!(1, ON_REQUEST_COUNT.load(Ordering::SeqCst), "request");
        assert_eq!(1, ON_RESPONSE_COUNT.load(Ordering::SeqCst), "response");
        assert_eq!(0, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
        assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
        assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");

        // Reading the body to completion is expected to report exactly one chunk.
        crate::test_helpers::to_bytes(res.into_body())
            .await
            .unwrap();
        assert_eq!(1, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
        assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
        assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");
    }

    /// Same checks as `unary_request`, but against a service whose response body is a
    /// three-item stream, so three body-chunk callbacks are expected.
    #[tokio::test]
    async fn streaming_response() {
        // Plain statics suffice because `AtomicU32::new` is a `const fn`; scoped to this test
        // so the counts are independent of the other test's counters.
        static ON_REQUEST_COUNT: AtomicU32 = AtomicU32::new(0);
        static ON_RESPONSE_COUNT: AtomicU32 = AtomicU32::new(0);
        static ON_BODY_CHUNK_COUNT: AtomicU32 = AtomicU32::new(0);
        static ON_EOS: AtomicU32 = AtomicU32::new(0);
        static ON_FAILURE: AtomicU32 = AtomicU32::new(0);

        let trace_layer = TraceLayer::new_for_http()
            .on_request(|_req: &Request<Body>, _span: &Span| {
                ON_REQUEST_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_response(|_res: &Response<Body>, _latency: Duration, _span: &Span| {
                ON_RESPONSE_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_body_chunk(|_chunk: &Bytes, _latency: Duration, _span: &Span| {
                ON_BODY_CHUNK_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_eos(
                |_trailers: Option<&HeaderMap>, _latency: Duration, _span: &Span| {
                    ON_EOS.fetch_add(1, Ordering::SeqCst);
                },
            )
            .on_failure(
                |_class: ServerErrorsFailureClass, _latency: Duration, _span: &Span| {
                    ON_FAILURE.fetch_add(1, Ordering::SeqCst);
                },
            );

        let mut svc = ServiceBuilder::new()
            .layer(trace_layer)
            .service_fn(streaming_body);

        let res = svc
            .ready()
            .await
            .unwrap()
            .call(Request::new(Body::empty()))
            .await
            .unwrap();

        // The response future has resolved but the body has not been read yet.
        assert_eq!(1, ON_REQUEST_COUNT.load(Ordering::SeqCst), "request");
        assert_eq!(1, ON_RESPONSE_COUNT.load(Ordering::SeqCst), "response");
        assert_eq!(0, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
        assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
        assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");

        // One body-chunk callback per item of the stream built in `streaming_body`.
        crate::test_helpers::to_bytes(res.into_body())
            .await
            .unwrap();
        assert_eq!(3, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
        assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
        assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");
    }

    /// Test service that returns the request body unchanged as the response body.
    async fn echo(req: Request<Body>) -> Result<Response<Body>, BoxError> {
        Ok(Response::new(req.into_body()))
    }

    /// Test service that ignores the request and responds with a body built from a
    /// three-item stream ("one", "two", "three").
    async fn streaming_body(_req: Request<Body>) -> Result<Response<Body>, BoxError> {
        use futures_util::stream::iter;

        let stream = iter(vec![
            Ok::<_, BoxError>(Bytes::from("one")),
            Ok::<_, BoxError>(Bytes::from("two")),
            Ok::<_, BoxError>(Bytes::from("three")),
        ]);

        let body = Body::from_stream(stream);

        Ok(Response::new(body))
    }
}