quic_rpc/transport/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
//! Built-in transports for quic-rpc
//!
//! There are two sides to a transport, a server side where connections are
//! accepted and a client side where connections are initiated.
//!
//! Connections are bidirectional typed channels, with a distinct type for
//! the send and receive side. They are *unrelated* to services.
//!
//! In the transport module, the message types are referred to as `In` and `Out`.
//!
//! A [`Connector`] can be used to *open* bidirectional typed channels using
//! [`Connector::open`]. A [`Listener`] can be used to *accept* bidirectional
//! typed channels from any of the currently opened connections to clients, using
//! [`Listener::accept`].
//!
//! In both cases, the result is a tuple of a send side and a receive side. These
//! types are defined by implementing the [`StreamTypes`] trait.
//!
//! Errors for both sides are defined by implementing the [`ConnectionErrors`] trait.
use std::{
    fmt::{self, Debug, Display},
    net::SocketAddr,
};

use boxed::{BoxableConnector, BoxableListener, BoxedConnector, BoxedListener};
use futures_lite::{Future, Stream};
use futures_sink::Sink;
use mapped::MappedConnector;

use crate::{RpcError, RpcMessage};

pub mod boxed;
pub mod combined;
#[cfg(feature = "flume-transport")]
pub mod flume;
#[cfg(feature = "hyper-transport")]
pub mod hyper;
#[cfg(feature = "iroh-net-transport")]
pub mod iroh_net;
pub mod mapped;
pub mod misc;
#[cfg(feature = "quinn-transport")]
pub mod quinn;

#[cfg(any(
    feature = "quinn-transport",
    feature = "hyper-transport",
    feature = "iroh-net-transport"
))]
mod util;

/// Errors that can happen when creating and using a [`Connector`] or [`Listener`].
///
/// This trait declares no methods. It only groups the four error types used by
/// a transport so that they can be named through a single bound. Every
/// associated type must implement [`RpcError`].
pub trait ConnectionErrors: Debug + Clone + Send + Sync + 'static {
    /// Error when sending a message via a channel.
    ///
    /// Used as the `Error` type of [`StreamTypes::SendSink`].
    type SendError: RpcError;
    /// Error when receiving a message via a channel.
    ///
    /// Used as the error half of the items yielded by [`StreamTypes::RecvStream`].
    type RecvError: RpcError;
    /// Error when opening a channel.
    ///
    /// Returned by [`Connector::open`] on failure.
    type OpenError: RpcError;
    /// Error when accepting a channel.
    ///
    /// Returned by [`Listener::accept`] on failure.
    type AcceptError: RpcError;
}

/// Types that are common to both [`Connector`] and [`Listener`].
///
/// Having this as a separate trait is useful when writing generic code that works with both.
///
/// The error types referenced here come from the [`ConnectionErrors`] supertrait.
pub trait StreamTypes: ConnectionErrors {
    /// The type of messages that can be received on the channel.
    type In: RpcMessage;
    /// The type of messages that can be sent on the channel.
    type Out: RpcMessage;
    /// Receive side of a bidirectional typed channel.
    ///
    /// A [`Stream`] whose items are either a received [`Self::In`] message or a
    /// [`ConnectionErrors::RecvError`].
    type RecvStream: Stream<Item = Result<Self::In, Self::RecvError>>
        + Send
        + Sync
        + Unpin
        + 'static;
    /// Send side of a bidirectional typed channel.
    ///
    /// A [`Sink`] that accepts [`Self::Out`] messages and reports failures as
    /// [`ConnectionErrors::SendError`].
    type SendSink: Sink<Self::Out, Error = Self::SendError> + Send + Sync + Unpin + 'static;
}

/// A connection to a specific remote machine
///
/// A connection can be used to open bidirectional typed channels using [`Connector::open`].
pub trait Connector: StreamTypes {
    /// Open a channel to the remote che
    fn open(
        &self,
    ) -> impl Future<Output = Result<(Self::SendSink, Self::RecvStream), Self::OpenError>> + Send;

    /// Map the input and output types of this connection
    fn map<In1, Out1>(self) -> MappedConnector<In1, Out1, Self>
    where
        In1: TryFrom<Self::In>,
        Self::Out: From<Out1>,
    {
        MappedConnector::new(self)
    }

    /// Box the connection
    fn boxed(self) -> BoxedConnector<Self::In, Self::Out>
    where
        Self: BoxableConnector<Self::In, Self::Out> + Sized + 'static,
    {
        self::BoxedConnector::new(self)
    }
}

/// A listener that listens for connections
///
/// A listener can be used to accept bidirectional typed channels from any of the
/// currently opened connections to clients, using [`Listener::accept`].
pub trait Listener: StreamTypes {
    /// Accept a new typed bidirectional channel on any of the connections we
    /// have currently opened.
    ///
    /// On success the returned future resolves to the send side and the receive
    /// side of the accepted channel, in that order. On failure it resolves to
    /// [`ConnectionErrors::AcceptError`].
    fn accept(
        &self,
    ) -> impl Future<Output = Result<(Self::SendSink, Self::RecvStream), Self::AcceptError>> + Send;

    /// The local addresses this endpoint is bound to.
    ///
    /// Returned as a borrowed slice of [`LocalAddr`] values.
    fn local_addr(&self) -> &[LocalAddr];

    /// Box the listener.
    ///
    /// Consumes the listener and wraps it in a [`BoxedListener`] with the same
    /// `In` and `Out` message types.
    fn boxed(self) -> BoxedListener<Self::In, Self::Out>
    where
        Self: BoxableListener<Self::In, Self::Out> + Sized + 'static,
    {
        BoxedListener::new(self)
    }
}

/// The kinds of local addresses a [`Listener`] can be bound to.
///
/// Returned by [`Listener::local_addr`].
///
/// The [`Display`] implementation prints the socket address for
/// [`LocalAddr::Socket`] and the literal text `mem` for [`LocalAddr::Mem`].
///
/// The enum is marked `#[non_exhaustive]`, so code in other crates that
/// matches on it needs a wildcard arm.
///
/// [`Display`]: fmt::Display
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum LocalAddr {
    /// A local socket.
    Socket(SocketAddr),
    /// An in-memory address.
    Mem,
}

impl Display for LocalAddr {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            LocalAddr::Socket(sockaddr) => write!(f, "{sockaddr}"),
            LocalAddr::Mem => write!(f, "mem"),
        }
    }
}