tokio_tower/multiplex/
mod.rs1use futures_core::stream::{Stream, TryStream};
13use futures_sink::Sink;
14use pin_project::pin_project;
15use std::pin::Pin;
16use std::task::{Context, Poll};
17
18pub mod client;
20pub use self::client::{Client, TagStore};
21
22pub mod server;
24pub use self::server::Server;
25
26#[pin_project]
29#[derive(Debug)]
30pub struct MultiplexTransport<T, S> {
31 #[pin]
32 transport: T,
33 #[pin]
34 tagger: S,
35}
36
37impl<T, S> MultiplexTransport<T, S> {
38 pub fn new(transport: T, tagger: S) -> Self {
40 MultiplexTransport { transport, tagger }
41 }
42}
43
44impl<T, S, Request> Sink<Request> for MultiplexTransport<T, S>
45where
46 T: Sink<Request>,
47{
48 type Error = <T as Sink<Request>>::Error;
49
50 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
51 self.project().transport.poll_ready(cx)
52 }
53 fn start_send(self: Pin<&mut Self>, item: Request) -> Result<(), Self::Error> {
54 self.project().transport.start_send(item)
55 }
56 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
57 self.project().transport.poll_flush(cx)
58 }
59 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
60 self.project().transport.poll_close(cx)
61 }
62}
63
64impl<T, S> Stream for MultiplexTransport<T, S>
65where
66 T: TryStream,
67{
68 type Item = Result<<T as TryStream>::Ok, <T as TryStream>::Error>;
69 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70 self.project().transport.try_poll_next(cx)
71 }
72}
73
74impl<T, S, Request> TagStore<Request, <T as TryStream>::Ok> for MultiplexTransport<T, S>
75where
76 T: Sink<Request> + TryStream,
77 S: TagStore<Request, <T as TryStream>::Ok>,
78{
79 type Tag = <S as TagStore<Request, <T as TryStream>::Ok>>::Tag;
80 fn assign_tag(self: Pin<&mut Self>, req: &mut Request) -> Self::Tag {
81 self.project().tagger.assign_tag(req)
82 }
83 fn finish_tag(self: Pin<&mut Self>, rsp: &<T as TryStream>::Ok) -> Self::Tag {
84 self.project().tagger.finish_tag(rsp)
85 }
86}