1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
//! A streaming RPC system based on QUIC
//!
//! # Motivation
//!
//! See the [README](https://github.com/n0-computer/quic-rpc/blob/main/README.md)
//!
//! # Example
//! ```
//! # async fn example() -> anyhow::Result<()> {
//! use quic_rpc::{message::RpcMsg, Service, RpcClient, transport::MemChannelTypes};
//! use serde::{Serialize, Deserialize};
//! use derive_more::{From, TryInto};
//!
//! // Define your messages
//! #[derive(Debug, Serialize, Deserialize)]
//! struct Ping;
//!
//! #[derive(Debug, Serialize, Deserialize)]
//! struct Pong;
//!
//! // Define your RPC service and its request/response types
//!
//! #[derive(Debug, Serialize, Deserialize, From, TryInto)]
//! enum PingRequest {
//!     Ping(Ping),
//! }
//!
//! #[derive(Debug, Serialize, Deserialize, From, TryInto)]
//! enum PingResponse {
//!     Pong(Pong),
//! }
//!
//! #[derive(Debug, Clone)]
//! struct PingService;
//!
//! impl Service for PingService {
//!   type Req = PingRequest;
//!   type Res = PingResponse;
//! }
//!
//! // Define interaction patterns for each request type
//! impl RpcMsg<PingService> for Ping {
//!   type Response = Pong;
//! }
//!
//! // create a transport channel
//! let (server, client) = quic_rpc::transport::mem::connection::<PingRequest, PingResponse>(1);
//!
//! // create the rpc client given the channel and the service type
//! let mut client = RpcClient::<PingService, MemChannelTypes>::new(client);
//!
//! // call the service
//! let res = client.rpc(Ping).await?;
//! # Ok(())
//! # }
//! ```
#![deny(missing_docs)]
#![deny(rustdoc::broken_intra_doc_links)]
use futures::{Future, Sink, Stream};
use serde::{de::DeserializeOwned, Serialize};
use std::{
    fmt::{self, Debug, Display},
    net::SocketAddr,
    result,
};
pub mod client;
pub mod message;
pub mod server;
pub mod transport;
pub use client::RpcClient;
pub use server::RpcServer;
mod macros;

/// Bounds that every RPC message type has to satisfy
///
/// Messages must be serializable and deserializable even when only the mem transport is used,
/// and they must be `Send` even when only the quinn transport is used.
///
/// In practice this is rarely a burden. If all you want is a pure in-memory channel, without
/// the option of also using the quinn transport, consider using an mpsc channel directly.
///
/// The trait has no items of its own and is implemented automatically for every type that
/// meets the bounds, so it never has to be implemented by hand.
pub trait RpcMessage: Debug + Serialize + DeserializeOwned + Send + Sync + Unpin + 'static {}

impl<M: Debug + Serialize + DeserializeOwned + Send + Sync + Unpin + 'static> RpcMessage for M {}

/// Bounds that every internal error type has to satisfy
///
/// Errors must be printable through both `Debug` and `Display`, and they must be
/// `Send + Sync + Unpin + 'static` so that they can be sent across threads.
///
/// The trait has no items of its own and is implemented automatically for every type that
/// meets the bounds.
pub trait RpcError: Debug + Display + Send + Sync + Unpin + 'static {}

impl<E: Debug + Display + Send + Sync + Unpin + 'static> RpcError for E {}

/// A service
///
/// A service has no methods of its own; it only names the request type `Req` and the
/// response type `Res` that belong together. Implementors are plain marker types, such as
/// the unit struct `PingService` in the crate-level example.
///
/// The implementing type itself must be `Send + Sync + Debug + Clone + 'static`.
pub trait Service: Send + Sync + Debug + Clone + 'static {
    /// Type of request messages
    ///
    /// Must satisfy [`RpcMessage`].
    type Req: RpcMessage;
    /// Type of response messages
    ///
    /// Must satisfy [`RpcMessage`].
    type Res: RpcMessage;
}

/// Defines a set of types for a kind of channel
///
/// Every distinct kind of channel has its own ChannelType. See e.g.
/// [crate::transport::MemChannelTypes].
///
/// The trait has no methods. It bundles the sink, stream, error, future and channel types
/// that belong to one kind of channel, so that other code can be generic over a single
/// type parameter.
pub trait ChannelTypes: Debug + Sized + Send + Sync + Unpin + Clone + 'static {
    /// The sink used for sending either requests or responses on this channel
    ///
    /// Sending fails with [`Self::SendError`].
    type SendSink<M: RpcMessage>: Sink<M, Error = Self::SendError> + Send + Unpin + 'static;
    /// The stream used for receiving either requests or responses on this channel
    ///
    /// Every item is either a received message or a [`Self::RecvError`].
    type RecvStream<M: RpcMessage>: Stream<Item = result::Result<M, Self::RecvError>>
        + Send
        + Unpin
        + 'static;
    /// Error you might get while sending messages to a sink
    type SendError: RpcError;
    /// Error you might get while receiving messages from a stream
    type RecvError: RpcError;
    /// Error you might get when opening a new bidirectional stream to the server
    type OpenBiError: RpcError;
    /// Future returned by [`ClientChannel::open_bi`]
    ///
    /// Resolves to a `(SendSink<Out>, RecvStream<In>)` pair, or to [`Self::OpenBiError`] on
    /// failure. The future is `Send` and valid for the lifetime `'a`.
    type OpenBiFuture<'a, In: RpcMessage, Out: RpcMessage>: Future<
            Output = result::Result<(Self::SendSink<Out>, Self::RecvStream<In>), Self::OpenBiError>,
        > + Send
        + 'a
    where
        Self: 'a;

    /// Error you might get when waiting for new streams on the server side
    type AcceptBiError: RpcError;
    /// Future returned by [`ServerChannel::accept_bi`]
    ///
    /// Resolves to a `(SendSink<Out>, RecvStream<In>)` pair, or to [`Self::AcceptBiError`] on
    /// failure. The future is `Send` and valid for the lifetime `'a`.
    type AcceptBiFuture<'a, In: RpcMessage, Out: RpcMessage>: Future<
            Output = result::Result<
                (Self::SendSink<Out>, Self::RecvStream<In>),
                Self::AcceptBiError,
            >,
        > + Send
        + 'a
    where
        Self: 'a;

    /// Client-side channel type
    ///
    /// Implements the crate-level `ClientChannel` trait for this set of channel types, i.e.
    /// it can open bidirectional streams.
    type ClientChannel<In: RpcMessage, Out: RpcMessage>: ClientChannel<In, Out, Self>;

    /// Server-side channel type
    ///
    /// Implements the crate-level `ServerChannel` trait for this set of channel types, i.e.
    /// it can accept bidirectional streams and report its local addresses.
    type ServerChannel<In: RpcMessage, Out: RpcMessage>: ServerChannel<In, Out, Self>;
}

/// An abstract client channel with typed input and output
///
/// `In` is the type of the messages that are received and `Out` the type of the messages that
/// are sent, as fixed by the stream pair that `T::OpenBiFuture` resolves to. `T` selects the
/// concrete set of [`ChannelTypes`].
pub trait ClientChannel<In: RpcMessage, Out: RpcMessage, T: ChannelTypes>:
    Debug + Clone + Send + Sync + 'static
{
    /// Open a bidirectional stream
    ///
    /// The returned future resolves to a `(T::SendSink<Out>, T::RecvStream<In>)` pair, or to
    /// `T::OpenBiError` on failure. Its lifetime is tied to the borrow of `self`.
    fn open_bi(&self) -> T::OpenBiFuture<'_, In, Out>;
}

/// An abstract server channel with typed input and output
///
/// `In` is the type of the messages that are received and `Out` the type of the messages that
/// are sent, as fixed by the stream pair that `T::AcceptBiFuture` resolves to. `T` selects the
/// concrete set of [`ChannelTypes`].
pub trait ServerChannel<In: RpcMessage, Out: RpcMessage, T: ChannelTypes>:
    Debug + Clone + Send + Sync + 'static
{
    /// Accept a bidirectional stream
    ///
    /// The returned future resolves to a `(T::SendSink<Out>, T::RecvStream<In>)` pair, or to
    /// `T::AcceptBiError` on failure. Its lifetime is tied to the borrow of `self`.
    fn accept_bi(&self) -> T::AcceptBiFuture<'_, In, Out>;

    /// The local addresses this server is bound to.
    ///
    /// This is useful for publicly facing addresses when you start the server with a random
    /// port, `:0`, and let the kernel choose the real bind address.  In that case this returns
    /// the address with the actual port used.
    fn local_addr(&self) -> &[LocalAddr];
}

/// The kinds of local addresses a [`ServerChannel`] can be bound to.
///
/// Returned by [`ServerChannel::local_addr`].
///
/// Addresses can be compared for equality and hashed, so they can be deduplicated or used as
/// keys in a map or set. A human-readable rendering is available through the [`Display`]
/// implementation.
///
/// The enum is `#[non_exhaustive]`: further kinds of addresses may be added, so a `match`
/// outside of this crate needs a wildcard arm.
///
/// [`Display`]: fmt::Display
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum LocalAddr {
    /// A local socket.
    Socket(SocketAddr),
    /// An in-memory address.
    Mem,
}

impl Display for LocalAddr {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            LocalAddr::Socket(sockaddr) => write!(f, "{sockaddr}"),
            LocalAddr::Mem => write!(f, "mem"),
        }
    }
}