rama_http/layer/
collect_body.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
//! Middleware that collects (fully buffers) the http response `Body`.

use crate::dep::http_body_util::BodyExt;
use crate::{dep::http_body::Body, Request, Response};
use rama_core::{
    error::{BoxError, ErrorContext, OpaqueError},
    Context, Layer, Service,
};
use rama_utils::macros::define_inner_service_accessors;
use std::fmt;

/// Layer that wraps a service in the body-collecting http middleware.
///
/// The type carries no configuration: it is a unit struct, so every
/// instance is interchangeable.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct CollectBodyLayer;

impl CollectBodyLayer {
    /// Create a new [`CollectBodyLayer`].
    pub const fn new() -> Self {
        Self
    }
}

impl<S> Layer<S> for CollectBodyLayer {
    type Service = CollectBody<S>;

    fn layer(&self, inner: S) -> Self::Service {
        CollectBody::new(inner)
    }
}

/// Service to collect the http `Body`
///
/// Wraps an inner service `S`. When used as a `Service`, the response body
/// returned by the inner service is read to completion and replaced by a
/// body built from the collected bytes.
pub struct CollectBody<S> {
    /// The wrapped service that requests are forwarded to.
    inner: S,
}

impl<S> CollectBody<S> {
    /// Wrap `service` in a new [`CollectBody`].
    pub const fn new(service: S) -> Self {
        let inner = service;
        Self { inner }
    }

    // Accessors for the wrapped service are generated by this macro.
    define_inner_service_accessors!();
}

/// Forwards the request to the inner service, then swaps the response body
/// for one that has been fully collected into bytes.
impl<S, State, ReqBody, ResBody> Service<State, Request<ReqBody>> for CollectBody<S>
where
    S: Service<State, Request<ReqBody>, Response = Response<ResBody>, Error: Into<BoxError>>,
    State: Clone + Send + Sync + 'static,
    ReqBody: Send + 'static,
    ResBody:
        Body<Data: Send, Error: std::error::Error + Send + Sync + 'static> + Send + Sync + 'static,
{
    type Response = Response;
    type Error = BoxError;

    /// Serve `req` through the inner service and return its response with
    /// the body collected.
    ///
    /// # Errors
    ///
    /// Returns an error (with added context) when the inner service fails
    /// or when collecting the response body fails.
    async fn serve(
        &self,
        ctx: Context<State>,
        req: Request<ReqBody>,
    ) -> Result<Self::Response, Self::Error> {
        // Run the inner service and split its response right away; only the
        // head (`parts`) and the streaming body are needed from here on.
        let (parts, streaming_body) = self
            .inner
            .serve(ctx, req)
            .await
            .map_err(|err| OpaqueError::from_boxed(err.into()))
            .context("CollectBody::inner:serve")?
            .into_parts();

        // Read the whole body before building the replacement body.
        let collected = streaming_body.collect().await.context("collect body")?;
        let buffered = crate::Body::from(collected.to_bytes());

        Ok(Response::from_parts(parts, buffered))
    }
}

/// Debug output mirrors the struct layout: `CollectBody { inner: .. }`.
impl<S: fmt::Debug> fmt::Debug for CollectBody<S> {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = formatter.debug_struct("CollectBody");
        repr.field("inner", &self.inner);
        repr.finish()
    }
}

/// Cloning a [`CollectBody`] clones the wrapped service.
impl<S: Clone> Clone for CollectBody<S> {
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        Self { inner }
    }
}