1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
//! Client side api
//!
//! The main entry point is [RpcClient].
use crate::{
    map::{ChainedMapper, MapService, Mapper},
    Service, ServiceConnection,
};
use futures_lite::Stream;
use futures_sink::Sink;

use pin_project::pin_project;
use std::{
    fmt::Debug,
    marker::PhantomData,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

/// Sync version of `futures::stream::BoxStream`.
///
/// A pinned, heap-allocated, type-erased [`Stream`] yielding `T` whose trait
/// object is required to be both `Send` and `Sync` and to live for `'a`.
pub type BoxStreamSync<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>>;

/// A client for a specific service
///
/// This is a wrapper around a [ServiceConnection] that serves as the entry point
/// for the client DSL. `S` is the service type, `C` is the substream source.
///
/// `SInner` is the service the client is currently mapped to. It defaults to
/// `S`, which is what [`RpcClient::new`] produces; [`RpcClient::map`] yields a
/// client with a different `SInner` over the same connection.
#[derive(Debug)]
pub struct RpcClient<S, C, SInner = S> {
    /// The underlying connection, handed back by `into_inner` / `as_ref`.
    pub(crate) source: C,
    /// Shared, type-erased mapper between the outer service `S` and the
    /// inner service `SInner`.
    pub(crate) map: Arc<dyn MapService<S, SInner>>,
}

/// Hand-written `Clone` so that only the connection type `C` must be `Clone`;
/// the service parameters `S` and `SInner` carry no such bound.
impl<S, C: Clone, SInner> Clone for RpcClient<S, C, SInner> {
    /// Clones the connection and takes another handle to the shared mapper.
    fn clone(&self) -> Self {
        let Self { source, map } = self;
        Self {
            source: source.clone(),
            // Only bumps the reference count; the mapper itself is shared.
            map: Arc::clone(map),
        }
    }
}

/// Sink that can be used to send updates to the server for the two interaction patterns
/// that support it, [crate::message::ClientStreaming] and [crate::message::BidiStreaming].
///
/// Items of type `T` are converted into `SInner::Req`, passed through the
/// mapper to obtain the request handed to the connection's send sink.
#[pin_project]
#[derive(Debug)]
pub struct UpdateSink<S, C, T, SInner = S>(
    /// The connection's send sink; structurally pinned so it can be polled.
    #[pin] pub C::SendSink,
    /// Marker tying the sink to its item type `T`; holds no data.
    pub PhantomData<T>,
    /// Mapper used by `start_send` to turn an inner request into an outer one.
    pub Arc<dyn MapService<S, SInner>>,
)
where
    S: Service,
    SInner: Service,
    C: ServiceConnection<S>,
    T: Into<SInner::Req>;

/// [`Sink`] implementation that converts each item and otherwise delegates
/// every operation to the wrapped connection sink (field `0`).
impl<S, C, T, SInner> Sink<T> for UpdateSink<S, C, T, SInner>
where
    S: Service,
    SInner: Service,
    C: ServiceConnection<S>,
    T: Into<SInner::Req>,
{
    /// Errors are those of the connection's send side.
    type Error = C::SendError;

    /// Reports readiness of the wrapped sink.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let inner = self.project().0;
        inner.poll_ready(cx)
    }

    /// Converts `item` into `SInner::Req`, maps it with `req_into_outer`, and
    /// hands the result to the wrapped sink.
    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        // Do the conversion through the shared mapper before projecting, so
        // the pinned projection is only taken for the actual send.
        let inner_req = item.into();
        let outer_req = self.2.req_into_outer(inner_req);
        let inner = self.project().0;
        inner.start_send(outer_req)
    }

    /// Flushes the wrapped sink.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let inner = self.project().0;
        inner.poll_flush(cx)
    }

    /// Closes the wrapped sink.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let inner = self.project().0;
        inner.poll_close(cx)
    }
}

impl<S, C> RpcClient<S, C, S>
where
    S: Service,
    C: ServiceConnection<S>,
{
    /// Create a new rpc client for a specific [Service] given a compatible
    /// [ServiceConnection].
    ///
    /// This is where a generic typed connection is converted into a client for a specific service.
    pub fn new(source: C) -> Self {
        Self {
            source,
            map: Arc::new(Mapper::new()),
        }
    }
}

impl<S, C, SInner> RpcClient<S, C, SInner>
where
    S: Service,
    C: ServiceConnection<S>,
    SInner: Service,
{
    /// Consume the client and return the underlying connection.
    pub fn into_inner(self) -> C {
        let Self { source, .. } = self;
        source
    }

    /// Map this channel's service into an inner service.
    ///
    /// `SNext` is the new service to map to and `SInner` is the current inner
    /// service. The method is available when these bounds hold:
    ///
    /// ```text
    /// SNext::Req: Into<SInner::Req> + TryFrom<SInner::Req>
    /// SNext::Res: Into<SInner::Res> + TryFrom<SInner::Res>
    /// ```
    ///
    /// The connection is moved over unchanged and the current mapper is
    /// wrapped in a [`ChainedMapper`], so this method can be chained
    /// infinitely.
    pub fn map<SNext>(self) -> RpcClient<S, C, SNext>
    where
        SNext: Service,
        SNext::Req: Into<SInner::Req> + TryFrom<SInner::Req>,
        SNext::Res: Into<SInner::Res> + TryFrom<SInner::Res>,
    {
        let RpcClient { source, map } = self;
        RpcClient {
            source,
            map: Arc::new(ChainedMapper::new(map)),
        }
    }
}

/// Borrow the underlying connection without consuming the client.
impl<S, C, SInner> AsRef<C> for RpcClient<S, C, SInner>
where
    S: Service,
    C: ServiceConnection<S>,
    SInner: Service,
{
    /// Returns a shared reference to the wrapped connection.
    fn as_ref(&self) -> &C {
        let Self { source, .. } = self;
        source
    }
}

/// Wrap a stream with an additional item that is kept alive until the stream is dropped
///
/// Field `0` is the wrapped stream (structurally pinned so it can be polled);
/// field `1` is the extra value, which is only held by this wrapper and is
/// therefore dropped together with it.
#[pin_project]
pub(crate) struct DeferDrop<S: Stream, X>(#[pin] pub S, pub X);

/// Transparent [`Stream`] implementation: items come straight from the
/// wrapped stream, the extra value `X` is never touched.
impl<S: Stream, X> Stream for DeferDrop<S, X> {
    type Item = S::Item;

    /// Polls the wrapped stream (field `0`) and returns its result unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let wrapped = self.project().0;
        wrapped.poll_next(cx)
    }
}