jsonrpc_client_transports/transports/
local.rs

1//! Rpc client implementation for `Deref<Target=MetaIoHandler<Metadata>>`.
2
3use crate::{RpcChannel, RpcError, RpcResult};
4use futures::channel::mpsc;
5use futures::{
6	task::{Context, Poll},
7	Future, Sink, SinkExt, Stream, StreamExt,
8};
9use jsonrpc_core::{BoxFuture, MetaIoHandler, Metadata, Middleware};
10use jsonrpc_pubsub::Session;
11use std::ops::Deref;
12use std::pin::Pin;
13use std::sync::Arc;
14
15/// Implements a rpc client for `MetaIoHandler`.
16pub struct LocalRpc<THandler, TMetadata> {
17	handler: THandler,
18	meta: TMetadata,
19	buffered: Buffered,
20	queue: (mpsc::UnboundedSender<String>, mpsc::UnboundedReceiver<String>),
21}
22
23enum Buffered {
24	Request(BoxFuture<Option<String>>),
25	Response(String),
26	None,
27}
28
29impl<TMetadata, THandler, TMiddleware> LocalRpc<THandler, TMetadata>
30where
31	TMetadata: Metadata,
32	TMiddleware: Middleware<TMetadata>,
33	THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>>,
34{
35	/// Creates a new `LocalRpc` with default metadata.
36	pub fn new(handler: THandler) -> Self
37	where
38		TMetadata: Default,
39	{
40		Self::with_metadata(handler, Default::default())
41	}
42
43	/// Creates a new `LocalRpc` with given handler and metadata.
44	pub fn with_metadata(handler: THandler, meta: TMetadata) -> Self {
45		Self {
46			handler,
47			meta,
48			buffered: Buffered::None,
49			queue: mpsc::unbounded(),
50		}
51	}
52}
53
54impl<TMetadata, THandler, TMiddleware> Stream for LocalRpc<THandler, TMetadata>
55where
56	TMetadata: Metadata + Unpin,
57	TMiddleware: Middleware<TMetadata> + Unpin,
58	THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
59{
60	type Item = String;
61
62	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
63		self.queue.1.poll_next_unpin(cx)
64	}
65}
66
67impl<TMetadata, THandler, TMiddleware> LocalRpc<THandler, TMetadata>
68where
69	TMetadata: Metadata + Unpin,
70	TMiddleware: Middleware<TMetadata> + Unpin,
71	THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
72{
73	fn poll_buffered(&mut self, cx: &mut Context) -> Poll<Result<(), RpcError>> {
74		let response = match self.buffered {
75			Buffered::Request(ref mut r) => futures::ready!(r.as_mut().poll(cx)),
76			_ => None,
77		};
78		if let Some(response) = response {
79			self.buffered = Buffered::Response(response);
80		}
81
82		self.send_response().into()
83	}
84
85	fn send_response(&mut self) -> Result<(), RpcError> {
86		if let Buffered::Response(r) = std::mem::replace(&mut self.buffered, Buffered::None) {
87			self.queue.0.start_send(r).map_err(|e| RpcError::Other(Box::new(e)))?;
88		}
89		Ok(())
90	}
91}
92
93impl<TMetadata, THandler, TMiddleware> Sink<String> for LocalRpc<THandler, TMetadata>
94where
95	TMetadata: Metadata + Unpin,
96	TMiddleware: Middleware<TMetadata> + Unpin,
97	THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
98{
99	type Error = RpcError;
100
101	fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
102		futures::ready!(self.poll_buffered(cx))?;
103		futures::ready!(self.queue.0.poll_ready(cx))
104			.map_err(|e| RpcError::Other(Box::new(e)))
105			.into()
106	}
107
108	fn start_send(mut self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> {
109		let future = self.handler.handle_request(&item, self.meta.clone());
110		self.buffered = Buffered::Request(Box::pin(future));
111		Ok(())
112	}
113
114	fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
115		futures::ready!(self.poll_buffered(cx))?;
116		futures::ready!(self.queue.0.poll_flush_unpin(cx))
117			.map_err(|e| RpcError::Other(Box::new(e)))
118			.into()
119	}
120
121	fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
122		futures::ready!(self.queue.0.poll_close_unpin(cx))
123			.map_err(|e| RpcError::Other(Box::new(e)))
124			.into()
125	}
126}
127
128/// Connects to a `Deref<Target = MetaIoHandler<Metadata>` specifying a custom middleware implementation.
129pub fn connect_with_metadata_and_middleware<TClient, THandler, TMetadata, TMiddleware>(
130	handler: THandler,
131	meta: TMetadata,
132) -> (TClient, impl Future<Output = RpcResult<()>>)
133where
134	TClient: From<RpcChannel>,
135	TMiddleware: Middleware<TMetadata> + Unpin,
136	THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
137	TMetadata: Metadata + Unpin,
138{
139	let (sink, stream) = LocalRpc::with_metadata(handler, meta).split();
140	let (rpc_client, sender) = crate::transports::duplex(Box::pin(sink), Box::pin(stream));
141	let client = TClient::from(sender);
142	(client, rpc_client)
143}
144
145/// Connects to a `Deref<Target = MetaIoHandler<Metadata>`.
146pub fn connect_with_metadata<TClient, THandler, TMetadata>(
147	handler: THandler,
148	meta: TMetadata,
149) -> (TClient, impl Future<Output = RpcResult<()>>)
150where
151	TClient: From<RpcChannel>,
152	TMetadata: Metadata + Unpin,
153	THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
154{
155	connect_with_metadata_and_middleware(handler, meta)
156}
157
158/// Connects to a `Deref<Target = MetaIoHandler<Metadata + Default>` specifying a custom middleware implementation.
159pub fn connect_with_middleware<TClient, THandler, TMetadata, TMiddleware>(
160	handler: THandler,
161) -> (TClient, impl Future<Output = RpcResult<()>>)
162where
163	TClient: From<RpcChannel>,
164	TMetadata: Metadata + Default + Unpin,
165	TMiddleware: Middleware<TMetadata> + Unpin,
166	THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
167{
168	connect_with_metadata_and_middleware(handler, Default::default())
169}
170
171/// Connects to a `Deref<Target = MetaIoHandler<Metadata + Default>`.
172pub fn connect<TClient, THandler, TMetadata>(handler: THandler) -> (TClient, impl Future<Output = RpcResult<()>>)
173where
174	TClient: From<RpcChannel>,
175	TMetadata: Metadata + Default + Unpin,
176	THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
177{
178	connect_with_middleware(handler)
179}
180
181/// Metadata for LocalRpc.
182pub type LocalMeta = Arc<Session>;
183
184/// Connects with pubsub specifying a custom middleware implementation.
185pub fn connect_with_pubsub_and_middleware<TClient, THandler, TMiddleware>(
186	handler: THandler,
187) -> (TClient, impl Future<Output = RpcResult<()>>)
188where
189	TClient: From<RpcChannel>,
190	TMiddleware: Middleware<LocalMeta> + Unpin,
191	THandler: Deref<Target = MetaIoHandler<LocalMeta, TMiddleware>> + Unpin,
192{
193	let (tx, rx) = mpsc::unbounded();
194	let meta = Arc::new(Session::new(tx));
195	let (sink, stream) = LocalRpc::with_metadata(handler, meta).split();
196	let stream = futures::stream::select(stream, rx);
197	let (rpc_client, sender) = crate::transports::duplex(Box::pin(sink), Box::pin(stream));
198	let client = TClient::from(sender);
199	(client, rpc_client)
200}
201
202/// Connects with pubsub.
203pub fn connect_with_pubsub<TClient, THandler>(handler: THandler) -> (TClient, impl Future<Output = RpcResult<()>>)
204where
205	TClient: From<RpcChannel>,
206	THandler: Deref<Target = MetaIoHandler<LocalMeta>> + Unpin,
207{
208	connect_with_pubsub_and_middleware(handler)
209}