jsonrpc_client_transports/transports/
local.rs1use crate::{RpcChannel, RpcError, RpcResult};
4use futures::channel::mpsc;
5use futures::{
6 task::{Context, Poll},
7 Future, Sink, SinkExt, Stream, StreamExt,
8};
9use jsonrpc_core::{BoxFuture, MetaIoHandler, Metadata, Middleware};
10use jsonrpc_pubsub::Session;
11use std::ops::Deref;
12use std::pin::Pin;
13use std::sync::Arc;
14
15pub struct LocalRpc<THandler, TMetadata> {
17 handler: THandler,
18 meta: TMetadata,
19 buffered: Buffered,
20 queue: (mpsc::UnboundedSender<String>, mpsc::UnboundedReceiver<String>),
21}
22
23enum Buffered {
24 Request(BoxFuture<Option<String>>),
25 Response(String),
26 None,
27}
28
29impl<TMetadata, THandler, TMiddleware> LocalRpc<THandler, TMetadata>
30where
31 TMetadata: Metadata,
32 TMiddleware: Middleware<TMetadata>,
33 THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>>,
34{
35 pub fn new(handler: THandler) -> Self
37 where
38 TMetadata: Default,
39 {
40 Self::with_metadata(handler, Default::default())
41 }
42
43 pub fn with_metadata(handler: THandler, meta: TMetadata) -> Self {
45 Self {
46 handler,
47 meta,
48 buffered: Buffered::None,
49 queue: mpsc::unbounded(),
50 }
51 }
52}
53
54impl<TMetadata, THandler, TMiddleware> Stream for LocalRpc<THandler, TMetadata>
55where
56 TMetadata: Metadata + Unpin,
57 TMiddleware: Middleware<TMetadata> + Unpin,
58 THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
59{
60 type Item = String;
61
62 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
63 self.queue.1.poll_next_unpin(cx)
64 }
65}
66
67impl<TMetadata, THandler, TMiddleware> LocalRpc<THandler, TMetadata>
68where
69 TMetadata: Metadata + Unpin,
70 TMiddleware: Middleware<TMetadata> + Unpin,
71 THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
72{
73 fn poll_buffered(&mut self, cx: &mut Context) -> Poll<Result<(), RpcError>> {
74 let response = match self.buffered {
75 Buffered::Request(ref mut r) => futures::ready!(r.as_mut().poll(cx)),
76 _ => None,
77 };
78 if let Some(response) = response {
79 self.buffered = Buffered::Response(response);
80 }
81
82 self.send_response().into()
83 }
84
85 fn send_response(&mut self) -> Result<(), RpcError> {
86 if let Buffered::Response(r) = std::mem::replace(&mut self.buffered, Buffered::None) {
87 self.queue.0.start_send(r).map_err(|e| RpcError::Other(Box::new(e)))?;
88 }
89 Ok(())
90 }
91}
92
93impl<TMetadata, THandler, TMiddleware> Sink<String> for LocalRpc<THandler, TMetadata>
94where
95 TMetadata: Metadata + Unpin,
96 TMiddleware: Middleware<TMetadata> + Unpin,
97 THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
98{
99 type Error = RpcError;
100
101 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
102 futures::ready!(self.poll_buffered(cx))?;
103 futures::ready!(self.queue.0.poll_ready(cx))
104 .map_err(|e| RpcError::Other(Box::new(e)))
105 .into()
106 }
107
108 fn start_send(mut self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> {
109 let future = self.handler.handle_request(&item, self.meta.clone());
110 self.buffered = Buffered::Request(Box::pin(future));
111 Ok(())
112 }
113
114 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
115 futures::ready!(self.poll_buffered(cx))?;
116 futures::ready!(self.queue.0.poll_flush_unpin(cx))
117 .map_err(|e| RpcError::Other(Box::new(e)))
118 .into()
119 }
120
121 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
122 futures::ready!(self.queue.0.poll_close_unpin(cx))
123 .map_err(|e| RpcError::Other(Box::new(e)))
124 .into()
125 }
126}
127
128pub fn connect_with_metadata_and_middleware<TClient, THandler, TMetadata, TMiddleware>(
130 handler: THandler,
131 meta: TMetadata,
132) -> (TClient, impl Future<Output = RpcResult<()>>)
133where
134 TClient: From<RpcChannel>,
135 TMiddleware: Middleware<TMetadata> + Unpin,
136 THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
137 TMetadata: Metadata + Unpin,
138{
139 let (sink, stream) = LocalRpc::with_metadata(handler, meta).split();
140 let (rpc_client, sender) = crate::transports::duplex(Box::pin(sink), Box::pin(stream));
141 let client = TClient::from(sender);
142 (client, rpc_client)
143}
144
145pub fn connect_with_metadata<TClient, THandler, TMetadata>(
147 handler: THandler,
148 meta: TMetadata,
149) -> (TClient, impl Future<Output = RpcResult<()>>)
150where
151 TClient: From<RpcChannel>,
152 TMetadata: Metadata + Unpin,
153 THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
154{
155 connect_with_metadata_and_middleware(handler, meta)
156}
157
158pub fn connect_with_middleware<TClient, THandler, TMetadata, TMiddleware>(
160 handler: THandler,
161) -> (TClient, impl Future<Output = RpcResult<()>>)
162where
163 TClient: From<RpcChannel>,
164 TMetadata: Metadata + Default + Unpin,
165 TMiddleware: Middleware<TMetadata> + Unpin,
166 THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
167{
168 connect_with_metadata_and_middleware(handler, Default::default())
169}
170
171pub fn connect<TClient, THandler, TMetadata>(handler: THandler) -> (TClient, impl Future<Output = RpcResult<()>>)
173where
174 TClient: From<RpcChannel>,
175 TMetadata: Metadata + Default + Unpin,
176 THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
177{
178 connect_with_middleware(handler)
179}
180
181pub type LocalMeta = Arc<Session>;
183
184pub fn connect_with_pubsub_and_middleware<TClient, THandler, TMiddleware>(
186 handler: THandler,
187) -> (TClient, impl Future<Output = RpcResult<()>>)
188where
189 TClient: From<RpcChannel>,
190 TMiddleware: Middleware<LocalMeta> + Unpin,
191 THandler: Deref<Target = MetaIoHandler<LocalMeta, TMiddleware>> + Unpin,
192{
193 let (tx, rx) = mpsc::unbounded();
194 let meta = Arc::new(Session::new(tx));
195 let (sink, stream) = LocalRpc::with_metadata(handler, meta).split();
196 let stream = futures::stream::select(stream, rx);
197 let (rpc_client, sender) = crate::transports::duplex(Box::pin(sink), Box::pin(stream));
198 let client = TClient::from(sender);
199 (client, rpc_client)
200}
201
202pub fn connect_with_pubsub<TClient, THandler>(handler: THandler) -> (TClient, impl Future<Output = RpcResult<()>>)
204where
205 TClient: From<RpcChannel>,
206 THandler: Deref<Target = MetaIoHandler<LocalMeta>> + Unpin,
207{
208 connect_with_pubsub_and_middleware(handler)
209}