tokio_tower/multiplex/
mod.rs

1//! In a multiplexed protocol, the server responds to client requests in the order they complete.
2//! Request IDs ([`TagStore::Tag`]) are used to match up responses with the request that triggered
3//! them. This allows the server to process requests out-of-order, and eliminates the
4//! application-level head-of-line blocking that pipelined protocols suffer from. Example
5//! multiplexed protocols include SSH, HTTP/2, and AMQP. [This
6//! page](https://250bpm.com/blog:18/) has some further details about how multiplexing protocols
7//! operate.
8//!
9//! Note: multiplexing with the max number of in-flight requests set to 1 implies that for each
10//! request, the response must be received before sending another request on the same connection.
11
12use futures_core::stream::{Stream, TryStream};
13use futures_sink::Sink;
14use pin_project::pin_project;
15use std::pin::Pin;
16use std::task::{Context, Poll};
17
18/// Client bindings for a multiplexed protocol.
19pub mod client;
20pub use self::client::{Client, TagStore};
21
22/// Server bindings for a multiplexed protocol.
23pub mod server;
24pub use self::server::Server;
25
26/// A convenience wrapper that lets you take separate transport and tag store types and use them as
27/// a single [`client::Transport`].
28#[pin_project]
29#[derive(Debug)]
30pub struct MultiplexTransport<T, S> {
31    #[pin]
32    transport: T,
33    #[pin]
34    tagger: S,
35}
36
37impl<T, S> MultiplexTransport<T, S> {
38    /// Fuse together the given `transport` and `tagger` into a single `Transport`.
39    pub fn new(transport: T, tagger: S) -> Self {
40        MultiplexTransport { transport, tagger }
41    }
42}
43
44impl<T, S, Request> Sink<Request> for MultiplexTransport<T, S>
45where
46    T: Sink<Request>,
47{
48    type Error = <T as Sink<Request>>::Error;
49
50    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
51        self.project().transport.poll_ready(cx)
52    }
53    fn start_send(self: Pin<&mut Self>, item: Request) -> Result<(), Self::Error> {
54        self.project().transport.start_send(item)
55    }
56    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
57        self.project().transport.poll_flush(cx)
58    }
59    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
60        self.project().transport.poll_close(cx)
61    }
62}
63
64impl<T, S> Stream for MultiplexTransport<T, S>
65where
66    T: TryStream,
67{
68    type Item = Result<<T as TryStream>::Ok, <T as TryStream>::Error>;
69    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70        self.project().transport.try_poll_next(cx)
71    }
72}
73
74impl<T, S, Request> TagStore<Request, <T as TryStream>::Ok> for MultiplexTransport<T, S>
75where
76    T: Sink<Request> + TryStream,
77    S: TagStore<Request, <T as TryStream>::Ok>,
78{
79    type Tag = <S as TagStore<Request, <T as TryStream>::Ok>>::Tag;
80    fn assign_tag(self: Pin<&mut Self>, req: &mut Request) -> Self::Tag {
81        self.project().tagger.assign_tag(req)
82    }
83    fn finish_tag(self: Pin<&mut Self>, rsp: &<T as TryStream>::Ok) -> Self::Tag {
84        self.project().tagger.finish_tag(rsp)
85    }
86}