quic_rpc/client.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
//! Client-side API.
//!
//! The main entry point is [RpcClient].
use std::{
fmt::Debug,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use futures_lite::Stream;
use futures_sink::Sink;
use pin_project::pin_project;
use crate::{
transport::{boxed::BoxableConnector, mapped::MappedConnector, StreamTypes},
Connector, Service,
};
/// Boxed connector for a specific service `S`.
///
/// This is a convenience alias for
/// [`crate::transport::boxed::BoxedConnector`], parameterised with the
/// service's response type (`S::Res`) followed by its request type
/// (`S::Req`), so callers only have to name the service itself.
///
/// It is also the default connector type of [`RpcClient`].
pub type BoxedConnector<S> =
crate::transport::boxed::BoxedConnector<<S as crate::Service>::Res, <S as crate::Service>::Req>;
/// `Sync` version of `futures::stream::BoxStream`.
///
/// A pinned, heap-allocated, dynamically typed [`Stream`] yielding items of
/// type `T` that is required to be both `Send` and `Sync` and to live at
/// least as long as `'a`.
pub type BoxStreamSync<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>>;
/// A client for a specific service
///
/// This is a wrapper around a [`Connector`] that serves as the entry point
/// for the client DSL.
///
/// # Type parameters
///
/// - `S` is the service type that determines what interactions this client
///   supports. It is tracked at the type level only (via [`PhantomData`]);
///   no value of type `S` is stored.
/// - `C` is the connector that determines the transport. It defaults to
///   [`BoxedConnector<S>`].
#[derive(Debug)]
pub struct RpcClient<S, C = BoxedConnector<S>> {
/// The wrapped connector.
pub(crate) source: C,
/// Zero-sized marker that ties this client to the service type `S`.
pub(crate) _p: PhantomData<S>,
}
/// Manual [`Clone`] implementation that only requires the connector `C` to be
/// cloneable; the service type `S` appears solely inside a [`PhantomData`]
/// marker and therefore needs no `Clone` bound.
impl<S, C> Clone for RpcClient<S, C>
where
    C: Clone,
{
    /// Produce a new client backed by a clone of the underlying connector.
    fn clone(&self) -> Self {
        let source = self.source.clone();
        Self {
            source,
            _p: PhantomData,
        }
    }
}
/// Sink that can be used to send updates to the server for the two interaction patterns
/// that support it, [crate::message::ClientStreaming] and [crate::message::BidiStreaming].
///
/// Field `0` is the transport-level send sink (`C::SendSink`); it is public and
/// structurally pinned so the [`Sink`] implementation can forward to it.
/// Field `1` is a zero-sized [`PhantomData`] marker recording the update type `T`.
#[pin_project]
#[derive(Debug)]
pub struct UpdateSink<C, T>(#[pin] pub C::SendSink, PhantomData<T>)
where
C: StreamTypes;
impl<C, T> UpdateSink<C, T>
where
    C: StreamTypes,
    T: Into<C::Out>,
{
    /// Wrap a transport-level send sink in an [`UpdateSink`].
    ///
    /// `sink` is stored as-is; the update type `T` is recorded only at the
    /// type level.
    pub fn new(sink: C::SendSink) -> Self {
        let marker = PhantomData::<T>;
        Self(sink, marker)
    }
}
/// [`Sink`] implementation that accepts items of type `T`, converts each one
/// with [`Into`], and forwards everything else unchanged to the wrapped
/// `C::SendSink`.
impl<C, T> Sink<T> for UpdateSink<C, T>
where
    C: StreamTypes,
    T: Into<C::Out>,
{
    /// Errors are those of the underlying transport sink.
    type Error = C::SendError;

    /// Delegate readiness polling to the wrapped sink.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.0.poll_ready(cx)
    }

    /// Convert `item` into the transport's outgoing type and hand it to the
    /// wrapped sink.
    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        let this = self.project();
        this.0.start_send(item.into())
    }

    /// Delegate flushing to the wrapped sink.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.0.poll_flush(cx)
    }

    /// Delegate closing to the wrapped sink.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.0.poll_close(cx)
    }
}
impl<S, C> RpcClient<S, C>
where
    S: Service,
    C: Connector<S>,
{
    /// Build an rpc client for the [Service] `S` on top of a compatible
    /// [Connector].
    ///
    /// This is the point where a generically typed connection becomes a
    /// client for one specific service.
    ///
    /// To obtain a client for a nested service, call [map](RpcClient::map)
    /// on the result.
    pub fn new(source: C) -> Self {
        let marker = PhantomData::<S>;
        Self { source, _p: marker }
    }
}
impl<S, C> RpcClient<S, C>
where
    S: Service,
    C: Connector<S>,
{
    /// Consume the client and return the underlying connector.
    pub fn into_inner(self) -> C {
        let Self { source, .. } = self;
        source
    }

    /// Map this client's service into an inner service.
    ///
    /// `SNext` is the new (inner) service and `S` is the current service.
    /// The method is available when the following bounds hold:
    ///
    /// - `S::Req: From<SNext::Req>`
    /// - `SNext::Res: TryFrom<S::Res>`
    ///
    /// The returned client wraps the current connector in a
    /// [`MappedConnector`]. Calls to `map` can be chained indefinitely.
    pub fn map<SNext>(self) -> RpcClient<SNext, MappedConnector<SNext::Res, SNext::Req, C>>
    where
        SNext: Service,
        S::Req: From<SNext::Req>,
        SNext::Res: TryFrom<S::Res>,
    {
        let mapped = self.source.map::<SNext::Res, SNext::Req>();
        RpcClient::new(mapped)
    }

    /// Convert this client into one that uses a [`BoxedConnector`].
    ///
    /// Only available when the connector implements [`BoxableConnector`] for
    /// the service's response and request types.
    pub fn boxed(self) -> RpcClient<S, BoxedConnector<S>>
    where
        C: BoxableConnector<S::Res, S::Req>,
    {
        let boxed_source = self.source.boxed();
        RpcClient::new(boxed_source)
    }
}
/// Allows borrowing the underlying connector from a client.
impl<S, C> AsRef<C> for RpcClient<S, C>
where
    S: Service,
    C: Connector<S>,
{
    /// Return a shared reference to the wrapped connector.
    fn as_ref(&self) -> &C {
        let Self { source, .. } = self;
        source
    }
}
/// Wrap a stream with an additional item that is kept alive until the stream is dropped
///
/// Field `0` is the wrapped stream (structurally pinned so it can be polled);
/// field `1` is the extra value, which is owned by the wrapper and so is
/// dropped together with it.
#[pin_project]
pub(crate) struct DeferDrop<S: Stream, X>(#[pin] pub S, pub X);
/// [`Stream`] implementation that yields exactly the items of the wrapped
/// stream; the extra value is not touched while polling.
impl<S, X> Stream for DeferDrop<S, X>
where
    S: Stream,
{
    type Item = S::Item;

    /// Forward the poll to the wrapped stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        this.0.poll_next(cx)
    }
}