1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
//! Server side api
//!
//! The main entry point is [RpcServer]
use crate::{
map::{ChainedMapper, MapService, Mapper},
transport::ConnectionErrors,
Service, ServiceEndpoint,
};
use futures_lite::{Future, Stream, StreamExt};
use pin_project::pin_project;
use std::{
error,
fmt::{self, Debug},
marker::PhantomData,
pin::Pin,
result,
sync::Arc,
task::{self, Poll},
};
use tokio::sync::oneshot;
/// A server channel for a specific service.
///
/// This is a wrapper around a [ServiceEndpoint] that serves as the entry point for the server DSL.
/// `S` is the service type, `C` is the channel type.
#[derive(Debug)]
pub struct RpcServer<S, C> {
/// The channel on which new requests arrive.
///
/// Each new request is a receiver and channel pair on which messages for this request
/// are received and responses sent.
source: C,
p: PhantomData<S>,
}
impl<S, C: Clone> Clone for RpcServer<S, C> {
fn clone(&self) -> Self {
Self {
source: self.source.clone(),
p: PhantomData,
}
}
}
impl<S: Service, C: ServiceEndpoint<S>> RpcServer<S, C> {
/// Create a new rpc server for a specific service for a [Service] given a compatible
/// [ServiceEndpoint].
///
/// This is where a generic typed endpoint is converted into a server for a specific service.
pub fn new(source: C) -> Self {
Self {
source,
p: PhantomData,
}
}
}
/// A channel for requests and responses for a specific service.
///
/// This just groups the sink and stream into a single type, and attaches the
/// information about the service type.
///
/// Sink and stream are independent, so you can take the channel apart and use
/// them independently.
#[derive(Debug)]
pub struct RpcChannel<S: Service, C: ServiceEndpoint<S>, SInner: Service = S> {
/// Sink to send responses to the client.
pub send: C::SendSink,
/// Stream to receive requests from the client.
pub recv: C::RecvStream,
/// Mapper to map between S and S2
pub map: Arc<dyn MapService<S, SInner>>,
}
impl<S, C> RpcChannel<S, C, S>
where
S: Service,
C: ServiceEndpoint<S>,
{
/// Create a new RPC channel.
pub fn new(send: C::SendSink, recv: C::RecvStream) -> Self {
Self {
send,
recv,
map: Arc::new(Mapper::new()),
}
}
}
impl<S, C, SInner> RpcChannel<S, C, SInner>
where
S: Service,
C: ServiceEndpoint<S>,
SInner: Service,
{
/// Map this channel's service into an inner service.
///
/// This method is available if the required bounds are upheld:
/// SNext::Req: Into<SInner::Req> + TryFrom<SInner::Req>,
/// SNext::Res: Into<SInner::Res> + TryFrom<SInner::Res>,
///
/// Where SNext is the new service to map to and SInner is the current inner service.
///
/// This method can be chained infintely.
pub fn map<SNext>(self) -> RpcChannel<S, C, SNext>
where
SNext: Service,
SNext::Req: Into<SInner::Req> + TryFrom<SInner::Req>,
SNext::Res: Into<SInner::Res> + TryFrom<SInner::Res>,
{
let map = ChainedMapper::new(self.map);
RpcChannel {
send: self.send,
recv: self.recv,
map: Arc::new(map),
}
}
}
/// The result of accepting a new connection.
pub struct Accepting<S: Service, C: ServiceEndpoint<S>> {
send: C::SendSink,
recv: C::RecvStream,
}
impl<S: Service, C: ServiceEndpoint<S>> Accepting<S, C> {
/// Read the first message from the client.
///
/// The return value is a tuple of `(request, channel)`. Here `request` is the
/// first request which is already read from the stream. The `channel` is a
/// [RpcChannel] that has `sink` and `stream` fields that can be used to send more
/// requests and/or receive more responses.
///
/// Often sink and stream will wrap an an underlying byte stream. In this case you can
/// call into_inner() on them to get it back to perform byte level reads and writes.
pub async fn read_first(self) -> result::Result<(S::Req, RpcChannel<S, C>), RpcServerError<C>> {
let Accepting { send, mut recv } = self;
// get the first message from the client. This will tell us what it wants to do.
let request: S::Req = recv
.next()
.await
// no msg => early close
.ok_or(RpcServerError::EarlyClose)?
// recv error
.map_err(RpcServerError::RecvError)?;
Ok((request, RpcChannel::new(send, recv)))
}
}
impl<S: Service, C: ServiceEndpoint<S>> RpcServer<S, C> {
/// Accepts a new channel from a client. The result is an [Accepting] object that
/// can be used to read the first request.
pub async fn accept(&self) -> result::Result<Accepting<S, C>, RpcServerError<C>> {
let (send, recv) = self.source.accept().await.map_err(RpcServerError::Accept)?;
Ok(Accepting { send, recv })
}
/// Get the underlying service endpoint
pub fn into_inner(self) -> C {
self.source
}
}
impl<S: Service, C: ServiceEndpoint<S>> AsRef<C> for RpcServer<S, C> {
fn as_ref(&self) -> &C {
&self.source
}
}
/// A stream of updates
///
/// If there is any error with receiving or with decoding the updates, the stream will stall and the error will
/// cause a termination of the RPC call.
#[pin_project]
#[derive(Debug)]
pub struct UpdateStream<S, C, T, SInner = S>(
#[pin] C::RecvStream,
Option<oneshot::Sender<RpcServerError<C>>>,
PhantomData<T>,
Arc<dyn MapService<S, SInner>>,
)
where
S: Service,
SInner: Service,
C: ServiceEndpoint<S>;
impl<S, C, T, SInner> UpdateStream<S, C, T, SInner>
where
S: Service,
SInner: Service,
C: ServiceEndpoint<S>,
T: TryFrom<SInner::Req>,
{
pub(crate) fn new(
recv: C::RecvStream,
map: Arc<dyn MapService<S, SInner>>,
) -> (Self, UnwrapToPending<RpcServerError<C>>) {
let (error_send, error_recv) = oneshot::channel();
let error_recv = UnwrapToPending(error_recv);
(Self(recv, Some(error_send), PhantomData, map), error_recv)
}
}
impl<S, C, T, SInner> Stream for UpdateStream<S, C, T, SInner>
where
S: Service,
SInner: Service,
C: ServiceEndpoint<S>,
T: TryFrom<SInner::Req>,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
match Pin::new(&mut this.0).poll_next(cx) {
Poll::Ready(Some(msg)) => match msg {
Ok(msg) => {
let msg = this.3.req_try_into_inner(msg);
let msg = msg.and_then(|msg| T::try_from(msg).map_err(|_cause| ()));
match msg {
Ok(msg) => Poll::Ready(Some(msg)),
Err(_cause) => {
// we were unable to downcast, so we need to send an error
if let Some(tx) = this.1.take() {
let _ = tx.send(RpcServerError::UnexpectedUpdateMessage);
}
Poll::Pending
}
}
}
Err(cause) => {
// we got a recv error, so return pending and send the error
if let Some(tx) = this.1.take() {
let _ = tx.send(RpcServerError::RecvError(cause));
}
Poll::Pending
}
},
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
/// Server error. All server DSL methods return a `Result` with this error type.
pub enum RpcServerError<C: ConnectionErrors> {
/// Unable to open a new channel
Accept(C::OpenError),
/// Recv side for a channel was closed before getting the first message
EarlyClose,
/// Got an unexpected first message, e.g. an update message
UnexpectedStartMessage,
/// Error receiving a message
RecvError(C::RecvError),
/// Error sending a response
SendError(C::SendError),
/// Got an unexpected update message, e.g. a request message or a non-matching update message
UnexpectedUpdateMessage,
}
impl<C: ConnectionErrors> fmt::Debug for RpcServerError<C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Accept(arg0) => f.debug_tuple("Open").field(arg0).finish(),
Self::EarlyClose => write!(f, "EarlyClose"),
Self::RecvError(arg0) => f.debug_tuple("RecvError").field(arg0).finish(),
Self::SendError(arg0) => f.debug_tuple("SendError").field(arg0).finish(),
Self::UnexpectedStartMessage => f.debug_tuple("UnexpectedStartMessage").finish(),
Self::UnexpectedUpdateMessage => f.debug_tuple("UnexpectedStartMessage").finish(),
}
}
}
impl<C: ConnectionErrors> fmt::Display for RpcServerError<C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fmt::Debug::fmt(&self, f)
}
}
impl<C: ConnectionErrors> error::Error for RpcServerError<C> {}
/// Take an oneshot receiver and just return Pending the underlying future returns `Err(oneshot::Canceled)`
pub(crate) struct UnwrapToPending<T>(oneshot::Receiver<T>);
impl<T> Future for UnwrapToPending<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
Poll::Ready(Ok(x)) => Poll::Ready(x),
Poll::Ready(Err(_)) => Poll::Pending,
Poll::Pending => Poll::Pending,
}
}
}
pub(crate) async fn race2<T, A: Future<Output = T>, B: Future<Output = T>>(f1: A, f2: B) -> T {
tokio::select! {
x = f1 => x,
x = f2 => x,
}
}
/// Run a server loop, invoking a handler callback for each request.
///
/// Requests will be handled sequentially.
pub async fn run_server_loop<S, C, T, F, Fut>(
_service_type: S,
conn: C,
target: T,
mut handler: F,
) -> Result<(), RpcServerError<C>>
where
S: Service,
C: ServiceEndpoint<S>,
T: Clone + Send + 'static,
F: FnMut(RpcChannel<S, C>, S::Req, T) -> Fut + Send + 'static,
Fut: Future<Output = Result<(), RpcServerError<C>>> + Send + 'static,
{
let server = RpcServer::<S, C>::new(conn);
loop {
let (req, chan) = server.accept().await?.read_first().await?;
let target = target.clone();
handler(chan, req, target).await?;
}
}