jsonrpc_client_transports/
lib.rs

1//! JSON-RPC client implementation.
2
3#![deny(missing_docs)]
4
5use jsonrpc_core::futures::channel::{mpsc, oneshot};
6use jsonrpc_core::futures::{
7	self,
8	task::{Context, Poll},
9	Future, Stream, StreamExt,
10};
11use jsonrpc_core::{Error, Params};
12use serde::de::DeserializeOwned;
13use serde::Serialize;
14use serde_json::Value;
15use std::marker::PhantomData;
16use std::pin::Pin;
17
18pub mod transports;
19
20#[cfg(test)]
21mod logger;
22
23/// The errors returned by the client.
24#[derive(Debug, derive_more::Display)]
25pub enum RpcError {
26	/// An error returned by the server.
27	#[display(fmt = "Server returned rpc error {}", _0)]
28	JsonRpcError(Error),
29	/// Failure to parse server response.
30	#[display(fmt = "Failed to parse server response as {}: {}", _0, _1)]
31	ParseError(String, Box<dyn std::error::Error + Send>),
32	/// Request timed out.
33	#[display(fmt = "Request timed out")]
34	Timeout,
35	/// A general client error.
36	#[display(fmt = "Client error: {}", _0)]
37	Client(String),
38	/// Not rpc specific errors.
39	#[display(fmt = "{}", _0)]
40	Other(Box<dyn std::error::Error + Send>),
41}
42
43impl std::error::Error for RpcError {
44	fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
45		match *self {
46			Self::JsonRpcError(ref e) => Some(e),
47			Self::ParseError(_, ref e) => Some(&**e),
48			Self::Other(ref e) => Some(&**e),
49			_ => None,
50		}
51	}
52}
53
54impl From<Error> for RpcError {
55	fn from(error: Error) -> Self {
56		RpcError::JsonRpcError(error)
57	}
58}
59
60/// A result returned by the client.
61pub type RpcResult<T> = Result<T, RpcError>;
62
63/// An RPC call message.
64struct CallMessage {
65	/// The RPC method name.
66	method: String,
67	/// The RPC method parameters.
68	params: Params,
69	/// The oneshot channel to send the result of the rpc
70	/// call to.
71	sender: oneshot::Sender<RpcResult<Value>>,
72}
73
74/// An RPC notification.
75struct NotifyMessage {
76	/// The RPC method name.
77	method: String,
78	/// The RPC method paramters.
79	params: Params,
80}
81
82/// An RPC subscription.
83struct Subscription {
84	/// The subscribe method name.
85	subscribe: String,
86	/// The subscribe method parameters.
87	subscribe_params: Params,
88	/// The name of the notification.
89	notification: String,
90	/// The unsubscribe method name.
91	unsubscribe: String,
92}
93
94/// An RPC subscribe message.
95struct SubscribeMessage {
96	/// The subscription to subscribe to.
97	subscription: Subscription,
98	/// The channel to send notifications to.
99	sender: mpsc::UnboundedSender<RpcResult<Value>>,
100}
101
102/// A message sent to the `RpcClient`.
103enum RpcMessage {
104	/// Make an RPC call.
105	Call(CallMessage),
106	/// Send a notification.
107	Notify(NotifyMessage),
108	/// Subscribe to a notification.
109	Subscribe(SubscribeMessage),
110}
111
112impl From<CallMessage> for RpcMessage {
113	fn from(msg: CallMessage) -> Self {
114		RpcMessage::Call(msg)
115	}
116}
117
118impl From<NotifyMessage> for RpcMessage {
119	fn from(msg: NotifyMessage) -> Self {
120		RpcMessage::Notify(msg)
121	}
122}
123
124impl From<SubscribeMessage> for RpcMessage {
125	fn from(msg: SubscribeMessage) -> Self {
126		RpcMessage::Subscribe(msg)
127	}
128}
129
130/// A channel to a `RpcClient`.
131#[derive(Clone)]
132pub struct RpcChannel(mpsc::UnboundedSender<RpcMessage>);
133
134impl RpcChannel {
135	fn send(&self, msg: RpcMessage) -> Result<(), mpsc::TrySendError<RpcMessage>> {
136		self.0.unbounded_send(msg)
137	}
138}
139
140impl From<mpsc::UnboundedSender<RpcMessage>> for RpcChannel {
141	fn from(sender: mpsc::UnboundedSender<RpcMessage>) -> Self {
142		RpcChannel(sender)
143	}
144}
145
146/// The future returned by the rpc call.
147pub type RpcFuture = oneshot::Receiver<Result<Value, RpcError>>;
148
149/// The stream returned by a subscribe.
150pub type SubscriptionStream = mpsc::UnboundedReceiver<Result<Value, RpcError>>;
151
152/// A typed subscription stream.
153pub struct TypedSubscriptionStream<T> {
154	_marker: PhantomData<T>,
155	returns: &'static str,
156	stream: SubscriptionStream,
157}
158
159impl<T> TypedSubscriptionStream<T> {
160	/// Creates a new `TypedSubscriptionStream`.
161	pub fn new(stream: SubscriptionStream, returns: &'static str) -> Self {
162		TypedSubscriptionStream {
163			_marker: PhantomData,
164			returns,
165			stream,
166		}
167	}
168}
169
170impl<T: DeserializeOwned + Unpin + 'static> Stream for TypedSubscriptionStream<T> {
171	type Item = RpcResult<T>;
172
173	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
174		let result = futures::ready!(self.stream.poll_next_unpin(cx));
175		match result {
176			Some(Ok(value)) => Some(
177				serde_json::from_value::<T>(value)
178					.map_err(|error| RpcError::ParseError(self.returns.into(), Box::new(error))),
179			),
180			None => None,
181			Some(Err(err)) => Some(Err(err)),
182		}
183		.into()
184	}
185}
186
187/// Client for raw JSON RPC requests
188#[derive(Clone)]
189pub struct RawClient(RpcChannel);
190
191impl From<RpcChannel> for RawClient {
192	fn from(channel: RpcChannel) -> Self {
193		RawClient(channel)
194	}
195}
196
197impl RawClient {
198	/// Call RPC method with raw JSON.
199	pub fn call_method(&self, method: &str, params: Params) -> impl Future<Output = RpcResult<Value>> {
200		let (sender, receiver) = oneshot::channel();
201		let msg = CallMessage {
202			method: method.into(),
203			params,
204			sender,
205		};
206		let result = self.0.send(msg.into());
207		async move {
208			let () = result.map_err(|e| RpcError::Other(Box::new(e)))?;
209
210			receiver.await.map_err(|e| RpcError::Other(Box::new(e)))?
211		}
212	}
213
214	/// Send RPC notification with raw JSON.
215	pub fn notify(&self, method: &str, params: Params) -> RpcResult<()> {
216		let msg = NotifyMessage {
217			method: method.into(),
218			params,
219		};
220		match self.0.send(msg.into()) {
221			Ok(()) => Ok(()),
222			Err(error) => Err(RpcError::Other(Box::new(error))),
223		}
224	}
225
226	/// Subscribe to topic with raw JSON.
227	pub fn subscribe(
228		&self,
229		subscribe: &str,
230		subscribe_params: Params,
231		notification: &str,
232		unsubscribe: &str,
233	) -> RpcResult<SubscriptionStream> {
234		let (sender, receiver) = mpsc::unbounded();
235		let msg = SubscribeMessage {
236			subscription: Subscription {
237				subscribe: subscribe.into(),
238				subscribe_params,
239				notification: notification.into(),
240				unsubscribe: unsubscribe.into(),
241			},
242			sender,
243		};
244
245		self.0
246			.send(msg.into())
247			.map(|()| receiver)
248			.map_err(|e| RpcError::Other(Box::new(e)))
249	}
250}
251
252/// Client for typed JSON RPC requests
253#[derive(Clone)]
254pub struct TypedClient(RawClient);
255
256impl From<RpcChannel> for TypedClient {
257	fn from(channel: RpcChannel) -> Self {
258		TypedClient(channel.into())
259	}
260}
261
262impl TypedClient {
263	/// Create a new `TypedClient`.
264	pub fn new(raw_cli: RawClient) -> Self {
265		TypedClient(raw_cli)
266	}
267
268	/// Call RPC with serialization of request and deserialization of response.
269	pub fn call_method<T: Serialize, R: DeserializeOwned>(
270		&self,
271		method: &str,
272		returns: &str,
273		args: T,
274	) -> impl Future<Output = RpcResult<R>> {
275		let returns = returns.to_owned();
276		let args =
277			serde_json::to_value(args).expect("Only types with infallible serialisation can be used for JSON-RPC");
278		let params = match args {
279			Value::Array(vec) => Ok(Params::Array(vec)),
280			Value::Null => Ok(Params::None),
281			Value::Object(map) => Ok(Params::Map(map)),
282			_ => Err(RpcError::Client(
283				"RPC params should serialize to a JSON array, JSON object or null".into(),
284			)),
285		};
286		let result = params.map(|params| self.0.call_method(method, params));
287
288		async move {
289			let value: Value = result?.await?;
290
291			log::debug!("response: {:?}", value);
292
293			serde_json::from_value::<R>(value).map_err(|error| RpcError::ParseError(returns, Box::new(error)))
294		}
295	}
296
297	/// Call RPC with serialization of request only.
298	pub fn notify<T: Serialize>(&self, method: &str, args: T) -> RpcResult<()> {
299		let args =
300			serde_json::to_value(args).expect("Only types with infallible serialisation can be used for JSON-RPC");
301		let params = match args {
302			Value::Array(vec) => Params::Array(vec),
303			Value::Null => Params::None,
304			_ => {
305				return Err(RpcError::Client(
306					"RPC params should serialize to a JSON array, or null".into(),
307				))
308			}
309		};
310
311		self.0.notify(method, params)
312	}
313
314	/// Subscribe with serialization of request and deserialization of response.
315	pub fn subscribe<T: Serialize, R: DeserializeOwned + 'static>(
316		&self,
317		subscribe: &str,
318		subscribe_params: T,
319		topic: &str,
320		unsubscribe: &str,
321		returns: &'static str,
322	) -> RpcResult<TypedSubscriptionStream<R>> {
323		let args = serde_json::to_value(subscribe_params)
324			.expect("Only types with infallible serialisation can be used for JSON-RPC");
325
326		let params = match args {
327			Value::Array(vec) => Params::Array(vec),
328			Value::Null => Params::None,
329			_ => {
330				return Err(RpcError::Client(
331					"RPC params should serialize to a JSON array, or null".into(),
332				))
333			}
334		};
335
336		self.0
337			.subscribe(subscribe, params, topic, unsubscribe)
338			.map(move |stream| TypedSubscriptionStream::new(stream, returns))
339	}
340}
341
#[cfg(test)]
mod tests {
	use super::*;
	use crate::transports::local;
	use crate::{RpcChannel, TypedClient};
	use jsonrpc_core::futures::{future, FutureExt};
	use jsonrpc_core::{self as core, IoHandler};
	use jsonrpc_pubsub::{PubSubHandler, Subscriber, SubscriptionId};
	use std::sync::atomic::{AtomicBool, Ordering};
	use std::sync::Arc;

	/// Minimal typed client used by the tests below.
	#[derive(Clone)]
	struct AddClient(TypedClient);

	impl From<RpcChannel> for AddClient {
		fn from(channel: RpcChannel) -> Self {
			AddClient(channel.into())
		}
	}

	impl AddClient {
		/// Calls the `add` method with two numbers.
		fn add(&self, a: u64, b: u64) -> impl Future<Output = RpcResult<u64>> {
			self.0.call_method("add", "u64", (a, b))
		}

		/// Sends the `completed` notification.
		fn completed(&self, success: bool) -> RpcResult<()> {
			self.0.notify("completed", (success,))
		}
	}

	#[test]
	fn test_client_terminates() {
		crate::logger::init_log();
		let mut handler = IoHandler::new();
		handler.add_sync_method("add", |params: Params| {
			let (a, b) = params.parse::<(u64, u64)>()?;
			let res = a + b;
			Ok(jsonrpc_core::to_value(res).unwrap())
		});

		// The std channel lets the test thread block until the async work is done.
		let (tx, rx) = std::sync::mpsc::channel();
		let (client, rpc_client) = local::connect::<AddClient, _, _>(handler);
		let fut = async move {
			let res = client.add(3, 4).await?;
			let res = client.add(res, 5).await?;
			assert_eq!(res, 12);
			tx.send(()).unwrap();
			Ok(()) as RpcResult<_>
		};
		let pool = futures::executor::ThreadPool::builder().pool_size(1).create().unwrap();
		pool.spawn_ok(rpc_client.map(|x| x.unwrap()));
		pool.spawn_ok(fut.map(|x| x.unwrap()));
		rx.recv().unwrap()
	}

	#[test]
	fn should_send_notification() {
		crate::logger::init_log();
		let (tx, rx) = std::sync::mpsc::sync_channel(1);
		let mut handler = IoHandler::new();
		handler.add_notification("completed", move |params: Params| {
			let (success,) = params.parse::<(bool,)>().expect("expected to receive one boolean");
			assert!(success);
			tx.send(()).unwrap();
		});

		let (client, rpc_client) = local::connect::<AddClient, _, _>(handler);
		// The notification is queued before the client task is spawned.
		client.completed(true).unwrap();
		let pool = futures::executor::ThreadPool::builder().pool_size(1).create().unwrap();
		pool.spawn_ok(rpc_client.map(|x| x.unwrap()));
		rx.recv().unwrap()
	}

	#[test]
	fn should_handle_subscription() {
		crate::logger::init_log();
		// given
		let (finish, finished) = std::sync::mpsc::sync_channel(1);
		let mut handler = PubSubHandler::<local::LocalMeta, _>::default();
		let called = Arc::new(AtomicBool::new(false));
		let called2 = called.clone();
		handler.add_subscription(
			"hello",
			("subscribe_hello", move |params, _meta, subscriber: Subscriber| {
				assert_eq!(params, core::Params::None);
				let sink = subscriber
					.assign_id(SubscriptionId::Number(5))
					.expect("assigned subscription id");
				let finish = finish.clone();
				// Emit three notifications from a background thread, then signal completion.
				std::thread::spawn(move || {
					for i in 0..3 {
						std::thread::sleep(std::time::Duration::from_millis(100));
						let value = serde_json::json!({
							"subscription": 5,
							"result": vec![i],
						});
						let _ = sink.notify(serde_json::from_value(value).unwrap());
					}
					finish.send(()).unwrap();
				});
			}),
			("unsubscribe_hello", move |id, _meta| {
				// Should be called because session is dropped.
				called2.store(true, Ordering::SeqCst);
				assert_eq!(id, SubscriptionId::Number(5));
				future::ready(Ok(core::Value::Bool(true)))
			}),
		);

		// when
		let (tx, rx) = std::sync::mpsc::channel();
		let (client, rpc_client) = local::connect_with_pubsub::<TypedClient, _>(handler);
		let received = Arc::new(std::sync::Mutex::new(vec![]));
		let r2 = received.clone();
		let fut = async move {
			let mut stream =
				client.subscribe::<_, (u32,)>("subscribe_hello", (), "hello", "unsubscribe_hello", "u32")?;
			let result = stream.next().await;
			r2.lock().unwrap().push(result.expect("Expected at least one item."));
			tx.send(()).unwrap();
			Ok(()) as RpcResult<_>
		};

		let pool = futures::executor::ThreadPool::builder().pool_size(1).create().unwrap();
		pool.spawn_ok(rpc_client.map(|_| ()));
		pool.spawn_ok(fut.map(|x| x.unwrap()));

		rx.recv().unwrap();
		assert!(
			!received.lock().unwrap().is_empty(),
			"Expected at least one received item."
		);
		// The session is being dropped only when another notification is received.
		// TODO [ToDr] we should unsubscribe as soon as the stream is dropped instead!
		finished.recv().unwrap();
		assert!(called.load(Ordering::SeqCst), "Unsubscribe not called.");
	}
}
479}