1use std::{
5 fmt::Debug,
6 marker::PhantomData,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11use futures_lite::Stream;
12use futures_sink::Sink;
13use pin_project::pin_project;
14
15use crate::{
16 transport::{boxed::BoxableConnector, mapped::MappedConnector, StreamTypes},
17 Connector, Service,
18};
19
20pub type BoxedConnector<S> =
22 crate::transport::boxed::BoxedConnector<<S as crate::Service>::Res, <S as crate::Service>::Req>;
23
/// Flume transport connector specialised to the response and request types
/// of the service `S`.
///
/// Only compiled when the `flume-transport` feature is enabled.
#[cfg(feature = "flume-transport")]
#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "flume-transport")))]
pub type FlumeConnector<S> =
    crate::transport::flume::FlumeConnector<<S as Service>::Res, <S as Service>::Req>;
29
/// Quinn transport connector specialised to the response and request types
/// of the service `S`.
///
/// Only compiled when the `quinn-transport` feature is enabled.
#[cfg(feature = "quinn-transport")]
#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "quinn-transport")))]
pub type QuinnConnector<S> =
    crate::transport::quinn::QuinnConnector<<S as Service>::Res, <S as Service>::Req>;
35
/// Hyper transport connector specialised to the response and request types
/// of the service `S`.
///
/// Only compiled when the `hyper-transport` feature is enabled.
#[cfg(feature = "hyper-transport")]
#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "hyper-transport")))]
pub type HyperConnector<S> =
    crate::transport::hyper::HyperConnector<<S as Service>::Res, <S as Service>::Req>;
41
/// Iroh transport connector specialised to the response and request types
/// of the service `S`.
///
/// Only compiled when the `iroh-transport` feature is enabled.
#[cfg(feature = "iroh-transport")]
#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "iroh-transport")))]
pub type IrohConnector<S> =
    crate::transport::iroh::IrohConnector<<S as Service>::Res, <S as Service>::Req>;
47
48pub type BoxStreamSync<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>>;
50
51#[derive(Debug)]
61pub struct RpcClient<S, C = BoxedConnector<S>> {
62 pub(crate) source: C,
63 pub(crate) _p: PhantomData<S>,
64}
65
66impl<S, C: Clone> Clone for RpcClient<S, C> {
67 fn clone(&self) -> Self {
68 Self {
69 source: self.source.clone(),
70 _p: PhantomData,
71 }
72 }
73}
74
75#[pin_project]
78#[derive(Debug)]
79pub struct UpdateSink<C, T>(#[pin] pub C::SendSink, PhantomData<T>)
80where
81 C: StreamTypes;
82
83impl<C, T> UpdateSink<C, T>
84where
85 C: StreamTypes,
86 T: Into<C::Out>,
87{
88 pub fn new(sink: C::SendSink) -> Self {
90 Self(sink, PhantomData)
91 }
92}
93
94impl<C, T> Sink<T> for UpdateSink<C, T>
95where
96 C: StreamTypes,
97 T: Into<C::Out>,
98{
99 type Error = C::SendError;
100
101 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
102 self.project().0.poll_ready(cx)
103 }
104
105 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
106 let req = item.into();
107 self.project().0.start_send(req)
108 }
109
110 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
111 self.project().0.poll_flush(cx)
112 }
113
114 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
115 self.project().0.poll_close(cx)
116 }
117}
118
119impl<S, C> RpcClient<S, C>
120where
121 S: Service,
122 C: Connector<S>,
123{
124 pub fn new(source: C) -> Self {
131 Self {
132 source,
133 _p: PhantomData,
134 }
135 }
136}
137
138impl<S, C> RpcClient<S, C>
139where
140 S: Service,
141 C: Connector<S>,
142{
143 pub fn into_inner(self) -> C {
145 self.source
146 }
147
148 pub fn map<SNext>(self) -> RpcClient<SNext, MappedConnector<SNext::Res, SNext::Req, C>>
158 where
159 SNext: Service,
160 S::Req: From<SNext::Req>,
161 SNext::Res: TryFrom<S::Res>,
162 {
163 RpcClient::new(self.source.map::<SNext::Res, SNext::Req>())
164 }
165
166 pub fn boxed(self) -> RpcClient<S, BoxedConnector<S>>
168 where
169 C: BoxableConnector<S::Res, S::Req>,
170 {
171 RpcClient::new(self.source.boxed())
172 }
173}
174
175impl<S, C> AsRef<C> for RpcClient<S, C>
176where
177 S: Service,
178 C: Connector<S>,
179{
180 fn as_ref(&self) -> &C {
181 &self.source
182 }
183}
184
185#[pin_project]
187pub(crate) struct DeferDrop<S: Stream, X>(#[pin] pub S, pub X);
188
189impl<S: Stream, X> Stream for DeferDrop<S, X> {
190 type Item = S::Item;
191
192 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
193 self.project().0.poll_next(cx)
194 }
195}