quic_rpc/
client.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
//! Client side api
//!
//! The main entry point is [RpcClient].
use std::{
    fmt::Debug,
    marker::PhantomData,
    pin::Pin,
    task::{Context, Poll},
};

use futures_lite::Stream;
use futures_sink::Sink;
use pin_project::pin_project;

use crate::{
    transport::{boxed::BoxableConnector, mapped::MappedConnector, StreamTypes},
    Connector, Service,
};

/// Type alias for a boxed connection to a specific service
///
/// This is a convenience type alias for a boxed connection to a specific service.
pub type BoxedConnector<S> =
    crate::transport::boxed::BoxedConnector<<S as crate::Service>::Res, <S as crate::Service>::Req>;

/// Sync version of `future::stream::BoxStream`.
pub type BoxStreamSync<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>>;

/// A client for a specific service
///
/// This is a wrapper around a [`Connector`] that serves as the entry point
/// for the client DSL.
///
/// Type parameters:
///
/// `S` is the service type that determines what interactions this client supports.
/// `C` is the connector that determines the transport.
#[derive(Debug)]
pub struct RpcClient<S, C = BoxedConnector<S>> {
    pub(crate) source: C,
    pub(crate) _p: PhantomData<S>,
}

impl<S, C: Clone> Clone for RpcClient<S, C> {
    fn clone(&self) -> Self {
        Self {
            source: self.source.clone(),
            _p: PhantomData,
        }
    }
}

/// Sink that can be used to send updates to the server for the two interaction patterns
/// that support it, [crate::message::ClientStreaming] and [crate::message::BidiStreaming].
#[pin_project]
#[derive(Debug)]
pub struct UpdateSink<C, T>(#[pin] pub C::SendSink, PhantomData<T>)
where
    C: StreamTypes;

impl<C, T> UpdateSink<C, T>
where
    C: StreamTypes,
    T: Into<C::Out>,
{
    /// Create a new update sink
    pub fn new(sink: C::SendSink) -> Self {
        Self(sink, PhantomData)
    }
}

impl<C, T> Sink<T> for UpdateSink<C, T>
where
    C: StreamTypes,
    T: Into<C::Out>,
{
    type Error = C::SendError;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.project().0.poll_ready(cx)
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        let req = item.into();
        self.project().0.start_send(req)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.project().0.poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.project().0.poll_close(cx)
    }
}

impl<S, C> RpcClient<S, C>
where
    S: Service,
    C: Connector<S>,
{
    /// Create a new rpc client for a specific [Service] given a compatible
    /// [Connector].
    ///
    /// This is where a generic typed connection is converted into a client for a specific service.
    ///
    /// You can get a client for a nested service by calling [map](RpcClient::map).
    pub fn new(source: C) -> Self {
        Self {
            source,
            _p: PhantomData,
        }
    }
}

impl<S, C> RpcClient<S, C>
where
    S: Service,
    C: Connector<S>,
{
    /// Get the underlying connection
    pub fn into_inner(self) -> C {
        self.source
    }

    /// Map this channel's service into an inner service.
    ///
    /// This method is available if the required bounds are upheld:
    /// SNext::Req: Into<S::Req> + TryFrom<S::Req>,
    /// SNext::Res: Into<S::Res> + TryFrom<S::Res>,
    ///
    /// Where SNext is the new service to map to and S is the current inner service.
    ///
    /// This method can be chained infintely.
    pub fn map<SNext>(self) -> RpcClient<SNext, MappedConnector<SNext::Res, SNext::Req, C>>
    where
        SNext: Service,
        S::Req: From<SNext::Req>,
        SNext::Res: TryFrom<S::Res>,
    {
        RpcClient::new(self.source.map::<SNext::Res, SNext::Req>())
    }

    /// box
    pub fn boxed(self) -> RpcClient<S, BoxedConnector<S>>
    where
        C: BoxableConnector<S::Res, S::Req>,
    {
        RpcClient::new(self.source.boxed())
    }
}

impl<S, C> AsRef<C> for RpcClient<S, C>
where
    S: Service,
    C: Connector<S>,
{
    fn as_ref(&self) -> &C {
        &self.source
    }
}

/// Wrap a stream with an additional item that is kept alive until the stream is dropped
#[pin_project]
pub(crate) struct DeferDrop<S: Stream, X>(#[pin] pub S, pub X);

impl<S: Stream, X> Stream for DeferDrop<S, X> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.project().0.poll_next(cx)
    }
}