quic_rpc/pattern/
bidi_streaming.rsuse std::{
error,
fmt::{self, Debug},
result,
};
use futures_lite::{Stream, StreamExt};
use futures_util::{FutureExt, SinkExt};
use crate::{
client::{BoxStreamSync, UpdateSink},
message::{InteractionPattern, Msg},
server::{race2, RpcChannel, RpcServerError, UpdateStream},
transport::{ConnectionErrors, Connector, StreamTypes},
RpcClient, Service,
};
#[derive(Debug, Clone, Copy)]
pub struct BidiStreaming;
impl InteractionPattern for BidiStreaming {}
pub trait BidiStreamingMsg<S: Service>: Msg<S, Pattern = BidiStreaming> {
type Update: Into<S::Req> + TryFrom<S::Req> + Send + 'static;
type Response: Into<S::Res> + TryFrom<S::Res> + Send + 'static;
}
#[derive(Debug)]
pub enum Error<C: ConnectionErrors> {
Open(C::OpenError),
Send(C::SendError),
}
impl<C: ConnectionErrors> fmt::Display for Error<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(self, f)
}
}
impl<C: ConnectionErrors> error::Error for Error<C> {}
#[derive(Debug)]
pub enum ItemError<C: ConnectionErrors> {
RecvError(C::RecvError),
DowncastError,
}
impl<C: ConnectionErrors> fmt::Display for ItemError<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(self, f)
}
}
impl<C: ConnectionErrors> error::Error for ItemError<C> {}
impl<S, C> RpcClient<S, C>
where
S: Service,
C: Connector<In = S::Res, Out = S::Req>,
{
pub async fn bidi<M>(
&self,
msg: M,
) -> result::Result<
(
UpdateSink<C, M::Update>,
BoxStreamSync<'static, result::Result<M::Response, ItemError<C>>>,
),
Error<C>,
>
where
M: BidiStreamingMsg<S>,
{
let msg = msg.into();
let (mut send, recv) = self.source.open().await.map_err(Error::Open)?;
send.send(msg).await.map_err(Error::<C>::Send)?;
let send = UpdateSink::new(send);
let recv = Box::pin(recv.map(move |x| match x {
Ok(msg) => M::Response::try_from(msg).map_err(|_| ItemError::DowncastError),
Err(e) => Err(ItemError::RecvError(e)),
}));
Ok((send, recv))
}
}
impl<C, S> RpcChannel<S, C>
where
C: StreamTypes<In = S::Req, Out = S::Res>,
S: Service,
{
pub async fn bidi_streaming<M, F, Str, T>(
self,
req: M,
target: T,
f: F,
) -> result::Result<(), RpcServerError<C>>
where
M: BidiStreamingMsg<S>,
F: FnOnce(T, M, UpdateStream<C, M::Update>) -> Str + Send + 'static,
Str: Stream<Item = M::Response> + Send + 'static,
T: Send + 'static,
{
let Self { mut send, recv, .. } = self;
let (updates, read_error) = UpdateStream::new(recv);
let responses = f(target, req, updates);
race2(read_error.map(Err), async move {
tokio::pin!(responses);
while let Some(response) = responses.next().await {
let response = response.into();
send.send(response)
.await
.map_err(RpcServerError::SendError)?;
}
Ok(())
})
.await
}
}