tokio_tower/
lib.rs

1//! This crate provides utilities for using protocols that follow certain common patterns on
2//! top of [Tokio](https://tokio.rs) and [Tower](https://github.com/tower-rs/tower).
3//!
4//! # Protocols
5//!
6//! At a high level, a protocol is a mechanism that lets you take a bunch of requests and turn them
7//! into responses. Tower provides the [`Service`](https://docs.rs/tower-service/) trait, which is
8//! an interface for mapping requests into responses, but it does not deal with how those requests
9//! are sent between clients and servers. Tokio, on the other hand, provides asynchronous
10//! communication primitives, but it does not deal with high-level abstractions like services. This
11//! crate attempts to bridge that gap.
12//!
13//! There are many types of protocols in the wild, but they generally come in two forms:
14//! *pipelining* and *multiplexing*. A pipelining protocol sends requests and responses in-order
15//! between the consumer and provider of a service, and processes requests one at a time. A
16//! multiplexing protocol on the other hand constructs requests in such a way that they can be
17//! handled and responded to in *any* order while still allowing the client to know which response
18//! is for which request. Pipelining and multiplexing both have their advantages and disadvantages;
19//! see the module-level documentation for [`pipeline`] and [`multiplex`] for details. There is
20//! also good deal of discussion in [this StackOverflow
21//! answer](https://softwareengineering.stackexchange.com/a/325888/79642).
22//!
23//! # Transports
24//!
25//! A key part of any protocol is its transport, which is the way that it transmits requests and
26//! responses. In general, `tokio-tower` leaves the on-the-wire implementations of protocols to
27//! other crates (like [`tokio-codec`](https://docs.rs/tokio-codec/) or
28//! [`async-bincode`](https://docs.rs/async-bincode)) and instead operates at the level of
29//! [`Sink`](https://docs.rs/futures/0.1/futures/sink/trait.Sink.html)s and
30//! [`Stream`](https://docs.rs/futures/0.15/futures/stream/trait.Stream.html)s.
31//!
32//! At its core, `tokio-tower` wraps a type that is `Sink + Stream`. On the client side, the Sink
33//! is used to send requests, and the Stream is used to receive responses (from the server) to
34//! those requests. On the server side, the Stream is used to receive requests, and the Sink is
35//! used to send the responses.
36//!
37//! # Servers and clients
38//!
39//! This crate provides utilities that make writing both clients and servers easier. You'll find
40//! the client helper as `Client` in the protocol module you're working with (e.g.,
41//! [`pipeline::Client`]), and the server helper as `Server` in the same place.
42//!
43//! # Example
44//! ```rust
45//! # use std::pin::Pin;
46//! # use std::boxed::Box;
47//! # use tokio::sync::mpsc;
48//! # use tokio::io::{AsyncWrite, AsyncRead};
49//! # use futures_core::task::{Context, Poll};
50//! # use futures_util::{never::Never, future::{poll_fn, ready, Ready}};
51//! # use tokio_tower::pipeline;
52//! # use core::fmt::Debug;
53//! type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
54//!
55//! /// A transport implemented using a pair of `mpsc` channels.
56//! ///
57//! /// `mpsc::Sender` and `mpsc::Receiver` are both unidirectional. So, if we want to use `mpsc`
58//! /// to send requests and responses between a client and server, we need *two* channels, one
59//! /// that lets requests flow from the client to the server, and one that lets responses flow the
60//! /// other way.
61//! ///
62//! /// In this echo server example, requests and responses are both of type `T`, but for "real"
63//! /// services, the two types are usually different.
64//! struct ChannelTransport<T> {
65//!     rcv: mpsc::UnboundedReceiver<T>,
66//!     snd: mpsc::UnboundedSender<T>,
67//! }
68//!
69//! impl<T: Debug> futures_sink::Sink<T> for ChannelTransport<T> {
70//!     type Error = StdError;
71//!
72//!     fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
73//!         Poll::Ready(Ok(()))
74//!     }
75//!
76//!     fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
77//!         // use map_err because `T` contained in `mpsc::SendError` may not be `Send + Sync`.
78//!         self.snd.send(item).map_err(|e| e.to_string())?;
79//!         Ok(())
80//!     }
81//!
82//!     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
83//!         Poll::Ready(Ok(())) // no-op because all sends succeed immediately
84//!     }
85//!
86//!     fn poll_close( self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
87//!         Poll::Ready(Ok(())) // no-op because channel is closed on drop and flush is no-op
88//!     }
89//! }
90//!
91//! impl<T> futures_util::stream::Stream for ChannelTransport<T> {
92//!     type Item = Result<T, StdError>;
93//!
94//!     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
95//!         self.rcv.poll_recv(cx).map(|s| s.map(Ok))
96//!     }
97//! }
98//!
99//! /// A service that tokio-tower should serve over the transport.
100//! /// This one just echoes whatever it gets.
101//! struct Echo;
102//!
103//! impl<T> tower_service::Service<T> for Echo {
104//!     type Response = T;
105//!     type Error = Never;
106//!     type Future = Ready<Result<Self::Response, Self::Error>>;
107//!
108//!     fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
109//!         Poll::Ready(Ok(()))
110//!     }
111//!
112//!     fn call(&mut self, req: T) -> Self::Future {
113//!         ready(Ok(req))
114//!     }
115//! }
116//!
117//! #[tokio::main]
118//! async fn main() {
119//!     let (s1, r1) = mpsc::unbounded_channel();
120//!     let (s2, r2) = mpsc::unbounded_channel();
121//!     let pair1 = ChannelTransport{snd: s1, rcv: r2};
122//!     let pair2 = ChannelTransport{snd: s2, rcv: r1};
123//!
124//!     tokio::spawn(pipeline::Server::new(pair1, Echo));
125//!     let mut client = pipeline::Client::<_, tokio_tower::Error<_, _>, _>::new(pair2);
126//!
127//!     use tower_service::Service;
128//!     poll_fn(|cx| client.poll_ready(cx)).await;
129//!
130//!     let msg = "Hello, tokio-tower";
131//!     let resp = client.call(String::from(msg)).await.expect("client call");
132//!     assert_eq!(resp, msg);
133//! }
134//!
135//! ```
136#![warn(
137    missing_docs,
138    missing_debug_implementations,
139    unreachable_pub,
140    rust_2018_idioms
141)]
142#![allow(clippy::type_complexity)]
143
144const YIELD_EVERY: usize = 24;
145
146mod error;
147mod mediator;
148pub(crate) mod wrappers;
149pub use error::Error;
150
151use futures_core::{
152    future::Future,
153    stream::TryStream,
154    task::{Context, Poll},
155};
156use futures_sink::Sink;
157use tower_service::Service;
158
159/// Creates new `Transport` (i.e., `Sink + Stream`) instances.
160///
161/// Acts as a transport factory. This is useful for cases where new `Sink + Stream`
162/// values must be produced.
163///
164/// This is essentially a trait alias for a `Service` of `Sink + Stream`s.
165pub trait MakeTransport<Target, Request>: self::sealed::Sealed<Target, Request> {
166    /// Items produced by the transport
167    type Item;
168
169    /// Errors produced when receiving from the transport
170    type Error;
171
172    /// Errors produced when sending to the transport
173    type SinkError;
174
175    /// The `Sink + Stream` implementation created by this factory
176    type Transport: TryStream<Ok = Self::Item, Error = Self::Error>
177        + Sink<Request, Error = Self::SinkError>;
178
179    /// Errors produced while building a transport.
180    type MakeError;
181
182    /// The future of the `Service` instance.
183    type Future: Future<Output = Result<Self::Transport, Self::MakeError>>;
184
185    /// Returns `Ready` when the factory is able to create more transports.
186    ///
187    /// If the service is at capacity, then `NotReady` is returned and the task
188    /// is notified when the service becomes ready again. This function is
189    /// expected to be called while on a task.
190    ///
191    /// This is a **best effort** implementation. False positives are permitted.
192    /// It is permitted for the service to return `Ready` from a `poll_ready`
193    /// call and the next invocation of `make_transport` results in an error.
194    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::MakeError>>;
195
196    /// Create and return a new transport asynchronously.
197    fn make_transport(&mut self, target: Target) -> Self::Future;
198}
199
200impl<M, T, Target, Request> self::sealed::Sealed<Target, Request> for M
201where
202    M: Service<Target, Response = T>,
203    T: TryStream + Sink<Request>,
204{
205}
206
207impl<M, T, Target, Request> MakeTransport<Target, Request> for M
208where
209    M: Service<Target, Response = T>,
210    T: TryStream + Sink<Request>,
211{
212    type Item = <T as TryStream>::Ok;
213    type Error = <T as TryStream>::Error;
214    type SinkError = <T as Sink<Request>>::Error;
215    type Transport = T;
216    type MakeError = M::Error;
217    type Future = M::Future;
218
219    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::MakeError>> {
220        Service::poll_ready(self, cx)
221    }
222
223    fn make_transport(&mut self, target: Target) -> Self::Future {
224        Service::call(self, target)
225    }
226}
227
228mod sealed {
229    pub trait Sealed<A, B> {}
230}
231
232pub mod multiplex;
233pub mod pipeline;