quic_rpc/transport/mod.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
//! Built in transports for quic-rpc
//!
//! There are two sides to a transport, a server side where connections are
//! accepted and a client side where connections are initiated.
//!
//! Connections are bidirectional typed channels, with a distinct type for
//! the send and receive side. They are *unrelated* to services.
//!
//! In the transport module, the message types are referred to as `In` and `Out`.
//!
//! A [`Connector`] can be used to *open* bidirectional typed channels using
//! [`Connector::open`]. A [`Listener`] can be used to *accept* bidirectional
//! typed channels from any of the currently opened connections to clients, using
//! [`Listener::accept`].
//!
//! In both cases, the result is a tuple of a send side and a receive side. These
//! types are defined by implementing the [`StreamTypes`] trait.
//!
//! Errors for both sides are defined by implementing the [`ConnectionErrors`] trait.
use std::{
fmt::{self, Debug, Display},
net::SocketAddr,
};
use boxed::{BoxableConnector, BoxableListener, BoxedConnector, BoxedListener};
use futures_lite::{Future, Stream};
use futures_sink::Sink;
use mapped::MappedConnector;
use crate::{RpcError, RpcMessage};
pub mod boxed;
pub mod combined;
#[cfg(feature = "flume-transport")]
pub mod flume;
#[cfg(feature = "hyper-transport")]
pub mod hyper;
#[cfg(feature = "iroh-net-transport")]
pub mod iroh_net;
pub mod mapped;
pub mod misc;
#[cfg(feature = "quinn-transport")]
pub mod quinn;
#[cfg(any(
feature = "quinn-transport",
feature = "hyper-transport",
feature = "iroh-net-transport"
))]
mod util;
/// Errors that can happen when creating and using a [`Connector`] or [`Listener`].
pub trait ConnectionErrors: Debug + Clone + Send + Sync + 'static {
/// Error when sending a message via a channel
type SendError: RpcError;
/// Error when receiving a message via a channel
type RecvError: RpcError;
/// Error when opening a channel
type OpenError: RpcError;
/// Error when accepting a channel
type AcceptError: RpcError;
}
/// Types that are common to both [`Connector`] and [`Listener`].
///
/// Having this as a separate trait is useful when writing generic code that works with both.
pub trait StreamTypes: ConnectionErrors {
/// The type of messages that can be received on the channel
type In: RpcMessage;
/// The type of messages that can be sent on the channel
type Out: RpcMessage;
/// Receive side of a bidirectional typed channel
type RecvStream: Stream<Item = Result<Self::In, Self::RecvError>>
+ Send
+ Sync
+ Unpin
+ 'static;
/// Send side of a bidirectional typed channel
type SendSink: Sink<Self::Out, Error = Self::SendError> + Send + Sync + Unpin + 'static;
}
/// A connection to a specific remote machine
///
/// A connection can be used to open bidirectional typed channels using [`Connector::open`].
pub trait Connector: StreamTypes {
/// Open a channel to the remote che
fn open(
&self,
) -> impl Future<Output = Result<(Self::SendSink, Self::RecvStream), Self::OpenError>> + Send;
/// Map the input and output types of this connection
fn map<In1, Out1>(self) -> MappedConnector<In1, Out1, Self>
where
In1: TryFrom<Self::In>,
Self::Out: From<Out1>,
{
MappedConnector::new(self)
}
/// Box the connection
fn boxed(self) -> BoxedConnector<Self::In, Self::Out>
where
Self: BoxableConnector<Self::In, Self::Out> + Sized + 'static,
{
self::BoxedConnector::new(self)
}
}
/// A listener that listens for connections
///
/// A listener can be used to accept bidirectional typed channels from any of the
/// currently opened connections to clients, using [`Listener::accept`].
pub trait Listener: StreamTypes {
/// Accept a new typed bidirectional channel on any of the connections we
/// have currently opened.
fn accept(
&self,
) -> impl Future<Output = Result<(Self::SendSink, Self::RecvStream), Self::AcceptError>> + Send;
/// The local addresses this endpoint is bound to.
fn local_addr(&self) -> &[LocalAddr];
/// Box the listener
fn boxed(self) -> BoxedListener<Self::In, Self::Out>
where
Self: BoxableListener<Self::In, Self::Out> + Sized + 'static,
{
BoxedListener::new(self)
}
}
/// The kinds of local addresses a [Listener] can be bound to.
///
/// Returned by [Listener::local_addr].
///
/// [`Display`]: fmt::Display
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum LocalAddr {
/// A local socket.
Socket(SocketAddr),
/// An in-memory address.
Mem,
}
impl Display for LocalAddr {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
LocalAddr::Socket(sockaddr) => write!(f, "{sockaddr}"),
LocalAddr::Mem => write!(f, "mem"),
}
}
}