tower_http/timeout/
service.rs

1use crate::timeout::body::TimeoutBody;
2use http::{Request, Response, StatusCode};
3use pin_project_lite::pin_project;
4use std::{
5    future::Future,
6    pin::Pin,
7    task::{ready, Context, Poll},
8    time::Duration,
9};
10use tokio::time::Sleep;
11use tower_layer::Layer;
12use tower_service::Service;
13
14/// Layer that applies the [`Timeout`] middleware which apply a timeout to requests.
15///
16/// See the [module docs](super) for an example.
17#[derive(Debug, Clone, Copy)]
18pub struct TimeoutLayer {
19    timeout: Duration,
20}
21
22impl TimeoutLayer {
23    /// Creates a new [`TimeoutLayer`].
24    pub fn new(timeout: Duration) -> Self {
25        TimeoutLayer { timeout }
26    }
27}
28
29impl<S> Layer<S> for TimeoutLayer {
30    type Service = Timeout<S>;
31
32    fn layer(&self, inner: S) -> Self::Service {
33        Timeout::new(inner, self.timeout)
34    }
35}
36
37/// Middleware which apply a timeout to requests.
38///
39/// If the request does not complete within the specified timeout it will be aborted and a `408
40/// Request Timeout` response will be sent.
41///
42/// See the [module docs](super) for an example.
43#[derive(Debug, Clone, Copy)]
44pub struct Timeout<S> {
45    inner: S,
46    timeout: Duration,
47}
48
49impl<S> Timeout<S> {
50    /// Creates a new [`Timeout`].
51    pub fn new(inner: S, timeout: Duration) -> Self {
52        Self { inner, timeout }
53    }
54
55    define_inner_service_accessors!();
56
57    /// Returns a new [`Layer`] that wraps services with a `Timeout` middleware.
58    ///
59    /// [`Layer`]: tower_layer::Layer
60    pub fn layer(timeout: Duration) -> TimeoutLayer {
61        TimeoutLayer::new(timeout)
62    }
63}
64
65impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for Timeout<S>
66where
67    S: Service<Request<ReqBody>, Response = Response<ResBody>>,
68    ResBody: Default,
69{
70    type Response = S::Response;
71    type Error = S::Error;
72    type Future = ResponseFuture<S::Future>;
73
74    #[inline]
75    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
76        self.inner.poll_ready(cx)
77    }
78
79    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
80        let sleep = tokio::time::sleep(self.timeout);
81        ResponseFuture {
82            inner: self.inner.call(req),
83            sleep,
84        }
85    }
86}
87
88pin_project! {
89    /// Response future for [`Timeout`].
90    pub struct ResponseFuture<F> {
91        #[pin]
92        inner: F,
93        #[pin]
94        sleep: Sleep,
95    }
96}
97
98impl<F, B, E> Future for ResponseFuture<F>
99where
100    F: Future<Output = Result<Response<B>, E>>,
101    B: Default,
102{
103    type Output = Result<Response<B>, E>;
104
105    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
106        let this = self.project();
107
108        if this.sleep.poll(cx).is_ready() {
109            let mut res = Response::new(B::default());
110            *res.status_mut() = StatusCode::REQUEST_TIMEOUT;
111            return Poll::Ready(Ok(res));
112        }
113
114        this.inner.poll(cx)
115    }
116}
117
118/// Applies a [`TimeoutBody`] to the request body.
119#[derive(Clone, Debug)]
120pub struct RequestBodyTimeoutLayer {
121    timeout: Duration,
122}
123
124impl RequestBodyTimeoutLayer {
125    /// Creates a new [`RequestBodyTimeoutLayer`].
126    pub fn new(timeout: Duration) -> Self {
127        Self { timeout }
128    }
129}
130
131impl<S> Layer<S> for RequestBodyTimeoutLayer {
132    type Service = RequestBodyTimeout<S>;
133
134    fn layer(&self, inner: S) -> Self::Service {
135        RequestBodyTimeout::new(inner, self.timeout)
136    }
137}
138
139/// Applies a [`TimeoutBody`] to the request body.
140#[derive(Clone, Debug)]
141pub struct RequestBodyTimeout<S> {
142    inner: S,
143    timeout: Duration,
144}
145
146impl<S> RequestBodyTimeout<S> {
147    /// Creates a new [`RequestBodyTimeout`].
148    pub fn new(service: S, timeout: Duration) -> Self {
149        Self {
150            inner: service,
151            timeout,
152        }
153    }
154
155    /// Returns a new [`Layer`] that wraps services with a [`RequestBodyTimeoutLayer`] middleware.
156    ///
157    /// [`Layer`]: tower_layer::Layer
158    pub fn layer(timeout: Duration) -> RequestBodyTimeoutLayer {
159        RequestBodyTimeoutLayer::new(timeout)
160    }
161
162    define_inner_service_accessors!();
163}
164
165impl<S, ReqBody> Service<Request<ReqBody>> for RequestBodyTimeout<S>
166where
167    S: Service<Request<TimeoutBody<ReqBody>>>,
168    S::Error: Into<Box<dyn std::error::Error>>,
169{
170    type Response = S::Response;
171    type Error = S::Error;
172    type Future = S::Future;
173
174    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
175        self.inner.poll_ready(cx)
176    }
177
178    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
179        let req = req.map(|body| TimeoutBody::new(self.timeout, body));
180        self.inner.call(req)
181    }
182}
183
184/// Applies a [`TimeoutBody`] to the response body.
185#[derive(Clone)]
186pub struct ResponseBodyTimeoutLayer {
187    timeout: Duration,
188}
189
190impl ResponseBodyTimeoutLayer {
191    /// Creates a new [`ResponseBodyTimeoutLayer`].
192    pub fn new(timeout: Duration) -> Self {
193        Self { timeout }
194    }
195}
196
197impl<S> Layer<S> for ResponseBodyTimeoutLayer {
198    type Service = ResponseBodyTimeout<S>;
199
200    fn layer(&self, inner: S) -> Self::Service {
201        ResponseBodyTimeout::new(inner, self.timeout)
202    }
203}
204
205/// Applies a [`TimeoutBody`] to the response body.
206#[derive(Clone)]
207pub struct ResponseBodyTimeout<S> {
208    inner: S,
209    timeout: Duration,
210}
211
212impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for ResponseBodyTimeout<S>
213where
214    S: Service<Request<ReqBody>, Response = Response<ResBody>>,
215    S::Error: Into<Box<dyn std::error::Error>>,
216{
217    type Response = Response<TimeoutBody<ResBody>>;
218    type Error = S::Error;
219    type Future = ResponseBodyTimeoutFuture<S::Future>;
220
221    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
222        self.inner.poll_ready(cx)
223    }
224
225    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
226        ResponseBodyTimeoutFuture {
227            inner: self.inner.call(req),
228            timeout: self.timeout,
229        }
230    }
231}
232
233impl<S> ResponseBodyTimeout<S> {
234    /// Creates a new [`ResponseBodyTimeout`].
235    pub fn new(service: S, timeout: Duration) -> Self {
236        Self {
237            inner: service,
238            timeout,
239        }
240    }
241
242    /// Returns a new [`Layer`] that wraps services with a [`ResponseBodyTimeoutLayer`] middleware.
243    ///
244    /// [`Layer`]: tower_layer::Layer
245    pub fn layer(timeout: Duration) -> ResponseBodyTimeoutLayer {
246        ResponseBodyTimeoutLayer::new(timeout)
247    }
248
249    define_inner_service_accessors!();
250}
251
252pin_project! {
253    /// Response future for [`ResponseBodyTimeout`].
254    pub struct ResponseBodyTimeoutFuture<Fut> {
255        #[pin]
256        inner: Fut,
257        timeout: Duration,
258    }
259}
260
261impl<Fut, ResBody, E> Future for ResponseBodyTimeoutFuture<Fut>
262where
263    Fut: Future<Output = Result<Response<ResBody>, E>>,
264{
265    type Output = Result<Response<TimeoutBody<ResBody>>, E>;
266
267    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
268        let timeout = self.timeout;
269        let this = self.project();
270        let res = ready!(this.inner.poll(cx))?;
271        Poll::Ready(Ok(res.map(|body| TimeoutBody::new(timeout, body))))
272    }
273}