tower_http/trace/mod.rs
1//! Middleware that adds high level [tracing] to a [`Service`].
2//!
3//! # Example
4//!
5//! Adding tracing to your service can be as simple as:
6//!
7//! ```rust
8//! use http::{Request, Response};
9//! use tower::{ServiceBuilder, ServiceExt, Service};
10//! use tower_http::trace::TraceLayer;
11//! use std::convert::Infallible;
12//! use http_body_util::Full;
13//! use bytes::Bytes;
14//!
15//! async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
16//! Ok(Response::new(Full::default()))
17//! }
18//!
19//! # #[tokio::main]
20//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
21//! // Setup tracing
22//! tracing_subscriber::fmt::init();
23//!
24//! let mut service = ServiceBuilder::new()
25//! .layer(TraceLayer::new_for_http())
26//! .service_fn(handle);
27//!
28//! let request = Request::new(Full::from("foo"));
29//!
30//! let response = service
31//! .ready()
32//! .await?
33//! .call(request)
34//! .await?;
35//! # Ok(())
36//! # }
37//! ```
38//!
39//! If you run this application with `RUST_LOG=tower_http=trace cargo run` you should see logs like:
40//!
41//! ```text
42//! Mar 05 20:50:28.523 DEBUG request{method=GET path="/foo"}: tower_http::trace::on_request: started processing request
43//! Mar 05 20:50:28.524 DEBUG request{method=GET path="/foo"}: tower_http::trace::on_response: finished processing request latency=1 ms status=200
44//! ```
45//!
46//! # Customization
47//!
48//! [`Trace`] comes with good defaults but also supports customizing many aspects of the output.
49//!
50//! The default behaviour supports some customization:
51//!
52//! ```rust
53//! use http::{Request, Response, HeaderMap, StatusCode};
54//! use http_body_util::Full;
55//! use bytes::Bytes;
56//! use tower::ServiceBuilder;
57//! use tracing::Level;
58//! use tower_http::{
59//! LatencyUnit,
60//! trace::{TraceLayer, DefaultMakeSpan, DefaultOnRequest, DefaultOnResponse},
61//! };
62//! use std::time::Duration;
63//! # use tower::{ServiceExt, Service};
64//! # use std::convert::Infallible;
65//!
66//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
67//! # Ok(Response::new(Full::from("foo")))
68//! # }
69//! # #[tokio::main]
70//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
71//! # tracing_subscriber::fmt::init();
72//! #
73//! let service = ServiceBuilder::new()
74//! .layer(
75//! TraceLayer::new_for_http()
76//! .make_span_with(
77//! DefaultMakeSpan::new().include_headers(true)
78//! )
79//! .on_request(
80//! DefaultOnRequest::new().level(Level::INFO)
81//! )
82//! .on_response(
83//! DefaultOnResponse::new()
84//! .level(Level::INFO)
85//! .latency_unit(LatencyUnit::Micros)
86//! )
87//! // on so on for `on_eos`, `on_body_chunk`, and `on_failure`
88//! )
89//! .service_fn(handle);
90//! # let mut service = service;
91//! # let response = service
92//! # .ready()
93//! # .await?
94//! # .call(Request::new(Full::from("foo")))
95//! # .await?;
96//! # Ok(())
97//! # }
98//! ```
99//!
100//! However for maximum control you can provide callbacks:
101//!
102//! ```rust
103//! use http::{Request, Response, HeaderMap, StatusCode};
104//! use http_body_util::Full;
105//! use bytes::Bytes;
106//! use tower::ServiceBuilder;
107//! use tower_http::{classify::ServerErrorsFailureClass, trace::TraceLayer};
108//! use std::time::Duration;
109//! use tracing::Span;
110//! # use tower::{ServiceExt, Service};
111//! # use std::convert::Infallible;
112//!
113//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
114//! # Ok(Response::new(Full::from("foo")))
115//! # }
116//! # #[tokio::main]
117//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
118//! # tracing_subscriber::fmt::init();
119//! #
120//! let service = ServiceBuilder::new()
121//! .layer(
122//! TraceLayer::new_for_http()
123//! .make_span_with(|request: &Request<Full<Bytes>>| {
124//! tracing::debug_span!("http-request")
125//! })
126//! .on_request(|request: &Request<Full<Bytes>>, _span: &Span| {
127//! tracing::debug!("started {} {}", request.method(), request.uri().path())
128//! })
129//! .on_response(|response: &Response<Full<Bytes>>, latency: Duration, _span: &Span| {
130//! tracing::debug!("response generated in {:?}", latency)
131//! })
132//! .on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
133//! tracing::debug!("sending {} bytes", chunk.len())
134//! })
135//! .on_eos(|trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
136//! tracing::debug!("stream closed after {:?}", stream_duration)
137//! })
138//! .on_failure(|error: ServerErrorsFailureClass, latency: Duration, _span: &Span| {
139//! tracing::debug!("something went wrong")
140//! })
141//! )
142//! .service_fn(handle);
143//! # let mut service = service;
144//! # let response = service
145//! # .ready()
146//! # .await?
147//! # .call(Request::new(Full::from("foo")))
148//! # .await?;
149//! # Ok(())
150//! # }
151//! ```
152//!
153//! ## Disabling something
154//!
//! Setting the behaviour to `()` will disable that particular step:
156//!
157//! ```rust
158//! use http::StatusCode;
159//! use tower::ServiceBuilder;
160//! use tower_http::{classify::ServerErrorsFailureClass, trace::TraceLayer};
161//! use std::time::Duration;
162//! use tracing::Span;
163//! # use tower::{ServiceExt, Service};
164//! # use http_body_util::Full;
165//! # use bytes::Bytes;
166//! # use http::{Response, Request};
167//! # use std::convert::Infallible;
168//!
169//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
170//! # Ok(Response::new(Full::from("foo")))
171//! # }
172//! # #[tokio::main]
173//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
174//! # tracing_subscriber::fmt::init();
175//! #
176//! let service = ServiceBuilder::new()
177//! .layer(
178//! // This configuration will only emit events on failures
179//! TraceLayer::new_for_http()
180//! .on_request(())
181//! .on_response(())
182//! .on_body_chunk(())
183//! .on_eos(())
184//! .on_failure(|error: ServerErrorsFailureClass, latency: Duration, _span: &Span| {
185//! tracing::debug!("something went wrong")
186//! })
187//! )
188//! .service_fn(handle);
189//! # let mut service = service;
190//! # let response = service
191//! # .ready()
192//! # .await?
193//! # .call(Request::new(Full::from("foo")))
194//! # .await?;
195//! # Ok(())
196//! # }
197//! ```
198//!
199//! # When the callbacks are called
200//!
201//! ### `on_request`
202//!
203//! The `on_request` callback is called when the request arrives at the
204//! middleware in [`Service::call`] just prior to passing the request to the
205//! inner service.
206//!
207//! ### `on_response`
208//!
209//! The `on_response` callback is called when the inner service's response
//! future completes with `Ok(response)`, regardless of whether the response is
211//! classified as a success or a failure.
212//!
213//! For example if you're using [`ServerErrorsAsFailures`] as your classifier
214//! and the inner service responds with `500 Internal Server Error` then the
215//! `on_response` callback is still called. `on_failure` would _also_ be called
216//! in this case since the response was classified as a failure.
217//!
218//! ### `on_body_chunk`
219//!
220//! The `on_body_chunk` callback is called when the response body produces a new
221//! chunk, that is when [`Body::poll_frame`] returns a data frame.
222//!
223//! `on_body_chunk` is called even if the chunk is empty.
224//!
225//! ### `on_eos`
226//!
227//! The `on_eos` callback is called when a streaming response body ends, that is
228//! when [`Body::poll_frame`] returns a trailers frame.
229//!
230//! `on_eos` is called even if the trailers produced are `None`.
231//!
232//! ### `on_failure`
233//!
234//! The `on_failure` callback is called when:
235//!
236//! - The inner [`Service`]'s response future resolves to an error.
237//! - A response is classified as a failure.
238//! - [`Body::poll_frame`] returns an error.
239//! - An end-of-stream is classified as a failure.
240//!
241//! # Recording fields on the span
242//!
243//! All callbacks receive a reference to the [tracing] [`Span`], corresponding to this request,
244//! produced by the closure passed to [`TraceLayer::make_span_with`]. It can be used to [record
245//! field values][record] that weren't known when the span was created.
246//!
247//! ```rust
248//! use http::{Request, Response, HeaderMap, StatusCode};
249//! use http_body_util::Full;
250//! use bytes::Bytes;
251//! use tower::ServiceBuilder;
252//! use tower_http::trace::TraceLayer;
253//! use tracing::Span;
254//! use std::time::Duration;
255//! # use std::convert::Infallible;
256//!
257//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
258//! # Ok(Response::new(Full::from("foo")))
259//! # }
260//! # #[tokio::main]
261//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
262//! # tracing_subscriber::fmt::init();
263//! #
264//! let service = ServiceBuilder::new()
265//! .layer(
266//! TraceLayer::new_for_http()
267//! .make_span_with(|request: &Request<Full<Bytes>>| {
268//! tracing::debug_span!(
269//! "http-request",
270//! status_code = tracing::field::Empty,
271//! )
272//! })
273//! .on_response(|response: &Response<Full<Bytes>>, _latency: Duration, span: &Span| {
274//! span.record("status_code", &tracing::field::display(response.status()));
275//!
276//! tracing::debug!("response generated")
277//! })
278//! )
279//! .service_fn(handle);
280//! # Ok(())
281//! # }
282//! ```
283//!
284//! # Providing classifiers
285//!
286//! Tracing requires determining if a response is a success or failure. [`MakeClassifier`] is used
287//! to create a classifier for the incoming request. See the docs for [`MakeClassifier`] and
288//! [`ClassifyResponse`] for more details on classification.
289//!
290//! A [`MakeClassifier`] can be provided when creating a [`TraceLayer`]:
291//!
292//! ```rust
293//! use http::{Request, Response};
294//! use http_body_util::Full;
295//! use bytes::Bytes;
296//! use tower::ServiceBuilder;
297//! use tower_http::{
298//! trace::TraceLayer,
299//! classify::{
300//! MakeClassifier, ClassifyResponse, ClassifiedResponse, NeverClassifyEos,
301//! SharedClassifier,
302//! },
303//! };
304//! use std::convert::Infallible;
305//!
306//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
307//! # Ok(Response::new(Full::from("foo")))
308//! # }
309//! # #[tokio::main]
310//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
311//! # tracing_subscriber::fmt::init();
312//! #
//! // Our `MakeClassifier` that always creates `MyClassifier` classifiers.
314//! #[derive(Copy, Clone)]
315//! struct MyMakeClassify;
316//!
317//! impl MakeClassifier for MyMakeClassify {
318//! type Classifier = MyClassifier;
319//! type FailureClass = &'static str;
320//! type ClassifyEos = NeverClassifyEos<&'static str>;
321//!
322//! fn make_classifier<B>(&self, req: &Request<B>) -> Self::Classifier {
323//! MyClassifier
324//! }
325//! }
326//!
327//! // A classifier that classifies failures as `"something went wrong..."`.
328//! #[derive(Copy, Clone)]
329//! struct MyClassifier;
330//!
331//! impl ClassifyResponse for MyClassifier {
332//! type FailureClass = &'static str;
333//! type ClassifyEos = NeverClassifyEos<&'static str>;
334//!
335//! fn classify_response<B>(
336//! self,
337//! res: &Response<B>
338//! ) -> ClassifiedResponse<Self::FailureClass, Self::ClassifyEos> {
339//! // Classify based on the status code.
340//! if res.status().is_server_error() {
341//! ClassifiedResponse::Ready(Err("something went wrong..."))
342//! } else {
343//! ClassifiedResponse::Ready(Ok(()))
344//! }
345//! }
346//!
347//! fn classify_error<E>(self, error: &E) -> Self::FailureClass
348//! where
349//! E: std::fmt::Display + 'static,
350//! {
351//! "something went wrong..."
352//! }
353//! }
354//!
355//! let service = ServiceBuilder::new()
356//! // Create a trace layer that uses our classifier.
357//! .layer(TraceLayer::new(MyMakeClassify))
358//! .service_fn(handle);
359//!
360//! // Since `MyClassifier` is `Clone` we can also use `SharedClassifier`
361//! // to avoid having to define a separate `MakeClassifier`.
362//! let service = ServiceBuilder::new()
363//! .layer(TraceLayer::new(SharedClassifier::new(MyClassifier)))
364//! .service_fn(handle);
365//! # Ok(())
366//! # }
367//! ```
368//!
369//! [`TraceLayer`] comes with convenience methods for using common classifiers:
370//!
371//! - [`TraceLayer::new_for_http`] classifies based on the status code. It doesn't consider
372//! streaming responses.
373//! - [`TraceLayer::new_for_grpc`] classifies based on the gRPC protocol and supports streaming
374//! responses.
375//!
376//! [tracing]: https://crates.io/crates/tracing
377//! [`Service`]: tower_service::Service
378//! [`Service::call`]: tower_service::Service::call
379//! [`MakeClassifier`]: crate::classify::MakeClassifier
380//! [`ClassifyResponse`]: crate::classify::ClassifyResponse
381//! [record]: https://docs.rs/tracing/latest/tracing/span/struct.Span.html#method.record
382//! [`TraceLayer::make_span_with`]: crate::trace::TraceLayer::make_span_with
383//! [`Span`]: tracing::Span
384//! [`ServerErrorsAsFailures`]: crate::classify::ServerErrorsAsFailures
385//! [`Body::poll_frame`]: http_body::Body::poll_frame
386
387use std::{fmt, time::Duration};
388
389use tracing::Level;
390
391pub use self::{
392 body::ResponseBody,
393 future::ResponseFuture,
394 layer::TraceLayer,
395 make_span::{DefaultMakeSpan, MakeSpan},
396 on_body_chunk::{DefaultOnBodyChunk, OnBodyChunk},
397 on_eos::{DefaultOnEos, OnEos},
398 on_failure::{DefaultOnFailure, OnFailure},
399 on_request::{DefaultOnRequest, OnRequest},
400 on_response::{DefaultOnResponse, OnResponse},
401 service::Trace,
402};
403use crate::{
404 classify::{GrpcErrorsAsFailures, ServerErrorsAsFailures, SharedClassifier},
405 LatencyUnit,
406};
407
/// `MakeClassifier` for HTTP requests.
///
/// Classifies responses based on the HTTP status code, treating server
/// errors as failures. This is the classifier used by `TraceLayer::new_for_http`.
pub type HttpMakeClassifier = SharedClassifier<ServerErrorsAsFailures>;

/// `MakeClassifier` for gRPC requests.
///
/// Classifies responses based on the gRPC protocol and supports streaming
/// responses. This is the classifier used by `TraceLayer::new_for_grpc`.
pub type GrpcMakeClassifier = SharedClassifier<GrpcErrorsAsFailures>;
413
/// Emits a `tracing` event at a level chosen at runtime.
///
/// `tracing::event!` requires the level to be known at compile time, so this
/// macro matches on the runtime [`Level`] value and expands to one
/// `tracing::event!` invocation per possible level, forwarding the optional
/// `target:`/`parent:` arguments and the remaining tokens unchanged.
macro_rules! event_dynamic_lvl {
    ( $(target: $target:expr,)? $(parent: $parent:expr,)? $lvl:expr, $($tt:tt)* ) => {
        match $lvl {
            tracing::Level::ERROR => {
                tracing::event!(
                    $(target: $target,)?
                    $(parent: $parent,)?
                    tracing::Level::ERROR,
                    $($tt)*
                );
            }
            tracing::Level::WARN => {
                tracing::event!(
                    $(target: $target,)?
                    $(parent: $parent,)?
                    tracing::Level::WARN,
                    $($tt)*
                );
            }
            tracing::Level::INFO => {
                tracing::event!(
                    $(target: $target,)?
                    $(parent: $parent,)?
                    tracing::Level::INFO,
                    $($tt)*
                );
            }
            tracing::Level::DEBUG => {
                tracing::event!(
                    $(target: $target,)?
                    $(parent: $parent,)?
                    tracing::Level::DEBUG,
                    $($tt)*
                );
            }
            tracing::Level::TRACE => {
                tracing::event!(
                    $(target: $target,)?
                    $(parent: $parent,)?
                    tracing::Level::TRACE,
                    $($tt)*
                );
            }
        }
    };
}
460
461mod body;
462mod future;
463mod layer;
464mod make_span;
465mod on_body_chunk;
466mod on_eos;
467mod on_failure;
468mod on_request;
469mod on_response;
470mod service;
471
/// Default `tracing` level for ordinary (non-failure) trace events.
const DEFAULT_MESSAGE_LEVEL: Level = Level::DEBUG;
/// Default `tracing` level for failure events.
const DEFAULT_ERROR_LEVEL: Level = Level::ERROR;
474
475struct Latency {
476 unit: LatencyUnit,
477 duration: Duration,
478}
479
480impl fmt::Display for Latency {
481 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
482 match self.unit {
483 LatencyUnit::Seconds => write!(f, "{} s", self.duration.as_secs_f64()),
484 LatencyUnit::Millis => write!(f, "{} ms", self.duration.as_millis()),
485 LatencyUnit::Micros => write!(f, "{} μs", self.duration.as_micros()),
486 LatencyUnit::Nanos => write!(f, "{} ns", self.duration.as_nanos()),
487 }
488 }
489}
490
#[cfg(test)]
mod tests {
    use super::*;
    use crate::classify::ServerErrorsFailureClass;
    use crate::test_helpers::Body;
    use bytes::Bytes;
    use http::{HeaderMap, Request, Response};
    use once_cell::sync::Lazy;
    use std::{
        sync::atomic::{AtomicU32, Ordering},
        time::Duration,
    };
    use tower::{BoxError, Service, ServiceBuilder, ServiceExt};
    use tracing::Span;

    /// A non-streaming request should fire `on_request` and `on_response`
    /// during the call, and `on_body_chunk` once when the single body chunk
    /// is consumed. `on_eos` and `on_failure` never fire here.
    #[tokio::test]
    async fn unary_request() {
        // Counters recording how many times each callback has fired.
        static ON_REQUEST_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
        static ON_RESPONSE_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
        static ON_BODY_CHUNK_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
        static ON_EOS: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
        static ON_FAILURE: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));

        let trace_layer = TraceLayer::new_for_http()
            .make_span_with(|_req: &Request<Body>| {
                tracing::info_span!("test-span", foo = tracing::field::Empty)
            })
            .on_request(|_req: &Request<Body>, span: &Span| {
                // Record a field that was declared empty when the span was made.
                span.record("foo", &42);
                ON_REQUEST_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_response(|_res: &Response<Body>, _latency: Duration, _span: &Span| {
                ON_RESPONSE_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_body_chunk(|_chunk: &Bytes, _latency: Duration, _span: &Span| {
                ON_BODY_CHUNK_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_eos(
                |_trailers: Option<&HeaderMap>, _latency: Duration, _span: &Span| {
                    ON_EOS.fetch_add(1, Ordering::SeqCst);
                },
            )
            .on_failure(
                |_class: ServerErrorsFailureClass, _latency: Duration, _span: &Span| {
                    ON_FAILURE.fetch_add(1, Ordering::SeqCst);
                },
            );

        let mut svc = ServiceBuilder::new().layer(trace_layer).service_fn(echo);

        let res = svc
            .ready()
            .await
            .unwrap()
            .call(Request::new(Body::from("foobar")))
            .await
            .unwrap();

        assert_eq!(1, ON_REQUEST_COUNT.load(Ordering::SeqCst), "request");
        assert_eq!(1, ON_RESPONSE_COUNT.load(Ordering::SeqCst), "response");
        // The response body hasn't been polled yet, so no chunk callbacks.
        assert_eq!(0, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
        assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
        assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");

        // Consuming the body fires `on_body_chunk` exactly once for the
        // single "foobar" chunk.
        crate::test_helpers::to_bytes(res.into_body())
            .await
            .unwrap();
        assert_eq!(1, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
        assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
        assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");
    }

    /// A streaming response should fire `on_body_chunk` once per chunk as
    /// the body is consumed.
    #[tokio::test]
    async fn streaming_response() {
        // Counters recording how many times each callback has fired.
        static ON_REQUEST_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
        static ON_RESPONSE_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
        static ON_BODY_CHUNK_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
        static ON_EOS: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
        static ON_FAILURE: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));

        let trace_layer = TraceLayer::new_for_http()
            .on_request(|_req: &Request<Body>, _span: &Span| {
                ON_REQUEST_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_response(|_res: &Response<Body>, _latency: Duration, _span: &Span| {
                ON_RESPONSE_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_body_chunk(|_chunk: &Bytes, _latency: Duration, _span: &Span| {
                ON_BODY_CHUNK_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_eos(
                |_trailers: Option<&HeaderMap>, _latency: Duration, _span: &Span| {
                    ON_EOS.fetch_add(1, Ordering::SeqCst);
                },
            )
            .on_failure(
                |_class: ServerErrorsFailureClass, _latency: Duration, _span: &Span| {
                    ON_FAILURE.fetch_add(1, Ordering::SeqCst);
                },
            );

        let mut svc = ServiceBuilder::new()
            .layer(trace_layer)
            .service_fn(streaming_body);

        let res = svc
            .ready()
            .await
            .unwrap()
            .call(Request::new(Body::empty()))
            .await
            .unwrap();

        assert_eq!(1, ON_REQUEST_COUNT.load(Ordering::SeqCst), "request");
        assert_eq!(1, ON_RESPONSE_COUNT.load(Ordering::SeqCst), "response");
        // Nothing has polled the body yet.
        assert_eq!(0, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
        assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
        assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");

        // Consuming the streamed body fires `on_body_chunk` once per chunk;
        // the test body produces no trailers frame, so `on_eos` stays at zero.
        crate::test_helpers::to_bytes(res.into_body())
            .await
            .unwrap();
        assert_eq!(3, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
        assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
        assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");
    }

    /// Echoes the request body back as the response body.
    async fn echo(req: Request<Body>) -> Result<Response<Body>, BoxError> {
        Ok(Response::new(req.into_body()))
    }

    /// Returns a response whose body is streamed as three separate chunks.
    async fn streaming_body(_req: Request<Body>) -> Result<Response<Body>, BoxError> {
        use futures_util::stream::iter;

        let stream = iter(vec![
            Ok::<_, BoxError>(Bytes::from("one")),
            Ok::<_, BoxError>(Bytes::from("two")),
            Ok::<_, BoxError>(Bytes::from("three")),
        ]);

        let body = Body::from_stream(stream);

        Ok(Response::new(body))
    }
}
635}