tokio_tower/lib.rs
1//! This crate provides utilities for using protocols that follow certain common patterns on
2//! top of [Tokio](https://tokio.rs) and [Tower](https://github.com/tower-rs/tower).
3//!
4//! # Protocols
5//!
6//! At a high level, a protocol is a mechanism that lets you take a bunch of requests and turn them
7//! into responses. Tower provides the [`Service`](https://docs.rs/tower-service/) trait, which is
8//! an interface for mapping requests into responses, but it does not deal with how those requests
9//! are sent between clients and servers. Tokio, on the other hand, provides asynchronous
10//! communication primitives, but it does not deal with high-level abstractions like services. This
11//! crate attempts to bridge that gap.
12//!
13//! There are many types of protocols in the wild, but they generally come in two forms:
14//! *pipelining* and *multiplexing*. A pipelining protocol sends requests and responses in-order
15//! between the consumer and provider of a service, and processes requests one at a time. A
16//! multiplexing protocol on the other hand constructs requests in such a way that they can be
17//! handled and responded to in *any* order while still allowing the client to know which response
18//! is for which request. Pipelining and multiplexing both have their advantages and disadvantages;
19//! see the module-level documentation for [`pipeline`] and [`multiplex`] for details. There is
//! also a good deal of discussion in [this Software Engineering Stack Exchange
//! answer](https://softwareengineering.stackexchange.com/a/325888/79642).
22//!
23//! # Transports
24//!
25//! A key part of any protocol is its transport, which is the way that it transmits requests and
26//! responses. In general, `tokio-tower` leaves the on-the-wire implementations of protocols to
27//! other crates (like [`tokio-codec`](https://docs.rs/tokio-codec/) or
28//! [`async-bincode`](https://docs.rs/async-bincode)) and instead operates at the level of
//! [`Sink`](https://docs.rs/futures/0.3/futures/sink/trait.Sink.html)s and
//! [`Stream`](https://docs.rs/futures/0.3/futures/stream/trait.Stream.html)s.
31//!
32//! At its core, `tokio-tower` wraps a type that is `Sink + Stream`. On the client side, the Sink
33//! is used to send requests, and the Stream is used to receive responses (from the server) to
34//! those requests. On the server side, the Stream is used to receive requests, and the Sink is
35//! used to send the responses.
36//!
37//! # Servers and clients
38//!
39//! This crate provides utilities that make writing both clients and servers easier. You'll find
40//! the client helper as `Client` in the protocol module you're working with (e.g.,
41//! [`pipeline::Client`]), and the server helper as `Server` in the same place.
42//!
43//! # Example
44//! ```rust
45//! # use std::pin::Pin;
46//! # use std::boxed::Box;
47//! # use tokio::sync::mpsc;
48//! # use tokio::io::{AsyncWrite, AsyncRead};
49//! # use futures_core::task::{Context, Poll};
50//! # use futures_util::{never::Never, future::{poll_fn, ready, Ready}};
51//! # use tokio_tower::pipeline;
52//! # use core::fmt::Debug;
53//! type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
54//!
55//! /// A transport implemented using a pair of `mpsc` channels.
56//! ///
57//! /// `mpsc::Sender` and `mpsc::Receiver` are both unidirectional. So, if we want to use `mpsc`
58//! /// to send requests and responses between a client and server, we need *two* channels, one
59//! /// that lets requests flow from the client to the server, and one that lets responses flow the
60//! /// other way.
61//! ///
62//! /// In this echo server example, requests and responses are both of type `T`, but for "real"
63//! /// services, the two types are usually different.
64//! struct ChannelTransport<T> {
65//! rcv: mpsc::UnboundedReceiver<T>,
66//! snd: mpsc::UnboundedSender<T>,
67//! }
68//!
69//! impl<T: Debug> futures_sink::Sink<T> for ChannelTransport<T> {
70//! type Error = StdError;
71//!
72//! fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
73//! Poll::Ready(Ok(()))
74//! }
75//!
76//! fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
77//! // use map_err because `T` contained in `mpsc::SendError` may not be `Send + Sync`.
78//! self.snd.send(item).map_err(|e| e.to_string())?;
79//! Ok(())
80//! }
81//!
82//! fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
83//! Poll::Ready(Ok(())) // no-op because all sends succeed immediately
84//! }
85//!
86//! fn poll_close( self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
87//! Poll::Ready(Ok(())) // no-op because channel is closed on drop and flush is no-op
88//! }
89//! }
90//!
91//! impl<T> futures_util::stream::Stream for ChannelTransport<T> {
92//! type Item = Result<T, StdError>;
93//!
94//! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
95//! self.rcv.poll_recv(cx).map(|s| s.map(Ok))
96//! }
97//! }
98//!
99//! /// A service that tokio-tower should serve over the transport.
100//! /// This one just echoes whatever it gets.
101//! struct Echo;
102//!
103//! impl<T> tower_service::Service<T> for Echo {
104//! type Response = T;
105//! type Error = Never;
106//! type Future = Ready<Result<Self::Response, Self::Error>>;
107//!
108//! fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
109//! Poll::Ready(Ok(()))
110//! }
111//!
112//! fn call(&mut self, req: T) -> Self::Future {
113//! ready(Ok(req))
114//! }
115//! }
116//!
117//! #[tokio::main]
118//! async fn main() {
119//! let (s1, r1) = mpsc::unbounded_channel();
120//! let (s2, r2) = mpsc::unbounded_channel();
121//! let pair1 = ChannelTransport{snd: s1, rcv: r2};
122//! let pair2 = ChannelTransport{snd: s2, rcv: r1};
123//!
124//! tokio::spawn(pipeline::Server::new(pair1, Echo));
125//! let mut client = pipeline::Client::<_, tokio_tower::Error<_, _>, _>::new(pair2);
126//!
127//! use tower_service::Service;
128//! poll_fn(|cx| client.poll_ready(cx)).await;
129//!
130//! let msg = "Hello, tokio-tower";
131//! let resp = client.call(String::from(msg)).await.expect("client call");
132//! assert_eq!(resp, msg);
133//! }
134//!
135//! ```
// Crate-wide lint configuration. The lints below are raised to `warn`, so violations are
// reported without failing the build.
#![warn(
    missing_docs,
    missing_debug_implementations,
    unreachable_pub,
    rust_2018_idioms
)]
// NOTE(review): no rationale is recorded for silencing `clippy::type_complexity`; presumably the
// heavily generic transport/service signatures in this crate trip it. Confirm and document.
#![allow(clippy::type_complexity)]
143
// Crate-private constant; it is not referenced anywhere in this file.
// NOTE(review): presumably the number of consecutive loop iterations the `pipeline` and
// `multiplex` drivers perform before yielding back to the executor. Confirm at the use sites.
const YIELD_EVERY: usize = 24;
145
146mod error;
147mod mediator;
148pub(crate) mod wrappers;
149pub use error::Error;
150
151use futures_core::{
152 future::Future,
153 stream::TryStream,
154 task::{Context, Poll},
155};
156use futures_sink::Sink;
157use tower_service::Service;
158
159/// Creates new `Transport` (i.e., `Sink + Stream`) instances.
160///
161/// Acts as a transport factory. This is useful for cases where new `Sink + Stream`
162/// values must be produced.
163///
164/// This is essentially a trait alias for a `Service` of `Sink + Stream`s.
165pub trait MakeTransport<Target, Request>: self::sealed::Sealed<Target, Request> {
166 /// Items produced by the transport
167 type Item;
168
169 /// Errors produced when receiving from the transport
170 type Error;
171
172 /// Errors produced when sending to the transport
173 type SinkError;
174
175 /// The `Sink + Stream` implementation created by this factory
176 type Transport: TryStream<Ok = Self::Item, Error = Self::Error>
177 + Sink<Request, Error = Self::SinkError>;
178
179 /// Errors produced while building a transport.
180 type MakeError;
181
182 /// The future of the `Service` instance.
183 type Future: Future<Output = Result<Self::Transport, Self::MakeError>>;
184
185 /// Returns `Ready` when the factory is able to create more transports.
186 ///
187 /// If the service is at capacity, then `NotReady` is returned and the task
188 /// is notified when the service becomes ready again. This function is
189 /// expected to be called while on a task.
190 ///
191 /// This is a **best effort** implementation. False positives are permitted.
192 /// It is permitted for the service to return `Ready` from a `poll_ready`
193 /// call and the next invocation of `make_transport` results in an error.
194 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::MakeError>>;
195
196 /// Create and return a new transport asynchronously.
197 fn make_transport(&mut self, target: Target) -> Self::Future;
198}
199
200impl<M, T, Target, Request> self::sealed::Sealed<Target, Request> for M
201where
202 M: Service<Target, Response = T>,
203 T: TryStream + Sink<Request>,
204{
205}
206
207impl<M, T, Target, Request> MakeTransport<Target, Request> for M
208where
209 M: Service<Target, Response = T>,
210 T: TryStream + Sink<Request>,
211{
212 type Item = <T as TryStream>::Ok;
213 type Error = <T as TryStream>::Error;
214 type SinkError = <T as Sink<Request>>::Error;
215 type Transport = T;
216 type MakeError = M::Error;
217 type Future = M::Future;
218
219 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::MakeError>> {
220 Service::poll_ready(self, cx)
221 }
222
223 fn make_transport(&mut self, target: Target) -> Self::Future {
224 Service::call(self, target)
225 }
226}
227
/// Private module holding the marker trait used as the supertrait of `MakeTransport`.
///
/// The trait itself is `pub`, but this module is not, so code outside the crate has no path
/// by which to name the trait.
mod sealed {
    /// Marker trait with no items, parameterised like `MakeTransport` itself.
    pub trait Sealed<Target, Request> {}
}
231
232pub mod multiplex;
233pub mod pipeline;