tokio_tower/multiplex/
server.rs

1use futures_core::{ready, stream::TryStream};
2use futures_sink::Sink;
3use futures_util::stream::FuturesUnordered;
4use pin_project::pin_project;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use std::{error, fmt};
9use tower_service::Service;
10
11/// This type provides an implementation of a Tower
12/// [`Service`](https://docs.rs/tokio-service/0.1/tokio_service/trait.Service.html) on top of a
13/// multiplexed protocol transport. In particular, it wraps a transport that implements
14/// `Sink<SinkItem = Response>` and `Stream<Item = Request>` with the necessary bookkeeping to
15/// adhere to Tower's convenient `fn(Request) -> Future<Response>` API.
16#[pin_project]
17#[derive(Debug)]
18pub struct Server<T, S>
19where
20    T: Sink<S::Response> + TryStream,
21    S: Service<<T as TryStream>::Ok>,
22{
23    #[pin]
24    pending: FuturesUnordered<S::Future>,
25    #[pin]
26    transport: T,
27    service: S,
28
29    in_flight: usize,
30    finish: bool,
31}
32
33/// An error that occurred while servicing a request.
34pub enum Error<T, S>
35where
36    T: Sink<S::Response> + TryStream,
37    S: Service<<T as TryStream>::Ok>,
38{
39    /// The underlying transport failed to produce a request.
40    BrokenTransportRecv(<T as TryStream>::Error),
41
42    /// The underlying transport failed while attempting to send a response.
43    BrokenTransportSend(<T as Sink<S::Response>>::Error),
44
45    /// The underlying service failed to process a request.
46    Service(S::Error),
47}
48
49impl<T, S> fmt::Display for Error<T, S>
50where
51    T: Sink<S::Response> + TryStream,
52    S: Service<<T as TryStream>::Ok>,
53    <T as Sink<S::Response>>::Error: fmt::Display,
54    <T as TryStream>::Error: fmt::Display,
55    S::Error: fmt::Display,
56{
57    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58        match *self {
59            Error::BrokenTransportRecv(_) => {
60                f.pad("underlying transport failed to produce a request")
61            }
62            Error::BrokenTransportSend(_) => {
63                f.pad("underlying transport failed while attempting to send a response")
64            }
65            Error::Service(_) => f.pad("underlying service failed to process a request"),
66        }
67    }
68}
69
70impl<T, S> fmt::Debug for Error<T, S>
71where
72    T: Sink<S::Response> + TryStream,
73    S: Service<<T as TryStream>::Ok>,
74    <T as Sink<S::Response>>::Error: fmt::Debug,
75    <T as TryStream>::Error: fmt::Debug,
76    S::Error: fmt::Debug,
77{
78    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79        match *self {
80            Error::BrokenTransportRecv(ref se) => write!(f, "BrokenTransportRecv({:?})", se),
81            Error::BrokenTransportSend(ref se) => write!(f, "BrokenTransportSend({:?})", se),
82            Error::Service(ref se) => write!(f, "Service({:?})", se),
83        }
84    }
85}
86
87impl<T, S> error::Error for Error<T, S>
88where
89    T: Sink<S::Response> + TryStream,
90    S: Service<<T as TryStream>::Ok>,
91    <T as Sink<S::Response>>::Error: error::Error + 'static,
92    <T as TryStream>::Error: error::Error + 'static,
93    S::Error: error::Error + 'static,
94{
95    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
96        match *self {
97            Error::BrokenTransportSend(ref se) => Some(se),
98            Error::BrokenTransportRecv(ref se) => Some(se),
99            Error::Service(ref se) => Some(se),
100        }
101    }
102}
103
104impl<T, S> Error<T, S>
105where
106    T: Sink<S::Response> + TryStream,
107    S: Service<<T as TryStream>::Ok>,
108{
109    fn from_sink_error(e: <T as Sink<S::Response>>::Error) -> Self {
110        Error::BrokenTransportSend(e)
111    }
112
113    fn from_stream_error(e: <T as TryStream>::Error) -> Self {
114        Error::BrokenTransportRecv(e)
115    }
116
117    fn from_service_error(e: S::Error) -> Self {
118        Error::Service(e)
119    }
120}
121
122impl<T, S> Server<T, S>
123where
124    T: Sink<S::Response> + TryStream,
125    S: Service<<T as TryStream>::Ok>,
126{
127    /// Construct a new [`Server`] over the given `transport` that services requests using the
128    /// given `service`.
129    ///
130    /// Requests are passed to `Service::call` as they arrive, and responses are written back to
131    /// the underlying `transport` in the order that they complete. If a later request completes
132    /// before an earlier request, its response is still sent immediately.
133    pub fn new(transport: T, service: S) -> Self {
134        Server {
135            pending: FuturesUnordered::new(),
136            transport,
137            service,
138            in_flight: 0,
139            finish: false,
140        }
141    }
142
143    /*
144    /// Manage incoming new transport instances using the given service constructor.
145    ///
146    /// For each transport that `incoming` yields, a new instance of `service` is created to
147    /// manage requests on that transport. This is roughly equivalent to:
148    ///
149    /// ```rust,ignore
150    /// incoming.map(|t| Server::multiplexed(t, service.new_service(), limit))
151    /// ```
152    pub fn serve_on<TS, SS, E>(
153        incoming: TS,
154        service: SS,
155        limit: Option<usize>,
156    ) -> impl Stream<Item = Self, Error = E>
157    where
158        TS: Stream<Item = T>,
159        SS: NewService<Request = S::Request, Response = S::Response, Error = S::Error, Service = S>,
160        E: From<TS::Error>,
161        E: From<SS::InitError>,
162    {
163        incoming.map_err(E::from).and_then(move |transport| {
164            service
165                .new_service()
166                .map_err(E::from)
167                .map(move |s| Server::multiplexed(transport, s, limit))
168        })
169    }
170    */
171}
172
173impl<T, S> Future for Server<T, S>
174where
175    T: Sink<S::Response> + TryStream,
176    S: Service<<T as TryStream>::Ok>,
177{
178    type Output = Result<(), Error<T, S>>;
179
180    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
181        let span = tracing::trace_span!("poll");
182        let _guard = span.enter();
183        tracing::trace!("poll");
184
185        // go through the deref so we can do partial borrows
186        let this = self.project();
187
188        // we never move transport or pending, nor do we ever hand out &mut to it
189        let mut transport: Pin<_> = this.transport;
190        let mut pending: Pin<_> = this.pending;
191
192        // track how many times we have iterated
193        let mut i = 0;
194
195        loop {
196            // first, poll pending futures to see if any have produced responses
197            // note that we only poll for completed service futures if we can send the response
198            while let Poll::Ready(r) = transport.as_mut().poll_ready(cx) {
199                if let Err(e) = r {
200                    return Poll::Ready(Err(Error::from_sink_error(e)));
201                }
202
203                tracing::trace!(
204                    in_flight = *this.in_flight,
205                    pending = pending.len(),
206                    "transport.ready"
207                );
208                match pending.as_mut().try_poll_next(cx) {
209                    Poll::Ready(Some(Err(e))) => {
210                        return Poll::Ready(Err(Error::from_service_error(e)));
211                    }
212                    Poll::Ready(Some(Ok(rsp))) => {
213                        tracing::trace!("transport.start_send");
214                        // try to send the response!
215                        transport
216                            .as_mut()
217                            .start_send(rsp)
218                            .map_err(Error::from_sink_error)?;
219                        *this.in_flight -= 1;
220                    }
221                    _ => {
222                        // XXX: should we "release" the poll_ready we got from the Sink?
223                        break;
224                    }
225                }
226            }
227
228            // also try to make progress on sending
229            tracing::trace!(finish = *this.finish, "transport.poll_flush");
230            if let Poll::Ready(()) = transport
231                .as_mut()
232                .poll_flush(cx)
233                .map_err(Error::from_sink_error)?
234            {
235                if *this.finish && pending.as_mut().is_empty() {
236                    // there are no more requests
237                    // and we've finished all the work!
238                    return Poll::Ready(Ok(()));
239                }
240            }
241
242            if *this.finish {
243                // there's still work to be done, but there are no more requests
244                // so no need to check the incoming transport
245                return Poll::Pending;
246            }
247
248            // if we have run for a while without yielding, yield back so other tasks can run
249            i += 1;
250            if i == crate::YIELD_EVERY {
251                // we're forcing a yield, so need to ensure we get woken up again
252                tracing::trace!("forced yield");
253                cx.waker().wake_by_ref();
254                return Poll::Pending;
255            }
256
257            // is the service ready?
258            tracing::trace!("service.poll_ready");
259            ready!(this.service.poll_ready(cx)).map_err(Error::from_service_error)?;
260
261            tracing::trace!("transport.poll_next");
262            let rq = ready!(transport.as_mut().try_poll_next(cx))
263                .transpose()
264                .map_err(Error::from_stream_error)?;
265            if let Some(rq) = rq {
266                // the service is ready, and we have another request!
267                // you know what that means:
268                pending.push(this.service.call(rq));
269                *this.in_flight += 1;
270            } else {
271                // there are no more requests coming
272                // check one more time for responses, and then yield
273                assert!(!*this.finish);
274                *this.finish = true;
275            }
276        }
277    }
278}