// tower_http/timeout/service.rs
use crate::timeout::body::TimeoutBody;
2use http::{Request, Response, StatusCode};
3use pin_project_lite::pin_project;
4use std::{
5 future::Future,
6 pin::Pin,
7 task::{ready, Context, Poll},
8 time::Duration,
9};
10use tokio::time::Sleep;
11use tower_layer::Layer;
12use tower_service::Service;
13
/// Layer that wraps services in [`Timeout`], carrying the duration to apply.
#[derive(Clone, Copy, Debug)]
pub struct TimeoutLayer {
    /// Duration handed to every [`Timeout`] built by this layer.
    timeout: Duration,
}

impl TimeoutLayer {
    /// Creates a layer that applies `timeout` to the services it wraps.
    pub fn new(timeout: Duration) -> Self {
        Self { timeout }
    }
}
28
29impl<S> Layer<S> for TimeoutLayer {
30 type Service = Timeout<S>;
31
32 fn layer(&self, inner: S) -> Self::Service {
33 Timeout::new(inner, self.timeout)
34 }
35}
36
/// Service wrapper that pairs an inner service with a timeout duration.
///
/// When used as a `Service`, the inner response future is raced against a
/// timer; if the timer wins, a `408 Request Timeout` response with a default
/// body is returned instead of an error.
#[derive(Clone, Copy, Debug)]
pub struct Timeout<S> {
    /// The wrapped service that actually handles requests.
    inner: S,
    /// How long each call may take before the timeout response is produced.
    timeout: Duration,
}
48
49impl<S> Timeout<S> {
50 pub fn new(inner: S, timeout: Duration) -> Self {
52 Self { inner, timeout }
53 }
54
55 define_inner_service_accessors!();
56
57 pub fn layer(timeout: Duration) -> TimeoutLayer {
61 TimeoutLayer::new(timeout)
62 }
63}
64
65impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for Timeout<S>
66where
67 S: Service<Request<ReqBody>, Response = Response<ResBody>>,
68 ResBody: Default,
69{
70 type Response = S::Response;
71 type Error = S::Error;
72 type Future = ResponseFuture<S::Future>;
73
74 #[inline]
75 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
76 self.inner.poll_ready(cx)
77 }
78
79 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
80 let sleep = tokio::time::sleep(self.timeout);
81 ResponseFuture {
82 inner: self.inner.call(req),
83 sleep,
84 }
85 }
86}
87
88pin_project! {
89 pub struct ResponseFuture<F> {
91 #[pin]
92 inner: F,
93 #[pin]
94 sleep: Sleep,
95 }
96}
97
98impl<F, B, E> Future for ResponseFuture<F>
99where
100 F: Future<Output = Result<Response<B>, E>>,
101 B: Default,
102{
103 type Output = Result<Response<B>, E>;
104
105 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
106 let this = self.project();
107
108 if this.sleep.poll(cx).is_ready() {
109 let mut res = Response::new(B::default());
110 *res.status_mut() = StatusCode::REQUEST_TIMEOUT;
111 return Poll::Ready(Ok(res));
112 }
113
114 this.inner.poll(cx)
115 }
116}
117
/// Layer that wraps services in [`RequestBodyTimeout`].
#[derive(Debug, Clone)]
pub struct RequestBodyTimeoutLayer {
    /// Duration passed to each [`RequestBodyTimeout`] this layer builds.
    timeout: Duration,
}

impl RequestBodyTimeoutLayer {
    /// Creates a layer that applies `timeout` to request bodies.
    pub fn new(timeout: Duration) -> Self {
        RequestBodyTimeoutLayer { timeout }
    }
}
130
131impl<S> Layer<S> for RequestBodyTimeoutLayer {
132 type Service = RequestBodyTimeout<S>;
133
134 fn layer(&self, inner: S) -> Self::Service {
135 RequestBodyTimeout::new(inner, self.timeout)
136 }
137}
138
/// Service wrapper that replaces each request body with a [`TimeoutBody`]
/// built from the configured duration before calling the inner service.
#[derive(Debug, Clone)]
pub struct RequestBodyTimeout<S> {
    /// The wrapped service that receives the re-wrapped request.
    inner: S,
    /// Duration passed to `TimeoutBody::new` for every request body.
    timeout: Duration,
}
145
146impl<S> RequestBodyTimeout<S> {
147 pub fn new(service: S, timeout: Duration) -> Self {
149 Self {
150 inner: service,
151 timeout,
152 }
153 }
154
155 pub fn layer(timeout: Duration) -> RequestBodyTimeoutLayer {
159 RequestBodyTimeoutLayer::new(timeout)
160 }
161
162 define_inner_service_accessors!();
163}
164
165impl<S, ReqBody> Service<Request<ReqBody>> for RequestBodyTimeout<S>
166where
167 S: Service<Request<TimeoutBody<ReqBody>>>,
168 S::Error: Into<Box<dyn std::error::Error>>,
169{
170 type Response = S::Response;
171 type Error = S::Error;
172 type Future = S::Future;
173
174 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
175 self.inner.poll_ready(cx)
176 }
177
178 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
179 let req = req.map(|body| TimeoutBody::new(self.timeout, body));
180 self.inner.call(req)
181 }
182}
183
/// Layer that wraps services in [`ResponseBodyTimeout`].
///
/// Derives `Debug` alongside `Clone` so that it matches
/// [`RequestBodyTimeoutLayer`] and can be formatted when it is embedded in
/// other `Debug` types.
#[derive(Clone, Debug)]
pub struct ResponseBodyTimeoutLayer {
    /// Duration passed to each [`ResponseBodyTimeout`] this layer builds.
    timeout: Duration,
}

impl ResponseBodyTimeoutLayer {
    /// Creates a layer that applies `timeout` to response bodies.
    pub fn new(timeout: Duration) -> Self {
        Self { timeout }
    }
}
196
197impl<S> Layer<S> for ResponseBodyTimeoutLayer {
198 type Service = ResponseBodyTimeout<S>;
199
200 fn layer(&self, inner: S) -> Self::Service {
201 ResponseBodyTimeout::new(inner, self.timeout)
202 }
203}
204
/// Service wrapper that replaces each successful response body with a
/// [`TimeoutBody`] built from the configured duration.
///
/// Derives `Debug` alongside `Clone` so that it matches [`RequestBodyTimeout`];
/// the `Debug` impl is available whenever the inner service is `Debug`.
#[derive(Clone, Debug)]
pub struct ResponseBodyTimeout<S> {
    /// The wrapped service whose responses are re-wrapped.
    inner: S,
    /// Duration passed to `TimeoutBody::new` for every response body.
    timeout: Duration,
}
211
212impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for ResponseBodyTimeout<S>
213where
214 S: Service<Request<ReqBody>, Response = Response<ResBody>>,
215 S::Error: Into<Box<dyn std::error::Error>>,
216{
217 type Response = Response<TimeoutBody<ResBody>>;
218 type Error = S::Error;
219 type Future = ResponseBodyTimeoutFuture<S::Future>;
220
221 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
222 self.inner.poll_ready(cx)
223 }
224
225 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
226 ResponseBodyTimeoutFuture {
227 inner: self.inner.call(req),
228 timeout: self.timeout,
229 }
230 }
231}
232
233impl<S> ResponseBodyTimeout<S> {
234 pub fn new(service: S, timeout: Duration) -> Self {
236 Self {
237 inner: service,
238 timeout,
239 }
240 }
241
242 pub fn layer(timeout: Duration) -> ResponseBodyTimeoutLayer {
246 ResponseBodyTimeoutLayer::new(timeout)
247 }
248
249 define_inner_service_accessors!();
250}
251
252pin_project! {
253 pub struct ResponseBodyTimeoutFuture<Fut> {
255 #[pin]
256 inner: Fut,
257 timeout: Duration,
258 }
259}
260
261impl<Fut, ResBody, E> Future for ResponseBodyTimeoutFuture<Fut>
262where
263 Fut: Future<Output = Result<Response<ResBody>, E>>,
264{
265 type Output = Result<Response<TimeoutBody<ResBody>>, E>;
266
267 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
268 let timeout = self.timeout;
269 let this = self.project();
270 let res = ready!(this.inner.poll(cx))?;
271 Poll::Ready(Ok(res.map(|body| TimeoutBody::new(timeout, body))))
272 }
273}