quic_rpc/
client.rs

//! Client-side API.
//!
//! The main entry point is [`RpcClient`].
4use std::{
5    fmt::Debug,
6    marker::PhantomData,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11use futures_lite::Stream;
12use futures_sink::Sink;
13use pin_project::pin_project;
14
15use crate::{
16    transport::{boxed::BoxableConnector, mapped::MappedConnector, StreamTypes},
17    Connector, Service,
18};
19
20/// A boxed connector for the given [`Service`]
21pub type BoxedConnector<S> =
22    crate::transport::boxed::BoxedConnector<<S as crate::Service>::Res, <S as crate::Service>::Req>;
23
/// Flume-transport connector specialised to the response and request types of
/// the [`Service`] `S`.
///
/// Only compiled when the `flume-transport` feature is enabled.
#[cfg(feature = "flume-transport")]
#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "flume-transport")))]
pub type FlumeConnector<S> =
    crate::transport::flume::FlumeConnector<<S as Service>::Res, <S as Service>::Req>;
29
/// Quinn-transport connector specialised to the response and request types of
/// the [`Service`] `S`.
///
/// Only compiled when the `quinn-transport` feature is enabled.
#[cfg(feature = "quinn-transport")]
#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "quinn-transport")))]
pub type QuinnConnector<S> =
    crate::transport::quinn::QuinnConnector<<S as Service>::Res, <S as Service>::Req>;
35
/// Hyper-transport connector specialised to the response and request types of
/// the [`Service`] `S`.
///
/// Only compiled when the `hyper-transport` feature is enabled.
#[cfg(feature = "hyper-transport")]
#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "hyper-transport")))]
pub type HyperConnector<S> =
    crate::transport::hyper::HyperConnector<<S as Service>::Res, <S as Service>::Req>;
41
/// Iroh-transport connector specialised to the response and request types of
/// the [`Service`] `S`.
///
/// Only compiled when the `iroh-transport` feature is enabled.
#[cfg(feature = "iroh-transport")]
#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "iroh-transport")))]
pub type IrohConnector<S> =
    crate::transport::iroh::IrohConnector<<S as Service>::Res, <S as Service>::Req>;
47
48/// Sync version of `future::stream::BoxStream`.
49pub type BoxStreamSync<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>>;
50
51/// A client for a specific service
52///
53/// This is a wrapper around a [`Connector`] that serves as the entry point
54/// for the client DSL.
55///
56/// Type parameters:
57///
58/// `S` is the service type that determines what interactions this client supports.
59/// `C` is the connector that determines the transport.
60#[derive(Debug)]
61pub struct RpcClient<S, C = BoxedConnector<S>> {
62    pub(crate) source: C,
63    pub(crate) _p: PhantomData<S>,
64}
65
66impl<S, C: Clone> Clone for RpcClient<S, C> {
67    fn clone(&self) -> Self {
68        Self {
69            source: self.source.clone(),
70            _p: PhantomData,
71        }
72    }
73}
74
75/// Sink that can be used to send updates to the server for the two interaction patterns
76/// that support it, [crate::message::ClientStreaming] and [crate::message::BidiStreaming].
77#[pin_project]
78#[derive(Debug)]
79pub struct UpdateSink<C, T>(#[pin] pub C::SendSink, PhantomData<T>)
80where
81    C: StreamTypes;
82
83impl<C, T> UpdateSink<C, T>
84where
85    C: StreamTypes,
86    T: Into<C::Out>,
87{
88    /// Create a new update sink
89    pub fn new(sink: C::SendSink) -> Self {
90        Self(sink, PhantomData)
91    }
92}
93
94impl<C, T> Sink<T> for UpdateSink<C, T>
95where
96    C: StreamTypes,
97    T: Into<C::Out>,
98{
99    type Error = C::SendError;
100
101    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
102        self.project().0.poll_ready(cx)
103    }
104
105    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
106        let req = item.into();
107        self.project().0.start_send(req)
108    }
109
110    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
111        self.project().0.poll_flush(cx)
112    }
113
114    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
115        self.project().0.poll_close(cx)
116    }
117}
118
119impl<S, C> RpcClient<S, C>
120where
121    S: Service,
122    C: Connector<S>,
123{
124    /// Create a new rpc client for a specific [Service] given a compatible
125    /// [Connector].
126    ///
127    /// This is where a generic typed connection is converted into a client for a specific service.
128    ///
129    /// You can get a client for a nested service by calling [map](RpcClient::map).
130    pub fn new(source: C) -> Self {
131        Self {
132            source,
133            _p: PhantomData,
134        }
135    }
136}
137
138impl<S, C> RpcClient<S, C>
139where
140    S: Service,
141    C: Connector<S>,
142{
143    /// Get the underlying connection
144    pub fn into_inner(self) -> C {
145        self.source
146    }
147
148    /// Map this channel's service into an inner service.
149    ///
150    /// This method is available if the required bounds are upheld:
151    /// SNext::Req: Into<S::Req> + TryFrom<S::Req>,
152    /// SNext::Res: Into<S::Res> + TryFrom<S::Res>,
153    ///
154    /// Where SNext is the new service to map to and S is the current inner service.
155    ///
156    /// This method can be chained infintely.
157    pub fn map<SNext>(self) -> RpcClient<SNext, MappedConnector<SNext::Res, SNext::Req, C>>
158    where
159        SNext: Service,
160        S::Req: From<SNext::Req>,
161        SNext::Res: TryFrom<S::Res>,
162    {
163        RpcClient::new(self.source.map::<SNext::Res, SNext::Req>())
164    }
165
166    /// box
167    pub fn boxed(self) -> RpcClient<S, BoxedConnector<S>>
168    where
169        C: BoxableConnector<S::Res, S::Req>,
170    {
171        RpcClient::new(self.source.boxed())
172    }
173}
174
175impl<S, C> AsRef<C> for RpcClient<S, C>
176where
177    S: Service,
178    C: Connector<S>,
179{
180    fn as_ref(&self) -> &C {
181        &self.source
182    }
183}
184
185/// Wrap a stream with an additional item that is kept alive until the stream is dropped
186#[pin_project]
187pub(crate) struct DeferDrop<S: Stream, X>(#[pin] pub S, pub X);
188
189impl<S: Stream, X> Stream for DeferDrop<S, X> {
190    type Item = S::Item;
191
192    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
193        self.project().0.poll_next(cx)
194    }
195}