This crate provides utilities for using protocols that follow certain common patterns on top of Tokio and Tower.
§Protocols
At a high level, a protocol is a mechanism that lets you take a bunch of requests and turn them into responses. Tower provides the Service trait, which is an interface for mapping requests into responses, but it does not deal with how those requests are sent between clients and servers. Tokio, on the other hand, provides asynchronous communication primitives, but it does not deal with high-level abstractions like services. This crate attempts to bridge that gap.
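For reference, the Service trait (from the tower-service crate, shown slightly abridged here) looks roughly like this:

use std::future::Future;
use std::task::{Context, Poll};

/// The core tower abstraction: an asynchronous function from a request to a response.
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    /// Returns `Poll::Ready(Ok(()))` once the service is able to accept a request.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    /// Processes the request, returning a future that resolves to the response.
    fn call(&mut self, req: Request) -> Self::Future;
}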
There are many types of protocols in the wild, but they generally come in two forms: pipelining and multiplexing. A pipelining protocol sends requests and responses in-order between the consumer and provider of a service, and processes requests one at a time. A multiplexing protocol, on the other hand, constructs requests in such a way that they can be handled and responded to in any order while still allowing the client to know which response is for which request. Pipelining and multiplexing both have their advantages and disadvantages; see the module-level documentation for pipeline and multiplex for details. There is also a good deal of discussion in this StackOverflow answer.
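To make the distinction concrete, here is a minimal, hypothetical sketch of how a multiplexed client might match out-of-order responses to their requests using a per-request tag. The Tagged type and the tag values below are purely illustrative and are not part of tokio-tower's API:

use std::collections::HashMap;

/// Hypothetical envelope: in a multiplexed protocol, each request and response
/// carries a tag so the two can be correlated even when responses arrive out of order.
struct Tagged<T> {
    tag: u64,
    inner: T,
}

fn main() {
    // The client remembers which tag belongs to which outstanding request.
    let mut pending: HashMap<u64, &str> = HashMap::new();
    pending.insert(1, "first request");
    pending.insert(2, "second request");

    // Responses may arrive in any order; the tag says which request each one answers.
    let responses = vec![
        Tagged { tag: 2, inner: "second response" },
        Tagged { tag: 1, inner: "first response" },
    ];
    for resp in responses {
        let req = pending.remove(&resp.tag).expect("unknown tag");
        println!("{} -> {}", req, resp.inner);
    }

    // In a pipelined protocol there are no tags: the n-th response always answers the
    // n-th outstanding request, so the client only needs a FIFO queue of pending requests.
}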
§Transports
A key part of any protocol is its transport, which is the way that it transmits requests and responses. In general, tokio-tower leaves the on-the-wire implementations of protocols to other crates (like tokio-codec or async-bincode) and instead operates at the level of Sinks and Streams.
At its core, tokio-tower wraps a type that is Sink + Stream. On the client side, the Sink is used to send requests, and the Stream is used to receive responses (from the server) to those requests. On the server side, the Stream is used to receive requests, and the Sink is used to send the responses.
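Put differently, a client-side transport is simply any type that satisfies bounds along these lines. This is a sketch only; the exact bounds required by pipeline::Client and multiplex::Client differ in the details:

use futures_sink::Sink;
use futures_util::stream::Stream;

/// Sketch: a client transport can have requests pushed into it and yields responses
/// (or transport errors) as they arrive. The server side uses the mirror image.
fn assert_client_transport<T, Request, Response, E>(_transport: &T)
where
    T: Sink<Request> + Stream<Item = Result<Response, E>>,
{
}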
§Servers and clients
This crate provides utilities that make writing both clients and servers easier. You’ll find the client helper as Client in the protocol module you’re working with (e.g., pipeline::Client), and the server helper as Server in the same place.
§Example
use std::fmt::Debug;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_util::future::{poll_fn, ready, Ready};
use tokio::sync::mpsc;
use tokio_tower::pipeline;

type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;

// `Never` is an error type that can never be constructed; the standard library's
// `Infallible` serves that purpose here, since the `Echo` service cannot fail.
type Never = std::convert::Infallible;

/// A transport implemented using a pair of `mpsc` channels.
///
/// `mpsc::Sender` and `mpsc::Receiver` are both unidirectional. So, if we want to use `mpsc`
/// to send requests and responses between a client and server, we need *two* channels, one
/// that lets requests flow from the client to the server, and one that lets responses flow the
/// other way.
///
/// In this echo server example, requests and responses are both of type `T`, but for "real"
/// services, the two types are usually different.
struct ChannelTransport<T> {
    rcv: mpsc::UnboundedReceiver<T>,
    snd: mpsc::UnboundedSender<T>,
}

impl<T: Debug> futures_sink::Sink<T> for ChannelTransport<T> {
    type Error = StdError;

    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        // An unbounded channel can always accept another item.
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        // use map_err because the `T` contained in `mpsc::error::SendError` may not be `Send + Sync`.
        self.snd.send(item).map_err(|e| e.to_string())?;
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(())) // no-op because all sends succeed immediately
    }

    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(())) // no-op because the channel is closed on drop and flush is a no-op
    }
}

impl<T> futures_util::stream::Stream for ChannelTransport<T> {
    type Item = Result<T, StdError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        self.rcv.poll_recv(cx).map(|s| s.map(Ok))
    }
}

/// A service that tokio-tower should serve over the transport.
/// This one just echoes whatever it gets.
struct Echo;

impl<T> tower_service::Service<T> for Echo {
    type Response = T;
    type Error = Never;
    type Future = Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: T) -> Self::Future {
        ready(Ok(req))
    }
}

#[tokio::main]
async fn main() {
    let (s1, r1) = mpsc::unbounded_channel();
    let (s2, r2) = mpsc::unbounded_channel();
    let pair1 = ChannelTransport { snd: s1, rcv: r2 };
    let pair2 = ChannelTransport { snd: s2, rcv: r1 };

    // Serve the Echo service over one end of the channel pair...
    tokio::spawn(pipeline::Server::new(pair1, Echo));

    // ...and send requests over the other end.
    let mut client = pipeline::Client::<_, tokio_tower::Error<_, _>, _>::new(pair2);

    use tower_service::Service;
    poll_fn(|cx| client.poll_ready(cx)).await.expect("client ready");

    let msg = "Hello, tokio-tower";
    let resp = client.call(String::from(msg)).await.expect("client call");
    assert_eq!(resp, msg);
}
Modules§
- multiplex: In a multiplexed protocol, the server responds to client requests in the order they complete. Request IDs (TagStore::Tag) are used to match up responses with the request that triggered them. This allows the server to process requests out-of-order, and eliminates the application-level head-of-line blocking that pipelined protocols suffer from. Example multiplexed protocols include SSH, HTTP/2, and AMQP. This page has some further details about how multiplexing protocols operate.
- pipeline: In a pipelined protocol, the server responds to client requests in the order they were sent. Many requests can be in flight at the same time, but no request sees a response until all previous requests have been satisfied. Pipelined protocols can experience head-of-line blocking, wherein a slow-to-process request prevents any subsequent request from being processed, but they are often easier to implement on the server side and provide clearer request ordering semantics. Example pipelined protocols include HTTP/1.1, MySQL, and Redis.
Enums§
- Error: An error that occurred while servicing a request.
Traits§
- MakeTransport: Creates new Transport (i.e., Sink + Stream) instances.