1use std::{
4 error,
5 fmt::{self, Debug},
6 result,
7};
8
9use futures_lite::{Future, Stream, StreamExt};
10use futures_util::{FutureExt, SinkExt, TryFutureExt};
11use serde::{Deserialize, Serialize};
12
13use crate::{
14 client::{BoxStreamSync, DeferDrop},
15 message::{InteractionPattern, Msg},
16 server::{race2, RpcChannel, RpcServerError},
17 transport::{self, ConnectionErrors, StreamTypes},
18 Connector, RpcClient, Service,
19};
20
21#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
26pub struct StreamCreated;
27
28#[derive(Debug, Clone, Copy)]
30pub struct TryServerStreaming;
31
32impl InteractionPattern for TryServerStreaming {}
33
34pub trait TryServerStreamingMsg<S: Service>: Msg<S, Pattern = TryServerStreaming>
36where
37 result::Result<Self::Item, Self::ItemError>: Into<S::Res> + TryFrom<S::Res>,
38 result::Result<StreamCreated, Self::CreateError>: Into<S::Res> + TryFrom<S::Res>,
39{
40 type CreateError: Debug + Send + 'static;
42
43 type ItemError: Debug + Send + 'static;
45
46 type Item: Send + 'static;
48}
49
50#[derive(Debug)]
56pub enum Error<C: transport::Connector, E: Debug> {
57 Open(C::OpenError),
59 Send(C::SendError),
61 Recv(C::RecvError),
63 EarlyClose,
65 Downcast,
67 Application(E),
69}
70
71impl<S: transport::Connector, E: Debug> fmt::Display for Error<S, E> {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 fmt::Debug::fmt(self, f)
74 }
75}
76
77impl<S: transport::Connector, E: Debug> error::Error for Error<S, E> {}
78
79#[derive(Debug)]
83pub enum ItemError<S: ConnectionErrors, E: Debug> {
84 Recv(S::RecvError),
86 Downcast,
88 Application(E),
90}
91
92impl<S: ConnectionErrors, E: Debug> fmt::Display for ItemError<S, E> {
93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94 fmt::Debug::fmt(self, f)
95 }
96}
97
98impl<S: ConnectionErrors, E: Debug> error::Error for ItemError<S, E> {}
99
100impl<S, C> RpcChannel<S, C>
101where
102 C: StreamTypes<In = S::Req, Out = S::Res>,
103 S: Service,
104{
105 pub async fn try_server_streaming<M, F, Fut, Str, T>(
112 self,
113 req: M,
114 target: T,
115 f: F,
116 ) -> result::Result<(), RpcServerError<C>>
117 where
118 M: TryServerStreamingMsg<S>,
119 std::result::Result<M::Item, M::ItemError>: Into<S::Res> + TryFrom<S::Res>,
120 std::result::Result<StreamCreated, M::CreateError>: Into<S::Res> + TryFrom<S::Res>,
121 F: FnOnce(T, M) -> Fut + Send + 'static,
122 Fut: Future<Output = std::result::Result<Str, M::CreateError>> + Send + 'static,
123 Str: Stream<Item = std::result::Result<M::Item, M::ItemError>> + Send + 'static,
124 T: Send + 'static,
125 {
126 let Self {
127 mut send, mut recv, ..
128 } = self;
129 let cancel = recv
131 .next()
132 .map(|_| RpcServerError::UnexpectedUpdateMessage::<C>);
133 race2(cancel.map(Err), async move {
135 let responses = match f(target, req).await {
137 Ok(responses) => {
138 let response = Ok(StreamCreated).into();
140 send.send(response)
142 .await
143 .map_err(RpcServerError::SendError)?;
144 responses
145 }
146 Err(cause) => {
147 let response = Err(cause).into();
149 send.send(response)
151 .await
152 .map_err(RpcServerError::SendError)?;
153 return Ok(());
154 }
155 };
156 tokio::pin!(responses);
157 while let Some(response) = responses.next().await {
158 let response = response.into();
160 send.send(response)
162 .await
163 .map_err(RpcServerError::SendError)?;
164 }
165 Ok(())
166 })
167 .await
168 }
169}
170
171impl<S, C> RpcClient<S, C>
172where
173 C: Connector<S>,
174 S: Service,
175{
176 pub async fn try_server_streaming<M>(
178 &self,
179 msg: M,
180 ) -> result::Result<
181 BoxStreamSync<'static, Result<M::Item, ItemError<C, M::ItemError>>>,
182 Error<C, M::CreateError>,
183 >
184 where
185 M: TryServerStreamingMsg<S>,
186 Result<M::Item, M::ItemError>: Into<S::Res> + TryFrom<S::Res>,
187 Result<StreamCreated, M::CreateError>: Into<S::Res> + TryFrom<S::Res>,
188 {
189 let msg = msg.into();
190 let (mut send, mut recv) = self.source.open().await.map_err(Error::Open)?;
191 send.send(msg).map_err(Error::Send).await?;
192 let Some(initial) = recv.next().await else {
193 return Err(Error::EarlyClose);
194 };
195 let initial = initial.map_err(Error::Recv)?; let initial = <std::result::Result<StreamCreated, M::CreateError>>::try_from(initial)
197 .map_err(|_| Error::Downcast)?;
198 let _ = initial.map_err(Error::Application)?;
199 let recv = recv.map(move |x| {
200 let x = x.map_err(ItemError::Recv)?;
201 let x = <std::result::Result<M::Item, M::ItemError>>::try_from(x)
202 .map_err(|_| ItemError::Downcast)?;
203 let x = x.map_err(ItemError::Application)?;
204 Ok(x)
205 });
206 let recv = Box::pin(DeferDrop(recv, send));
208 Ok(recv)
209 }
210}