1use crate::{
2 error::Error,
3 future::ResponseFuture,
4 message::Message,
5 worker::{Handle, Worker},
6};
7
8use futures_core::ready;
9use std::task::{Context, Poll};
10use tokio::sync::{mpsc, oneshot};
11use tower_service::Service;
12
13#[derive(Debug)]
17pub struct Buffer<T, Request>
18where
19 T: Service<Request>,
20{
21 tx: mpsc::Sender<Message<Request, T::Future>>,
22 handle: Handle,
23}
24
25impl<T, Request> Buffer<T, Request>
26where
27 T: Service<Request>,
28 T::Error: Into<Error>,
29{
30 pub fn new(service: T, bound: usize) -> Self
38 where
39 T: Send + 'static,
40 T::Future: Send,
41 T::Error: Send + Sync,
42 Request: Send + 'static,
43 {
44 let (tx, rx) = mpsc::channel(bound);
45 let (handle, worker) = Worker::new(service, rx);
46 tokio::spawn(worker);
47 Buffer { tx, handle }
48 }
49
50 pub fn pair(service: T, bound: usize) -> (Buffer<T, Request>, Worker<T, Request>)
56 where
57 T: Send + 'static,
58 T::Error: Send + Sync,
59 Request: Send + 'static,
60 {
61 let (tx, rx) = mpsc::channel(bound);
62 let (handle, worker) = Worker::new(service, rx);
63 (Buffer { tx, handle }, worker)
64 }
65
66 fn get_worker_error(&self) -> Error {
67 self.handle.get_error_on_closed()
68 }
69}
70
71impl<T, Request> Service<Request> for Buffer<T, Request>
72where
73 T: Service<Request>,
74 T::Error: Into<Error>,
75{
76 type Response = T::Response;
77 type Error = Error;
78 type Future = ResponseFuture<T::Future>;
79
80 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81 if let Err(_) = ready!(self.tx.poll_ready(cx)) {
83 Poll::Ready(Err(self.get_worker_error()))
84 } else {
85 Poll::Ready(Ok(()))
86 }
87 }
88
89 fn call(&mut self, request: Request) -> Self::Future {
90 let (tx, rx) = oneshot::channel();
95
96 let span = tracing::Span::current();
100 tracing::trace!(parent: &span, "sending request to buffer worker");
101 match self.tx.try_send(Message { request, span, tx }) {
102 Err(mpsc::error::TrySendError::Closed(_)) => {
103 ResponseFuture::failed(self.get_worker_error())
104 }
105 Err(mpsc::error::TrySendError::Full(_)) => {
106 panic!("buffer full; poll_ready must be called first");
114 }
115 Ok(_) => ResponseFuture::new(rx),
116 }
117 }
118}
119
120impl<T, Request> Clone for Buffer<T, Request>
121where
122 T: Service<Request>,
123{
124 fn clone(&self) -> Self {
125 Self {
126 tx: self.tx.clone(),
127 handle: self.handle.clone(),
128 }
129 }
130}