tokio_tower/multiplex/
server.rs1use futures_core::{ready, stream::TryStream};
2use futures_sink::Sink;
3use futures_util::stream::FuturesUnordered;
4use pin_project::pin_project;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use std::{error, fmt};
9use tower_service::Service;
10
/// A future that serves requests arriving on a transport using a `tower` service.
///
/// Requests are read from `transport` through its `TryStream` side, passed to
/// `service`, and the responses produced by the resulting futures are written back
/// through the transport's `Sink` side. In-progress service futures are kept in a
/// `FuturesUnordered`, so a response is sent as soon as its future completes,
/// regardless of the order in which the requests arrived.
#[pin_project]
#[derive(Debug)]
pub struct Server<T, S>
where
    T: Sink<S::Response> + TryStream,
    S: Service<<T as TryStream>::Ok>,
{
    /// Service futures that have been started but have not yet yielded a response.
    #[pin]
    pending: FuturesUnordered<S::Future>,
    /// The combined request stream / response sink.
    #[pin]
    transport: T,
    /// The service that is called once for every request read from `transport`.
    service: S,

    /// Incremented when a request is handed to `service` and decremented when its
    /// response is passed to the sink. In the code visible here it is only read for
    /// trace output.
    in_flight: usize,
    /// Set once the request stream has ended (yielded `None`); after that no further
    /// requests are read and the server only drains `pending`.
    finish: bool,
}
32
/// An error that caused a multiplexed server to stop.
///
/// Each variant wraps the underlying error of the component that failed.
pub enum Error<T, S>
where
    T: Sink<S::Response> + TryStream,
    S: Service<<T as TryStream>::Ok>,
{
    /// The underlying transport failed to produce a request.
    BrokenTransportRecv(<T as TryStream>::Error),

    /// The underlying transport failed while attempting to send a response.
    BrokenTransportSend(<T as Sink<S::Response>>::Error),

    /// The underlying service failed to process a request.
    Service(S::Error),
}
48
49impl<T, S> fmt::Display for Error<T, S>
50where
51 T: Sink<S::Response> + TryStream,
52 S: Service<<T as TryStream>::Ok>,
53 <T as Sink<S::Response>>::Error: fmt::Display,
54 <T as TryStream>::Error: fmt::Display,
55 S::Error: fmt::Display,
56{
57 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58 match *self {
59 Error::BrokenTransportRecv(_) => {
60 f.pad("underlying transport failed to produce a request")
61 }
62 Error::BrokenTransportSend(_) => {
63 f.pad("underlying transport failed while attempting to send a response")
64 }
65 Error::Service(_) => f.pad("underlying service failed to process a request"),
66 }
67 }
68}
69
70impl<T, S> fmt::Debug for Error<T, S>
71where
72 T: Sink<S::Response> + TryStream,
73 S: Service<<T as TryStream>::Ok>,
74 <T as Sink<S::Response>>::Error: fmt::Debug,
75 <T as TryStream>::Error: fmt::Debug,
76 S::Error: fmt::Debug,
77{
78 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79 match *self {
80 Error::BrokenTransportRecv(ref se) => write!(f, "BrokenTransportRecv({:?})", se),
81 Error::BrokenTransportSend(ref se) => write!(f, "BrokenTransportSend({:?})", se),
82 Error::Service(ref se) => write!(f, "Service({:?})", se),
83 }
84 }
85}
86
87impl<T, S> error::Error for Error<T, S>
88where
89 T: Sink<S::Response> + TryStream,
90 S: Service<<T as TryStream>::Ok>,
91 <T as Sink<S::Response>>::Error: error::Error + 'static,
92 <T as TryStream>::Error: error::Error + 'static,
93 S::Error: error::Error + 'static,
94{
95 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
96 match *self {
97 Error::BrokenTransportSend(ref se) => Some(se),
98 Error::BrokenTransportRecv(ref se) => Some(se),
99 Error::Service(ref se) => Some(se),
100 }
101 }
102}
103
104impl<T, S> Error<T, S>
105where
106 T: Sink<S::Response> + TryStream,
107 S: Service<<T as TryStream>::Ok>,
108{
109 fn from_sink_error(e: <T as Sink<S::Response>>::Error) -> Self {
110 Error::BrokenTransportSend(e)
111 }
112
113 fn from_stream_error(e: <T as TryStream>::Error) -> Self {
114 Error::BrokenTransportRecv(e)
115 }
116
117 fn from_service_error(e: S::Error) -> Self {
118 Error::Service(e)
119 }
120}
121
122impl<T, S> Server<T, S>
123where
124 T: Sink<S::Response> + TryStream,
125 S: Service<<T as TryStream>::Ok>,
126{
127 pub fn new(transport: T, service: S) -> Self {
134 Server {
135 pending: FuturesUnordered::new(),
136 transport,
137 service,
138 in_flight: 0,
139 finish: false,
140 }
141 }
142
143 }
172
/// Polling the server makes progress on sending responses, flushing the transport
/// and accepting new requests.
///
/// The future resolves with `Ok(())` once the request stream has ended, no service
/// futures remain pending, and the transport has been flushed. It resolves with
/// `Err` on the first error reported by the transport (either direction) or by the
/// service.
impl<T, S> Future for Server<T, S>
where
    T: Sink<S::Response> + TryStream,
    S: Service<<T as TryStream>::Ok>,
{
    type Output = Result<(), Error<T, S>>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let span = tracing::trace_span!("poll");
        let _guard = span.enter();
        tracing::trace!("poll");

        // Project the pinned struct so that individual fields can be borrowed
        // separately.
        let this = self.project();

        // `transport` and `pending` are structurally pinned; they are only ever
        // accessed through `as_mut()` below.
        let mut transport: Pin<_> = this.transport;
        let mut pending: Pin<_> = this.pending;

        // Number of outer-loop iterations in this call, used for the forced yield.
        let mut i = 0;

        loop {
            // Step 1: move completed responses into the sink. A pending future is
            // only polled for a response after the sink has reported that it is
            // ready to accept one.
            while let Poll::Ready(r) = transport.as_mut().poll_ready(cx) {
                if let Err(e) = r {
                    return Poll::Ready(Err(Error::from_sink_error(e)));
                }

                tracing::trace!(
                    in_flight = *this.in_flight,
                    pending = pending.len(),
                    "transport.ready"
                );
                match pending.as_mut().try_poll_next(cx) {
                    Poll::Ready(Some(Err(e))) => {
                        // A service future failed: stop the server with that error.
                        return Poll::Ready(Err(Error::from_service_error(e)));
                    }
                    Poll::Ready(Some(Ok(rsp))) => {
                        tracing::trace!("transport.start_send");
                        // `?` returns early with a send error if the sink rejects
                        // the response.
                        transport
                            .as_mut()
                            .start_send(rsp)
                            .map_err(Error::from_sink_error)?;
                        *this.in_flight -= 1;
                    }
                    _ => {
                        // Either no response is ready yet (`Poll::Pending`) or
                        // there are no pending futures at all (`Poll::Ready(None)`).
                        // NOTE(review): the readiness obtained from `poll_ready`
                        // above is left unused on this path.
                        break;
                    }
                }
            }

            // Step 2: try to flush what has been handed to the sink. A flush that
            // is not yet complete does not stop the loop; its result is only used
            // to decide whether the server is completely done.
            tracing::trace!(finish = *this.finish, "transport.poll_flush");
            if let Poll::Ready(()) = transport
                .as_mut()
                .poll_flush(cx)
                .map_err(Error::from_sink_error)?
            {
                if *this.finish && pending.as_mut().is_empty() {
                    // No more requests will arrive, nothing is pending, and the
                    // sink is flushed: all work is finished.
                    return Poll::Ready(Ok(()));
                }
            }

            if *this.finish {
                // The request stream has ended, so it must not be polled again;
                // the remaining work (pending responses and/or flushing) was
                // polled above and is what this task now waits on.
                return Poll::Pending;
            }

            // Step 3: after `crate::YIELD_EVERY` iterations in a single call, give
            // control back to the executor. `i` is checked with `==` right after
            // the increment, so this triggers exactly when the count is reached.
            i += 1;
            if i == crate::YIELD_EVERY {
                tracing::trace!("forced yield");
                // Returning `Pending` here is voluntary, so the task must request
                // its own wakeup.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }

            // Step 4: only read a new request once the service says it can take
            // one; `ready!` returns `Poll::Pending` from this function otherwise.
            tracing::trace!("service.poll_ready");
            ready!(this.service.poll_ready(cx)).map_err(Error::from_service_error)?;

            // Step 5: read the next request. `transpose` turns
            // `Option<Result<_, _>>` into `Result<Option<_>, _>` so that a receive
            // error can be propagated with `?`.
            tracing::trace!("transport.poll_next");
            let rq = ready!(transport.as_mut().try_poll_next(cx))
                .transpose()
                .map_err(Error::from_stream_error)?;
            if let Some(rq) = rq {
                // Start processing the request and track its future.
                pending.push(this.service.call(rq));
                *this.in_flight += 1;
            } else {
                // The request stream has ended. This point cannot be reached with
                // `finish` already set because of the early return above. The next
                // loop iteration drains responses and flushes once more.
                assert!(!*this.finish);
                *this.finish = true;
            }
        }
    }
}