tokio_tower/pipeline/
server.rs1use futures_core::{ready, stream::TryStream};
2use futures_sink::Sink;
3use futures_util::stream::FuturesOrdered;
4use pin_project::pin_project;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use std::{error, fmt};
9use tower_service::Service;
10
11#[pin_project]
17#[derive(Debug)]
18pub struct Server<T, S>
19where
20 T: Sink<S::Response> + TryStream,
21 S: Service<<T as TryStream>::Ok>,
22{
23 #[pin]
24 pending: FuturesOrdered<S::Future>,
25 #[pin]
26 transport: T,
27 service: S,
28
29 in_flight: usize,
30 finish: bool,
31}
32
33pub enum Error<T, S>
35where
36 T: Sink<S::Response> + TryStream,
37 S: Service<<T as TryStream>::Ok>,
38{
39 BrokenTransportRecv(<T as TryStream>::Error),
41
42 BrokenTransportSend(<T as Sink<S::Response>>::Error),
44
45 Service(S::Error),
47}
48
49impl<T, S> fmt::Display for Error<T, S>
50where
51 T: Sink<S::Response> + TryStream,
52 S: Service<<T as TryStream>::Ok>,
53 <T as Sink<S::Response>>::Error: fmt::Display,
54 <T as TryStream>::Error: fmt::Display,
55 S::Error: fmt::Display,
56{
57 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58 match *self {
59 Error::BrokenTransportRecv(_) => {
60 f.pad("underlying transport failed to produce a request")
61 }
62 Error::BrokenTransportSend(_) => {
63 f.pad("underlying transport failed while attempting to send a response")
64 }
65 Error::Service(_) => f.pad("underlying service failed to process a request"),
66 }
67 }
68}
69
70impl<T, S> fmt::Debug for Error<T, S>
71where
72 T: Sink<S::Response> + TryStream,
73 S: Service<<T as TryStream>::Ok>,
74 <T as Sink<S::Response>>::Error: fmt::Debug,
75 <T as TryStream>::Error: fmt::Debug,
76 S::Error: fmt::Debug,
77{
78 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79 match *self {
80 Error::BrokenTransportRecv(ref se) => write!(f, "BrokenTransportRecv({:?})", se),
81 Error::BrokenTransportSend(ref se) => write!(f, "BrokenTransportSend({:?})", se),
82 Error::Service(ref se) => write!(f, "Service({:?})", se),
83 }
84 }
85}
86
87impl<T, S> error::Error for Error<T, S>
88where
89 T: Sink<S::Response> + TryStream,
90 S: Service<<T as TryStream>::Ok>,
91 <T as Sink<S::Response>>::Error: error::Error + 'static,
92 <T as TryStream>::Error: error::Error + 'static,
93 S::Error: error::Error + 'static,
94{
95 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
96 match *self {
97 Error::BrokenTransportSend(ref se) => Some(se),
98 Error::BrokenTransportRecv(ref se) => Some(se),
99 Error::Service(ref se) => Some(se),
100 }
101 }
102}
103
104impl<T, S> Error<T, S>
105where
106 T: Sink<S::Response> + TryStream,
107 S: Service<<T as TryStream>::Ok>,
108{
109 fn from_sink_error(e: <T as Sink<S::Response>>::Error) -> Self {
110 Error::BrokenTransportSend(e)
111 }
112
113 fn from_stream_error(e: <T as TryStream>::Error) -> Self {
114 Error::BrokenTransportRecv(e)
115 }
116
117 fn from_service_error(e: S::Error) -> Self {
118 Error::Service(e)
119 }
120}
121
122impl<T, S> Server<T, S>
123where
124 T: Sink<S::Response> + TryStream,
125 S: Service<<T as TryStream>::Ok>,
126{
127 pub fn new(transport: T, service: S) -> Self {
135 Server {
136 pending: FuturesOrdered::new(),
137 transport,
138 service,
139 in_flight: 0,
140 finish: false,
141 }
142 }
143
144 }
173
174impl<T, S> Future for Server<T, S>
175where
176 T: Sink<S::Response> + TryStream,
177 S: Service<<T as TryStream>::Ok>,
178{
179 type Output = Result<(), Error<T, S>>;
180
181 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
182 let span = tracing::trace_span!("poll");
183 let _guard = span.enter();
184 tracing::trace!("poll");
185
186 let this = self.project();
188
189 let mut transport: Pin<_> = this.transport;
191 let mut pending: Pin<_> = this.pending;
192
193 let mut i = 0;
195
196 loop {
197 while let Poll::Ready(r) = transport.as_mut().poll_ready(cx) {
200 if let Err(e) = r {
201 return Poll::Ready(Err(Error::from_sink_error(e)));
202 }
203
204 tracing::trace!(
205 in_flight = *this.in_flight,
206 pending = pending.len(),
207 "transport.ready"
208 );
209 match pending.as_mut().try_poll_next(cx) {
210 Poll::Ready(Some(Err(e))) => {
211 return Poll::Ready(Err(Error::from_service_error(e)));
212 }
213 Poll::Ready(Some(Ok(rsp))) => {
214 tracing::trace!("transport.start_send");
215 transport
217 .as_mut()
218 .start_send(rsp)
219 .map_err(Error::from_sink_error)?;
220 *this.in_flight -= 1;
221 }
222 _ => {
223 break;
225 }
226 }
227 }
228
229 tracing::trace!(finish = *this.finish, "transport.poll_flush");
231 if let Poll::Ready(()) = transport
232 .as_mut()
233 .poll_flush(cx)
234 .map_err(Error::from_sink_error)?
235 {
236 if *this.finish && pending.as_mut().is_empty() {
237 return Poll::Ready(Ok(()));
240 }
241 }
242
243 if *this.finish {
244 return Poll::Pending;
247 }
248
249 i += 1;
251 if i == crate::YIELD_EVERY {
252 tracing::trace!("forced yield");
254 cx.waker().wake_by_ref();
255 return Poll::Pending;
256 }
257
258 tracing::trace!("service.poll_ready");
260 ready!(this.service.poll_ready(cx)).map_err(Error::from_service_error)?;
261
262 tracing::trace!("transport.poll_next");
263 let rq = ready!(transport.as_mut().try_poll_next(cx))
264 .transpose()
265 .map_err(Error::from_stream_error)?;
266 if let Some(rq) = rq {
267 pending.push(this.service.call(rq));
270 *this.in_flight += 1;
271 } else {
272 assert!(!*this.finish);
275 *this.finish = true;
276 }
277 }
278 }
279}