quic_rpc/pattern/
try_server_streaming.rs

1//! Fallible server streaming interaction pattern.
2
3use std::{
4    error,
5    fmt::{self, Debug},
6    result,
7};
8
9use futures_lite::{Future, Stream, StreamExt};
10use futures_util::{FutureExt, SinkExt, TryFutureExt};
11use serde::{Deserialize, Serialize};
12
13use crate::{
14    client::{BoxStreamSync, DeferDrop},
15    message::{InteractionPattern, Msg},
16    server::{race2, RpcChannel, RpcServerError},
17    transport::{self, ConnectionErrors, StreamTypes},
18    Connector, RpcClient, Service,
19};
20
21/// A guard message to indicate that the stream has been created.
22///
23/// This is so we can dinstinguish between an error creating the stream and
24/// an error in the first item produced by the stream.
25#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
26pub struct StreamCreated;
27
28/// Fallible server streaming interaction pattern.
29#[derive(Debug, Clone, Copy)]
30pub struct TryServerStreaming;
31
32impl InteractionPattern for TryServerStreaming {}
33
34/// Same as ServerStreamingMsg, but with lazy stream creation and the error type explicitly defined.
35pub trait TryServerStreamingMsg<S: Service>: Msg<S, Pattern = TryServerStreaming>
36where
37    result::Result<Self::Item, Self::ItemError>: Into<S::Res> + TryFrom<S::Res>,
38    result::Result<StreamCreated, Self::CreateError>: Into<S::Res> + TryFrom<S::Res>,
39{
40    /// Error when creating the stream
41    type CreateError: Debug + Send + 'static;
42
43    /// Error for stream items
44    type ItemError: Debug + Send + 'static;
45
46    /// Successful response item
47    type Item: Send + 'static;
48}
49
50/// Server error when accepting a server streaming request
51///
52/// This combines network errors with application errors. Usually you don't
53/// care about the exact nature of the error, but if you want to handle
54/// application errors differently, you can match on this enum.
55#[derive(Debug)]
56pub enum Error<C: transport::Connector, E: Debug> {
57    /// Unable to open a substream at all
58    Open(C::OpenError),
59    /// Unable to send the request to the server
60    Send(C::SendError),
61    /// Error received when creating the stream
62    Recv(C::RecvError),
63    /// Connection was closed before receiving the first message
64    EarlyClose,
65    /// Unexpected response from the server
66    Downcast,
67    /// Application error
68    Application(E),
69}
70
71impl<S: transport::Connector, E: Debug> fmt::Display for Error<S, E> {
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        fmt::Debug::fmt(self, f)
74    }
75}
76
77impl<S: transport::Connector, E: Debug> error::Error for Error<S, E> {}
78
79/// Client error when handling responses from a server streaming request.
80///
81/// This combines network errors with application errors.
82#[derive(Debug)]
83pub enum ItemError<S: ConnectionErrors, E: Debug> {
84    /// Unable to receive the response from the server
85    Recv(S::RecvError),
86    /// Unexpected response from the server
87    Downcast,
88    /// Application error
89    Application(E),
90}
91
92impl<S: ConnectionErrors, E: Debug> fmt::Display for ItemError<S, E> {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        fmt::Debug::fmt(self, f)
95    }
96}
97
98impl<S: ConnectionErrors, E: Debug> error::Error for ItemError<S, E> {}
99
100impl<S, C> RpcChannel<S, C>
101where
102    C: StreamTypes<In = S::Req, Out = S::Res>,
103    S: Service,
104{
105    /// handle the message M using the given function on the target object
106    ///
107    /// If you want to support concurrent requests, you need to spawn this on a tokio task yourself.
108    ///
109    /// Compared to [RpcChannel::server_streaming], with this method the stream creation is via
110    /// a function that returns a future that resolves to a stream.
111    pub async fn try_server_streaming<M, F, Fut, Str, T>(
112        self,
113        req: M,
114        target: T,
115        f: F,
116    ) -> result::Result<(), RpcServerError<C>>
117    where
118        M: TryServerStreamingMsg<S>,
119        std::result::Result<M::Item, M::ItemError>: Into<S::Res> + TryFrom<S::Res>,
120        std::result::Result<StreamCreated, M::CreateError>: Into<S::Res> + TryFrom<S::Res>,
121        F: FnOnce(T, M) -> Fut + Send + 'static,
122        Fut: Future<Output = std::result::Result<Str, M::CreateError>> + Send + 'static,
123        Str: Stream<Item = std::result::Result<M::Item, M::ItemError>> + Send + 'static,
124        T: Send + 'static,
125    {
126        let Self {
127            mut send, mut recv, ..
128        } = self;
129        // cancel if we get an update, no matter what it is
130        let cancel = recv
131            .next()
132            .map(|_| RpcServerError::UnexpectedUpdateMessage::<C>);
133        // race the computation and the cancellation
134        race2(cancel.map(Err), async move {
135            // get the response
136            let responses = match f(target, req).await {
137                Ok(responses) => {
138                    // turn into a S::Res so we can send it
139                    let response = Ok(StreamCreated).into();
140                    // send it and return the error if any
141                    send.send(response)
142                        .await
143                        .map_err(RpcServerError::SendError)?;
144                    responses
145                }
146                Err(cause) => {
147                    // turn into a S::Res so we can send it
148                    let response = Err(cause).into();
149                    // send it and return the error if any
150                    send.send(response)
151                        .await
152                        .map_err(RpcServerError::SendError)?;
153                    return Ok(());
154                }
155            };
156            tokio::pin!(responses);
157            while let Some(response) = responses.next().await {
158                // turn into a S::Res so we can send it
159                let response = response.into();
160                // send it and return the error if any
161                send.send(response)
162                    .await
163                    .map_err(RpcServerError::SendError)?;
164            }
165            Ok(())
166        })
167        .await
168    }
169}
170
171impl<S, C> RpcClient<S, C>
172where
173    C: Connector<S>,
174    S: Service,
175{
176    /// Bidi call to the server, request opens a stream, response is a stream
177    pub async fn try_server_streaming<M>(
178        &self,
179        msg: M,
180    ) -> result::Result<
181        BoxStreamSync<'static, Result<M::Item, ItemError<C, M::ItemError>>>,
182        Error<C, M::CreateError>,
183    >
184    where
185        M: TryServerStreamingMsg<S>,
186        Result<M::Item, M::ItemError>: Into<S::Res> + TryFrom<S::Res>,
187        Result<StreamCreated, M::CreateError>: Into<S::Res> + TryFrom<S::Res>,
188    {
189        let msg = msg.into();
190        let (mut send, mut recv) = self.source.open().await.map_err(Error::Open)?;
191        send.send(msg).map_err(Error::Send).await?;
192        let Some(initial) = recv.next().await else {
193            return Err(Error::EarlyClose);
194        };
195        let initial = initial.map_err(Error::Recv)?; // initial response
196        let initial = <std::result::Result<StreamCreated, M::CreateError>>::try_from(initial)
197            .map_err(|_| Error::Downcast)?;
198        let _ = initial.map_err(Error::Application)?;
199        let recv = recv.map(move |x| {
200            let x = x.map_err(ItemError::Recv)?;
201            let x = <std::result::Result<M::Item, M::ItemError>>::try_from(x)
202                .map_err(|_| ItemError::Downcast)?;
203            let x = x.map_err(ItemError::Application)?;
204            Ok(x)
205        });
206        // keep send alive so the request on the server side does not get cancelled
207        let recv = Box::pin(DeferDrop(recv, send));
208        Ok(recv)
209    }
210}