tower_buffer/service.rs

use crate::{
    error::Error,
    future::ResponseFuture,
    message::Message,
    worker::{Handle, Worker},
};

use futures_core::ready;
use std::task::{Context, Poll};
use tokio::sync::{mpsc, oneshot};
use tower_service::Service;

/// Adds a buffer in front of an inner service.
///
/// See the crate-level documentation for more details.
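///
/// # Example
///
/// A minimal sketch of driving a `Buffer`, assuming an already constructed `buffered`
/// instance and a `request` value; `poll_ready` must return `Ready` before `call` is
/// invoked (see `Service::call` below):
///
/// ```rust,ignore
/// use futures::future::poll_fn;
/// use tower_service::Service;
///
/// // Reserve a slot in the buffer's channel, then dispatch the request.
/// poll_fn(|cx| buffered.poll_ready(cx)).await?;
/// let response = buffered.call(request).await?;
/// ```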
#[derive(Debug)]
pub struct Buffer<T, Request>
where
    T: Service<Request>,
{
    tx: mpsc::Sender<Message<Request, T::Future>>,
    handle: Handle,
}

impl<T, Request> Buffer<T, Request>
where
    T: Service<Request>,
    T::Error: Into<Error>,
{
    /// Creates a new `Buffer` wrapping `service`.
    ///
    /// `bound` gives the maximum number of requests that can be queued for the service before
    /// backpressure is applied to callers.
    ///
    /// The default Tokio executor is used to run the given service, which means that this method
    /// must be called while on the Tokio runtime.
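    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming a Tokio runtime is running and a hypothetical
    /// `MyService` type that implements `Service<Request>`:
    ///
    /// ```rust,ignore
    /// use tower_buffer::Buffer;
    ///
    /// // Queue up to 1024 requests before callers see backpressure.
    /// let buffered = Buffer::new(MyService::new(), 1024);
    /// ```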
    pub fn new(service: T, bound: usize) -> Self
    where
        T: Send + 'static,
        T::Future: Send,
        T::Error: Send + Sync,
        Request: Send + 'static,
    {
        let (tx, rx) = mpsc::channel(bound);
        let (handle, worker) = Worker::new(service, rx);
        tokio::spawn(worker);
        Buffer { tx, handle }
    }

    /// Creates a new `Buffer` wrapping `service`, but returns the background worker.
    ///
    /// This is useful if you do not want to spawn directly onto the `tokio` runtime
    /// but instead want to use your own executor. This will return the `Buffer` and
    /// the background `Worker` that you can then spawn.
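    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a hypothetical `MyService` and a hypothetical
    /// `my_executor` with a `spawn` method that can drive the returned `Worker` future:
    ///
    /// ```rust,ignore
    /// use tower_buffer::Buffer;
    ///
    /// let (buffer, worker) = Buffer::pair(MyService::new(), 1024);
    /// // The `Buffer` cannot make progress until the worker has been spawned.
    /// my_executor.spawn(worker);
    /// ```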
    pub fn pair(service: T, bound: usize) -> (Buffer<T, Request>, Worker<T, Request>)
    where
        T: Send + 'static,
        T::Error: Send + Sync,
        Request: Send + 'static,
    {
        let (tx, rx) = mpsc::channel(bound);
        let (handle, worker) = Worker::new(service, rx);
        (Buffer { tx, handle }, worker)
    }

    fn get_worker_error(&self) -> Error {
        self.handle.get_error_on_closed()
    }
}

impl<T, Request> Service<Request> for Buffer<T, Request>
where
    T: Service<Request>,
    T::Error: Into<Error>,
{
    type Response = T::Response;
    type Error = Error;
    type Future = ResponseFuture<T::Future>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // If the inner service has errored, then we error here.
        if ready!(self.tx.poll_ready(cx)).is_err() {
            Poll::Ready(Err(self.get_worker_error()))
        } else {
            Poll::Ready(Ok(()))
        }
    }

    fn call(&mut self, request: Request) -> Self::Future {
        // TODO:
        // Ideally we'd poll_ready again here so we don't allocate the oneshot
        // if the try_send is about to fail, but sadly we can't call poll_ready
        // outside of a task context.
        let (tx, rx) = oneshot::channel();

        // Get the current Span so that we can explicitly propagate it to the worker.
        // If we didn't do this, events on the worker related to this span wouldn't be
        // counted towards that span, since the worker would have no way of entering it.
        let span = tracing::Span::current();
        tracing::trace!(parent: &span, "sending request to buffer worker");
        match self.tx.try_send(Message { request, span, tx }) {
            Err(mpsc::error::TrySendError::Closed(_)) => {
                ResponseFuture::failed(self.get_worker_error())
            }
            Err(mpsc::error::TrySendError::Full(_)) => {
                // When `mpsc::Sender::poll_ready` returns `Ready`, a slot
                // in the channel is reserved for the handle. Other `Sender`
                // handles may not send a message using that slot. This
                // guarantees capacity for `request`.
                //
                // Given this, the only way to hit this code path is if
                // `poll_ready` was not called, or did not return `Ready`,
                // before `call` was invoked.
                panic!("buffer full; poll_ready must be called first");
            }
            Ok(_) => ResponseFuture::new(rx),
        }
    }
}

impl<T, Request> Clone for Buffer<T, Request>
where
    T: Service<Request>,
{
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
            handle: self.handle.clone(),
        }
    }
}