quic_rpc/pattern/
client_streaming.rs1use std::{
4 error,
5 fmt::{self, Debug},
6 result,
7};
8
9use futures_lite::{future::Boxed, Future, StreamExt};
10use futures_util::{FutureExt, SinkExt, TryFutureExt};
11
12use crate::{
13 client::UpdateSink,
14 message::{InteractionPattern, Msg},
15 server::{race2, RpcChannel, RpcServerError, UpdateStream},
16 transport::{ConnectionErrors, StreamTypes},
17 Connector, RpcClient, Service,
18};
19
20#[derive(Debug, Clone, Copy)]
25pub struct ClientStreaming;
26impl InteractionPattern for ClientStreaming {}
27
28pub trait ClientStreamingMsg<S: Service>: Msg<S, Pattern = ClientStreaming> {
30 type Update: Into<S::Req> + TryFrom<S::Req> + Send + 'static;
35
36 type Response: Into<S::Res> + TryFrom<S::Res> + Send + 'static;
40}
41
42#[derive(Debug)]
44pub enum Error<C: ConnectionErrors> {
45 Open(C::OpenError),
47 Send(C::SendError),
49}
50
51impl<C: ConnectionErrors> fmt::Display for Error<C> {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 fmt::Debug::fmt(self, f)
54 }
55}
56
57impl<C: ConnectionErrors> error::Error for Error<C> {}
58
59#[derive(Debug)]
61pub enum ItemError<C: ConnectionErrors> {
62 EarlyClose,
64 RecvError(C::RecvError),
66 DowncastError,
68}
69
70impl<C: ConnectionErrors> fmt::Display for ItemError<C> {
71 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72 fmt::Debug::fmt(self, f)
73 }
74}
75
76impl<C: ConnectionErrors> error::Error for ItemError<C> {}
77
78impl<S, C> RpcClient<S, C>
79where
80 S: Service,
81 C: Connector<S>,
82{
83 pub async fn client_streaming<M>(
85 &self,
86 msg: M,
87 ) -> result::Result<
88 (
89 UpdateSink<C, M::Update>,
90 Boxed<result::Result<M::Response, ItemError<C>>>,
91 ),
92 Error<C>,
93 >
94 where
95 M: ClientStreamingMsg<S>,
96 {
97 let msg = msg.into();
98 let (mut send, mut recv) = self.source.open().await.map_err(Error::Open)?;
99 send.send(msg).map_err(Error::Send).await?;
100 let send = UpdateSink::<C, M::Update>::new(send);
101 let recv = async move {
102 let item = recv.next().await.ok_or(ItemError::EarlyClose)?;
103
104 match item {
105 Ok(msg) => M::Response::try_from(msg).map_err(|_| ItemError::DowncastError),
106 Err(e) => Err(ItemError::RecvError(e)),
107 }
108 }
109 .boxed();
110 Ok((send, recv))
111 }
112}
113
114impl<S, C> RpcChannel<S, C>
115where
116 S: Service,
117 C: StreamTypes<In = S::Req, Out = S::Res>,
118{
119 pub async fn client_streaming<M, F, Fut, T>(
123 self,
124 req: M,
125 target: T,
126 f: F,
127 ) -> result::Result<(), RpcServerError<C>>
128 where
129 M: ClientStreamingMsg<S>,
130 F: FnOnce(T, M, UpdateStream<C, M::Update>) -> Fut + Send + 'static,
131 Fut: Future<Output = M::Response> + Send + 'static,
132 T: Send + 'static,
133 {
134 let Self { mut send, recv, .. } = self;
135 let (updates, read_error) = UpdateStream::new(recv);
136 race2(read_error.map(Err), async move {
137 let res = f(target, req, updates).await;
139 let res = res.into();
141 send.send(res).await.map_err(RpcServerError::SendError)
143 })
144 .await
145 }
146}