tokio_tower/pipeline/
server.rs

1use futures_core::{ready, stream::TryStream};
2use futures_sink::Sink;
3use futures_util::stream::FuturesOrdered;
4use pin_project::pin_project;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use std::{error, fmt};
9use tower_service::Service;
10
11/// This type provides an implementation of a Tower
12/// [`Service`](https://docs.rs/tokio-service/0.1/tokio_service/trait.Service.html) on top of a
13/// request-at-a-time protocol transport. In particular, it wraps a transport that implements
14/// `Sink<SinkItem = Response>` and `Stream<Item = Request>` with the necessary bookkeeping to
15/// adhere to Tower's convenient `fn(Request) -> Future<Response>` API.
16#[pin_project]
17#[derive(Debug)]
18pub struct Server<T, S>
19where
20    T: Sink<S::Response> + TryStream,
21    S: Service<<T as TryStream>::Ok>,
22{
23    #[pin]
24    pending: FuturesOrdered<S::Future>,
25    #[pin]
26    transport: T,
27    service: S,
28
29    in_flight: usize,
30    finish: bool,
31}
32
33/// An error that occurred while servicing a request.
34pub enum Error<T, S>
35where
36    T: Sink<S::Response> + TryStream,
37    S: Service<<T as TryStream>::Ok>,
38{
39    /// The underlying transport failed to produce a request.
40    BrokenTransportRecv(<T as TryStream>::Error),
41
42    /// The underlying transport failed while attempting to send a response.
43    BrokenTransportSend(<T as Sink<S::Response>>::Error),
44
45    /// The underlying service failed to process a request.
46    Service(S::Error),
47}
48
49impl<T, S> fmt::Display for Error<T, S>
50where
51    T: Sink<S::Response> + TryStream,
52    S: Service<<T as TryStream>::Ok>,
53    <T as Sink<S::Response>>::Error: fmt::Display,
54    <T as TryStream>::Error: fmt::Display,
55    S::Error: fmt::Display,
56{
57    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58        match *self {
59            Error::BrokenTransportRecv(_) => {
60                f.pad("underlying transport failed to produce a request")
61            }
62            Error::BrokenTransportSend(_) => {
63                f.pad("underlying transport failed while attempting to send a response")
64            }
65            Error::Service(_) => f.pad("underlying service failed to process a request"),
66        }
67    }
68}
69
70impl<T, S> fmt::Debug for Error<T, S>
71where
72    T: Sink<S::Response> + TryStream,
73    S: Service<<T as TryStream>::Ok>,
74    <T as Sink<S::Response>>::Error: fmt::Debug,
75    <T as TryStream>::Error: fmt::Debug,
76    S::Error: fmt::Debug,
77{
78    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79        match *self {
80            Error::BrokenTransportRecv(ref se) => write!(f, "BrokenTransportRecv({:?})", se),
81            Error::BrokenTransportSend(ref se) => write!(f, "BrokenTransportSend({:?})", se),
82            Error::Service(ref se) => write!(f, "Service({:?})", se),
83        }
84    }
85}
86
87impl<T, S> error::Error for Error<T, S>
88where
89    T: Sink<S::Response> + TryStream,
90    S: Service<<T as TryStream>::Ok>,
91    <T as Sink<S::Response>>::Error: error::Error + 'static,
92    <T as TryStream>::Error: error::Error + 'static,
93    S::Error: error::Error + 'static,
94{
95    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
96        match *self {
97            Error::BrokenTransportSend(ref se) => Some(se),
98            Error::BrokenTransportRecv(ref se) => Some(se),
99            Error::Service(ref se) => Some(se),
100        }
101    }
102}
103
104impl<T, S> Error<T, S>
105where
106    T: Sink<S::Response> + TryStream,
107    S: Service<<T as TryStream>::Ok>,
108{
109    fn from_sink_error(e: <T as Sink<S::Response>>::Error) -> Self {
110        Error::BrokenTransportSend(e)
111    }
112
113    fn from_stream_error(e: <T as TryStream>::Error) -> Self {
114        Error::BrokenTransportRecv(e)
115    }
116
117    fn from_service_error(e: S::Error) -> Self {
118        Error::Service(e)
119    }
120}
121
122impl<T, S> Server<T, S>
123where
124    T: Sink<S::Response> + TryStream,
125    S: Service<<T as TryStream>::Ok>,
126{
127    /// Construct a new [`Server`] over the given `transport` that services requests using the
128    /// given `service`.
129    ///
130    /// Requests are passed to `Service::call` as they arrive, and responses are written back to
131    /// the underlying `transport` in the order that the requests arrive. If a later request
132    /// completes before an earlier request, its result will be buffered until all preceding
133    /// requests have been sent.
134    pub fn new(transport: T, service: S) -> Self {
135        Server {
136            pending: FuturesOrdered::new(),
137            transport,
138            service,
139            in_flight: 0,
140            finish: false,
141        }
142    }
143
144    /*
145    /// Manage incoming new transport instances using the given service constructor.
146    ///
147    /// For each transport that `incoming` yields, a new instance of `service` is created to
148    /// manage requests on that transport. This is roughly equivalent to:
149    ///
150    /// ```rust,ignore
151    /// incoming.map(|t| Server::pipelined(t, service.new_service(), limit))
152    /// ```
153    pub fn serve_on<TS, SS, E>(
154        incoming: TS,
155        service: SS,
156        limit: Option<usize>,
157    ) -> impl Stream<Item = Self, Error = E>
158    where
159        TS: Stream<Item = T>,
160        SS: NewService<Request = S::Request, Response = S::Response, Error = S::Error, Service = S>,
161        E: From<TS::Error>,
162        E: From<SS::InitError>,
163    {
164        incoming.map_err(E::from).and_then(move |transport| {
165            service
166                .new_service()
167                .map_err(E::from)
168                .map(move |s| Server::pipelined(transport, s, limit))
169        })
170    }
171    */
172}
173
174impl<T, S> Future for Server<T, S>
175where
176    T: Sink<S::Response> + TryStream,
177    S: Service<<T as TryStream>::Ok>,
178{
179    type Output = Result<(), Error<T, S>>;
180
181    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
182        let span = tracing::trace_span!("poll");
183        let _guard = span.enter();
184        tracing::trace!("poll");
185
186        // go through the deref so we can do partial borrows
187        let this = self.project();
188
189        // we never move transport or pending, nor do we ever hand out &mut to it
190        let mut transport: Pin<_> = this.transport;
191        let mut pending: Pin<_> = this.pending;
192
193        // track how many times we have iterated
194        let mut i = 0;
195
196        loop {
197            // first, poll pending futures to see if any have produced responses
198            // note that we only poll for completed service futures if we can send the response
199            while let Poll::Ready(r) = transport.as_mut().poll_ready(cx) {
200                if let Err(e) = r {
201                    return Poll::Ready(Err(Error::from_sink_error(e)));
202                }
203
204                tracing::trace!(
205                    in_flight = *this.in_flight,
206                    pending = pending.len(),
207                    "transport.ready"
208                );
209                match pending.as_mut().try_poll_next(cx) {
210                    Poll::Ready(Some(Err(e))) => {
211                        return Poll::Ready(Err(Error::from_service_error(e)));
212                    }
213                    Poll::Ready(Some(Ok(rsp))) => {
214                        tracing::trace!("transport.start_send");
215                        // try to send the response!
216                        transport
217                            .as_mut()
218                            .start_send(rsp)
219                            .map_err(Error::from_sink_error)?;
220                        *this.in_flight -= 1;
221                    }
222                    _ => {
223                        // XXX: should we "release" the poll_ready we got from the Sink?
224                        break;
225                    }
226                }
227            }
228
229            // also try to make progress on sending
230            tracing::trace!(finish = *this.finish, "transport.poll_flush");
231            if let Poll::Ready(()) = transport
232                .as_mut()
233                .poll_flush(cx)
234                .map_err(Error::from_sink_error)?
235            {
236                if *this.finish && pending.as_mut().is_empty() {
237                    // there are no more requests
238                    // and we've finished all the work!
239                    return Poll::Ready(Ok(()));
240                }
241            }
242
243            if *this.finish {
244                // there's still work to be done, but there are no more requests
245                // so no need to check the incoming transport
246                return Poll::Pending;
247            }
248
249            // if we have run for a while without yielding, yield back so other tasks can run
250            i += 1;
251            if i == crate::YIELD_EVERY {
252                // we're forcing a yield, so need to ensure we get woken up again
253                tracing::trace!("forced yield");
254                cx.waker().wake_by_ref();
255                return Poll::Pending;
256            }
257
258            // is the service ready?
259            tracing::trace!("service.poll_ready");
260            ready!(this.service.poll_ready(cx)).map_err(Error::from_service_error)?;
261
262            tracing::trace!("transport.poll_next");
263            let rq = ready!(transport.as_mut().try_poll_next(cx))
264                .transpose()
265                .map_err(Error::from_stream_error)?;
266            if let Some(rq) = rq {
267                // the service is ready, and we have another request!
268                // you know what that means:
269                pending.push(this.service.call(rq));
270                *this.in_flight += 1;
271            } else {
272                // there are no more requests coming
273                // check one more time for responses, and then yield
274                assert!(!*this.finish);
275                *this.finish = true;
276            }
277        }
278    }
279}