use futures_lite::{Future, Stream, StreamExt};
use futures_util::{FutureExt, SinkExt, TryFutureExt};
use serde::{Deserialize, Serialize};
use crate::{
client::{BoxStreamSync, DeferDrop},
message::{InteractionPattern, Msg},
server::{race2, RpcChannel, RpcServerError},
transport::ConnectionErrors,
RpcClient, Service, ServiceConnection, ServiceEndpoint,
};
use std::{
error,
fmt::{self, Debug},
result,
sync::Arc,
};
/// Marker value signalling that the response stream was created successfully.
///
/// The server sends `Ok(StreamCreated)` as the very first response before any
/// stream item, and the client expects the initial response to be a
/// `Result<StreamCreated, CreateError>`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct StreamCreated;
/// Marker type identifying the "try server streaming" interaction pattern.
///
/// Messages select this pattern through `Msg<S, Pattern = TryServerStreaming>`.
#[derive(Debug, Clone, Copy)]
pub struct TryServerStreaming;
// The marker carries no behaviour of its own; the impl has no items.
impl InteractionPattern for TryServerStreaming {}
/// A request message that uses the [`TryServerStreaming`] pattern.
///
/// Such a request is answered by an initial
/// `Result<StreamCreated, Self::CreateError>`, followed (on success) by a
/// stream of `Result<Self::Item, Self::ItemError>` values. Both result types
/// must be convertible to and from the service response type `S::Res`.
pub trait TryServerStreamingMsg<S: Service>: Msg<S, Pattern = TryServerStreaming>
where
result::Result<Self::Item, Self::ItemError>: Into<S::Res> + TryFrom<S::Res>,
result::Result<StreamCreated, Self::CreateError>: Into<S::Res> + TryFrom<S::Res>,
{
/// Error reported in the initial response when the stream cannot be created.
type CreateError: Debug + Send + 'static;
/// Error that an individual stream item can carry.
type ItemError: Debug + Send + 'static;
/// Successful payload of an individual stream item.
type Item: Send + 'static;
}
/// Client-side failure while starting a try-server-streaming call.
///
/// `C` supplies the transport error types and `E` is the application-level
/// creation error reported by the server.
#[derive(Debug)]
pub enum Error<C: ConnectionErrors, E: Debug> {
    /// Opening the underlying connection failed.
    Open(C::OpenError),
    /// Sending the request failed.
    Send(C::SendError),
    /// Receiving the initial response failed.
    Recv(C::RecvError),
    /// The response stream ended before an initial response arrived.
    EarlyClose,
    /// The initial response could not be converted to the expected type.
    Downcast,
    /// The server answered the request with a creation error.
    Application(E),
}

impl<C, E> fmt::Display for Error<C, E>
where
    C: ConnectionErrors,
    E: Debug,
{
    /// Renders the error using its `Debug` representation, forwarding the
    /// formatter (and therefore its flags) unchanged.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        <Self as Debug>::fmt(self, formatter)
    }
}

impl<C, E> error::Error for Error<C, E>
where
    C: ConnectionErrors,
    E: Debug,
{
}
/// Client-side failure for a single item of a try-server-streaming response.
///
/// `S` supplies the transport error types and `E` is the application-level
/// item error reported by the server.
#[derive(Debug)]
pub enum ItemError<S: ConnectionErrors, E: Debug> {
    /// Receiving the item from the connection failed.
    Recv(S::RecvError),
    /// The received response could not be converted to the expected type.
    Downcast,
    /// The server produced an error for this item.
    Application(E),
}

impl<C, E> fmt::Display for ItemError<C, E>
where
    C: ConnectionErrors,
    E: Debug,
{
    /// Renders the error using its `Debug` representation, forwarding the
    /// formatter (and therefore its flags) unchanged.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        <Self as Debug>::fmt(self, formatter)
    }
}

impl<C, E> error::Error for ItemError<C, E>
where
    C: ConnectionErrors,
    E: Debug,
{
}
impl<SC, C, S> RpcChannel<S, C, SC>
where
    SC: Service,
    C: ServiceEndpoint<SC>,
    S: Service,
{
    /// Handles a request that uses the try-server-streaming pattern.
    ///
    /// Calls `f(target, req)` to create the response stream.
    ///
    /// * If `f` fails, the creation error is sent as the only response and the
    ///   call finishes with `Ok(())`.
    /// * If `f` succeeds, `Ok(StreamCreated)` is sent first, followed by every
    ///   item the stream yields, in order. Items that are `Err` are sent like
    ///   any other item and do not stop the stream.
    ///
    /// The work is passed to `race2` together with a future that completes with
    /// `RpcServerError::UnexpectedUpdateMessage` as soon as the receive side
    /// yields anything at all.
    ///
    /// # Errors
    ///
    /// Returns `RpcServerError::SendError` if sending any response fails, or
    /// `RpcServerError::UnexpectedUpdateMessage` as described above.
    pub async fn try_server_streaming<M, F, Fut, Str, T>(
        self,
        req: M,
        target: T,
        f: F,
    ) -> result::Result<(), RpcServerError<C>>
    where
        M: TryServerStreamingMsg<S>,
        std::result::Result<M::Item, M::ItemError>: Into<S::Res> + TryFrom<S::Res>,
        std::result::Result<StreamCreated, M::CreateError>: Into<S::Res> + TryFrom<S::Res>,
        F: FnOnce(T, M) -> Fut + Send + 'static,
        Fut: Future<Output = std::result::Result<Str, M::CreateError>> + Send + 'static,
        Str: Stream<Item = std::result::Result<M::Item, M::ItemError>> + Send + 'static,
        T: Send + 'static,
    {
        let Self {
            mut send, mut recv, ..
        } = self;
        // Whatever the receive side yields next is treated as an unexpected
        // update for this pattern; the value itself is ignored.
        let cancel = recv
            .next()
            .map(|_| RpcServerError::UnexpectedUpdateMessage::<C>);
        race2(cancel.map(Err), async move {
            // Split the creation outcome into the opening response and, on
            // success, the stream that still has to be forwarded.
            let (opening, items) = match f(target, req).await {
                Ok(items) => (Ok(StreamCreated), Some(items)),
                Err(cause) => (Err(cause), None),
            };
            // The opening response is sent in both cases.
            let opening = self.map.res_into_outer(opening.into());
            send.send(opening)
                .await
                .map_err(RpcServerError::SendError)?;
            // A creation error was the only response; nothing left to stream.
            let Some(items) = items else {
                return Ok(());
            };
            tokio::pin!(items);
            while let Some(item) = items.next().await {
                // Convert each item to the outer response type and forward it.
                let outgoing = self.map.res_into_outer(item.into());
                send.send(outgoing)
                    .await
                    .map_err(RpcServerError::SendError)?;
            }
            Ok(())
        })
        .await
    }
}
impl<S, C, SC> RpcClient<S, C, SC>
where
SC: Service,
C: ServiceConnection<SC>,
S: Service,
{
pub async fn try_server_streaming<M>(
&self,
msg: M,
) -> result::Result<
BoxStreamSync<'static, Result<M::Item, ItemError<C, M::ItemError>>>,
Error<C, M::CreateError>,
>
where
M: TryServerStreamingMsg<S>,
Result<M::Item, M::ItemError>: Into<S::Res> + TryFrom<S::Res>,
Result<StreamCreated, M::CreateError>: Into<S::Res> + TryFrom<S::Res>,
{
let msg = self.map.req_into_outer(msg.into());
let (mut send, mut recv) = self.source.open().await.map_err(Error::Open)?;
send.send(msg).map_err(Error::Send).await?;
let map = Arc::clone(&self.map);
let Some(initial) = recv.next().await else {
return Err(Error::EarlyClose);
};
let initial = initial.map_err(Error::Recv)?; let initial = map
.res_try_into_inner(initial)
.map_err(|_| Error::Downcast)?;
let initial = <std::result::Result<StreamCreated, M::CreateError>>::try_from(initial)
.map_err(|_| Error::Downcast)?;
let _ = initial.map_err(Error::Application)?;
let recv = recv.map(move |x| {
let x = x.map_err(ItemError::Recv)?;
let x = map.res_try_into_inner(x).map_err(|_| ItemError::Downcast)?;
let x = <std::result::Result<M::Item, M::ItemError>>::try_from(x)
.map_err(|_| ItemError::Downcast)?;
let x = x.map_err(ItemError::Application)?;
Ok(x)
});
let recv = Box::pin(DeferDrop(recv, send));
Ok(recv)
}
}