quic_rpc/pattern/
client_streaming.rs

1//! Client streaming interaction pattern.
2
3use std::{
4    error,
5    fmt::{self, Debug},
6    result,
7};
8
9use futures_lite::{future::Boxed, Future, StreamExt};
10use futures_util::{FutureExt, SinkExt, TryFutureExt};
11
12use crate::{
13    client::UpdateSink,
14    message::{InteractionPattern, Msg},
15    server::{race2, RpcChannel, RpcServerError, UpdateStream},
16    transport::{ConnectionErrors, StreamTypes},
17    Connector, RpcClient, Service,
18};
19
20/// Client streaming interaction pattern
21///
22/// After the initial request, the client can send updates, but there is only
23/// one response.
24#[derive(Debug, Clone, Copy)]
25pub struct ClientStreaming;
26impl InteractionPattern for ClientStreaming {}
27
28/// Defines update type and response type for a client streaming message.
29pub trait ClientStreamingMsg<S: Service>: Msg<S, Pattern = ClientStreaming> {
30    /// The type for request updates
31    ///
32    /// For a request that does not support updates, this can be safely set to any type, including
33    /// the message type itself. Any update for such a request will result in an error.
34    type Update: Into<S::Req> + TryFrom<S::Req> + Send + 'static;
35
36    /// The type for the response
37    ///
38    /// For requests that can produce errors, this can be set to [Result<T, E>](std::result::Result).
39    type Response: Into<S::Res> + TryFrom<S::Res> + Send + 'static;
40}
41
42/// Server error when accepting a client streaming request
43#[derive(Debug)]
44pub enum Error<C: ConnectionErrors> {
45    /// Unable to open a substream at all
46    Open(C::OpenError),
47    /// Unable to send the request to the server
48    Send(C::SendError),
49}
50
51impl<C: ConnectionErrors> fmt::Display for Error<C> {
52    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53        fmt::Debug::fmt(self, f)
54    }
55}
56
57impl<C: ConnectionErrors> error::Error for Error<C> {}
58
59/// Server error when receiving an item for a client streaming request
60#[derive(Debug)]
61pub enum ItemError<C: ConnectionErrors> {
62    /// Connection was closed before receiving the first message
63    EarlyClose,
64    /// Unable to receive the response from the server
65    RecvError(C::RecvError),
66    /// Unexpected response from the server
67    DowncastError,
68}
69
70impl<C: ConnectionErrors> fmt::Display for ItemError<C> {
71    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72        fmt::Debug::fmt(self, f)
73    }
74}
75
76impl<C: ConnectionErrors> error::Error for ItemError<C> {}
77
78impl<S, C> RpcClient<S, C>
79where
80    S: Service,
81    C: Connector<S>,
82{
83    /// Call to the server that allows the client to stream, single response
84    pub async fn client_streaming<M>(
85        &self,
86        msg: M,
87    ) -> result::Result<
88        (
89            UpdateSink<C, M::Update>,
90            Boxed<result::Result<M::Response, ItemError<C>>>,
91        ),
92        Error<C>,
93    >
94    where
95        M: ClientStreamingMsg<S>,
96    {
97        let msg = msg.into();
98        let (mut send, mut recv) = self.source.open().await.map_err(Error::Open)?;
99        send.send(msg).map_err(Error::Send).await?;
100        let send = UpdateSink::<C, M::Update>::new(send);
101        let recv = async move {
102            let item = recv.next().await.ok_or(ItemError::EarlyClose)?;
103
104            match item {
105                Ok(msg) => M::Response::try_from(msg).map_err(|_| ItemError::DowncastError),
106                Err(e) => Err(ItemError::RecvError(e)),
107            }
108        }
109        .boxed();
110        Ok((send, recv))
111    }
112}
113
114impl<S, C> RpcChannel<S, C>
115where
116    S: Service,
117    C: StreamTypes<In = S::Req, Out = S::Res>,
118{
119    /// handle the message M using the given function on the target object
120    ///
121    /// If you want to support concurrent requests, you need to spawn this on a tokio task yourself.
122    pub async fn client_streaming<M, F, Fut, T>(
123        self,
124        req: M,
125        target: T,
126        f: F,
127    ) -> result::Result<(), RpcServerError<C>>
128    where
129        M: ClientStreamingMsg<S>,
130        F: FnOnce(T, M, UpdateStream<C, M::Update>) -> Fut + Send + 'static,
131        Fut: Future<Output = M::Response> + Send + 'static,
132        T: Send + 'static,
133    {
134        let Self { mut send, recv, .. } = self;
135        let (updates, read_error) = UpdateStream::new(recv);
136        race2(read_error.map(Err), async move {
137            // get the response
138            let res = f(target, req, updates).await;
139            // turn into a S::Res so we can send it
140            let res = res.into();
141            // send it and return the error if any
142            send.send(res).await.map_err(RpcServerError::SendError)
143        })
144        .await
145    }
146}