tower_http/decompression/request/
service.rs

1use super::future::RequestDecompressionFuture as ResponseFuture;
2use super::layer::RequestDecompressionLayer;
3use crate::body::UnsyncBoxBody;
4use crate::compression_utils::CompressionLevel;
5use crate::{
6    compression_utils::AcceptEncoding, decompression::body::BodyInner,
7    decompression::DecompressionBody, BoxError,
8};
9use bytes::Buf;
10use http::{header, Request, Response};
11use http_body::Body;
12use std::task::{Context, Poll};
13use tower_service::Service;
14
15#[cfg(any(
16    feature = "decompression-gzip",
17    feature = "decompression-deflate",
18    feature = "decompression-br",
19    feature = "decompression-zstd",
20))]
21use crate::content_encoding::SupportedEncodings;
22
23/// Decompresses request bodies and calls its underlying service.
24///
25/// Transparently decompresses request bodies based on the `Content-Encoding` header.
26/// When the encoding in the `Content-Encoding` header is not accepted an `Unsupported Media Type`
27/// status code will be returned with the accepted encodings in the `Accept-Encoding` header.
28///
29/// Enabling pass-through of unaccepted encodings will not return an `Unsupported Media Type` but
30/// will call the underlying service with the unmodified request if the encoding is not supported.
31/// This is disabled by default.
32///
33/// See the [module docs](crate::decompression) for more details.
34#[derive(Debug, Clone)]
35pub struct RequestDecompression<S> {
36    pub(super) inner: S,
37    pub(super) accept: AcceptEncoding,
38    pub(super) pass_through_unaccepted: bool,
39}
40
41impl<S, ReqBody, ResBody, D> Service<Request<ReqBody>> for RequestDecompression<S>
42where
43    S: Service<Request<DecompressionBody<ReqBody>>, Response = Response<ResBody>>,
44    ReqBody: Body,
45    ResBody: Body<Data = D> + Send + 'static,
46    <ResBody as Body>::Error: Into<BoxError>,
47    D: Buf + 'static,
48{
49    type Response = Response<UnsyncBoxBody<D, BoxError>>;
50    type Error = S::Error;
51    type Future = ResponseFuture<S::Future, ResBody, S::Error>;
52
53    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
54        self.inner.poll_ready(cx)
55    }
56
57    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
58        let (mut parts, body) = req.into_parts();
59
60        let body =
61            if let header::Entry::Occupied(entry) = parts.headers.entry(header::CONTENT_ENCODING) {
62                match entry.get().as_bytes() {
63                    #[cfg(feature = "decompression-gzip")]
64                    b"gzip" if self.accept.gzip() => {
65                        entry.remove();
66                        parts.headers.remove(header::CONTENT_LENGTH);
67                        BodyInner::gzip(crate::compression_utils::WrapBody::new(
68                            body,
69                            CompressionLevel::default(),
70                        ))
71                    }
72                    #[cfg(feature = "decompression-deflate")]
73                    b"deflate" if self.accept.deflate() => {
74                        entry.remove();
75                        parts.headers.remove(header::CONTENT_LENGTH);
76                        BodyInner::deflate(crate::compression_utils::WrapBody::new(
77                            body,
78                            CompressionLevel::default(),
79                        ))
80                    }
81                    #[cfg(feature = "decompression-br")]
82                    b"br" if self.accept.br() => {
83                        entry.remove();
84                        parts.headers.remove(header::CONTENT_LENGTH);
85                        BodyInner::brotli(crate::compression_utils::WrapBody::new(
86                            body,
87                            CompressionLevel::default(),
88                        ))
89                    }
90                    #[cfg(feature = "decompression-zstd")]
91                    b"zstd" if self.accept.zstd() => {
92                        entry.remove();
93                        parts.headers.remove(header::CONTENT_LENGTH);
94                        BodyInner::zstd(crate::compression_utils::WrapBody::new(
95                            body,
96                            CompressionLevel::default(),
97                        ))
98                    }
99                    b"identity" => BodyInner::identity(body),
100                    _ if self.pass_through_unaccepted => BodyInner::identity(body),
101                    _ => return ResponseFuture::unsupported_encoding(self.accept),
102                }
103            } else {
104                BodyInner::identity(body)
105            };
106        let body = DecompressionBody::new(body);
107        let req = Request::from_parts(parts, body);
108        ResponseFuture::inner(self.inner.call(req))
109    }
110}
111
112impl<S> RequestDecompression<S> {
113    /// Creates a new `RequestDecompression` wrapping the `service`.
114    pub fn new(service: S) -> Self {
115        Self {
116            inner: service,
117            accept: AcceptEncoding::default(),
118            pass_through_unaccepted: false,
119        }
120    }
121
122    define_inner_service_accessors!();
123
124    /// Returns a new [`Layer`] that wraps services with a `RequestDecompression` middleware.
125    ///
126    /// [`Layer`]: tower_layer::Layer
127    pub fn layer() -> RequestDecompressionLayer {
128        RequestDecompressionLayer::new()
129    }
130
131    /// Passes through the request even when the encoding is not supported.
132    ///
133    /// By default pass-through is disabled.
134    pub fn pass_through_unaccepted(mut self, enabled: bool) -> Self {
135        self.pass_through_unaccepted = enabled;
136        self
137    }
138
139    /// Sets whether to support gzip encoding.
140    #[cfg(feature = "decompression-gzip")]
141    pub fn gzip(mut self, enable: bool) -> Self {
142        self.accept.set_gzip(enable);
143        self
144    }
145
146    /// Sets whether to support Deflate encoding.
147    #[cfg(feature = "decompression-deflate")]
148    pub fn deflate(mut self, enable: bool) -> Self {
149        self.accept.set_deflate(enable);
150        self
151    }
152
153    /// Sets whether to support Brotli encoding.
154    #[cfg(feature = "decompression-br")]
155    pub fn br(mut self, enable: bool) -> Self {
156        self.accept.set_br(enable);
157        self
158    }
159
160    /// Sets whether to support Zstd encoding.
161    #[cfg(feature = "decompression-zstd")]
162    pub fn zstd(mut self, enable: bool) -> Self {
163        self.accept.set_zstd(enable);
164        self
165    }
166
167    /// Disables support for gzip encoding.
168    ///
169    /// This method is available even if the `gzip` crate feature is disabled.
170    pub fn no_gzip(mut self) -> Self {
171        self.accept.set_gzip(false);
172        self
173    }
174
175    /// Disables support for Deflate encoding.
176    ///
177    /// This method is available even if the `deflate` crate feature is disabled.
178    pub fn no_deflate(mut self) -> Self {
179        self.accept.set_deflate(false);
180        self
181    }
182
183    /// Disables support for Brotli encoding.
184    ///
185    /// This method is available even if the `br` crate feature is disabled.
186    pub fn no_br(mut self) -> Self {
187        self.accept.set_br(false);
188        self
189    }
190
191    /// Disables support for Zstd encoding.
192    ///
193    /// This method is available even if the `zstd` crate feature is disabled.
194    pub fn no_zstd(mut self) -> Self {
195        self.accept.set_zstd(false);
196        self
197    }
198}