jsonrpc_core/
io.rs

1use std::collections::{
2	hash_map::{IntoIter, Iter},
3	HashMap,
4};
5use std::future::Future;
6use std::ops::{Deref, DerefMut};
7use std::pin::Pin;
8use std::sync::Arc;
9
10use futures_util::{self, future, FutureExt};
11
12use crate::calls::{
13	Metadata, RemoteProcedure, RpcMethod, RpcMethodSimple, RpcMethodSync, RpcNotification, RpcNotificationSimple,
14};
15use crate::middleware::{self, Middleware};
16use crate::types::{Call, Output, Request, Response};
17use crate::types::{Error, ErrorCode, Version};
18
19/// A type representing middleware or RPC response before serialization.
20pub type FutureResponse = Pin<Box<dyn Future<Output = Option<Response>> + Send>>;
21
22/// A type representing middleware or RPC call output.
23pub type FutureOutput = Pin<Box<dyn Future<Output = Option<Output>> + Send>>;
24
25/// A type representing future string response.
26pub type FutureResult<F, G> = future::Map<
27	future::Either<future::Ready<Option<Response>>, FutureRpcResult<F, G>>,
28	fn(Option<Response>) -> Option<String>,
29>;
30
31/// A type representing a result of a single method call.
32pub type FutureRpcOutput<F> = future::Either<F, future::Either<FutureOutput, future::Ready<Option<Output>>>>;
33
34/// A type representing an optional `Response` for RPC `Request`.
35pub type FutureRpcResult<F, G> = future::Either<
36	F,
37	future::Either<
38		future::Map<FutureRpcOutput<G>, fn(Option<Output>) -> Option<Response>>,
39		future::Map<future::JoinAll<FutureRpcOutput<G>>, fn(Vec<Option<Output>>) -> Option<Response>>,
40	>,
41>;
42
43/// `IoHandler` json-rpc protocol compatibility
44#[derive(Debug, Clone, Copy)]
45pub enum Compatibility {
46	/// Compatible only with JSON-RPC 1.x
47	V1,
48	/// Compatible only with JSON-RPC 2.0
49	V2,
50	/// Compatible with both
51	Both,
52}
53
54impl Default for Compatibility {
55	fn default() -> Self {
56		Compatibility::V2
57	}
58}
59
60impl Compatibility {
61	fn is_version_valid(self, version: Option<Version>) -> bool {
62		matches!(
63			(self, version),
64			(Compatibility::V1, None) | (Compatibility::V2, Some(Version::V2)) | (Compatibility::Both, _)
65		)
66	}
67
68	fn default_version(self) -> Option<Version> {
69		match self {
70			Compatibility::V1 => None,
71			Compatibility::V2 | Compatibility::Both => Some(Version::V2),
72		}
73	}
74}
75
76/// Request handler
77///
78/// By default compatible only with jsonrpc v2
79#[derive(Clone, Debug)]
80pub struct MetaIoHandler<T: Metadata, S: Middleware<T> = middleware::Noop> {
81	middleware: S,
82	compatibility: Compatibility,
83	methods: HashMap<String, RemoteProcedure<T>>,
84}
85
86impl<T: Metadata> Default for MetaIoHandler<T> {
87	fn default() -> Self {
88		MetaIoHandler::with_compatibility(Default::default())
89	}
90}
91
92impl<T: Metadata, S: Middleware<T>> IntoIterator for MetaIoHandler<T, S> {
93	type Item = (String, RemoteProcedure<T>);
94	type IntoIter = IntoIter<String, RemoteProcedure<T>>;
95
96	fn into_iter(self) -> Self::IntoIter {
97		self.methods.into_iter()
98	}
99}
100
101impl<'a, T: Metadata, S: Middleware<T>> IntoIterator for &'a MetaIoHandler<T, S> {
102	type Item = (&'a String, &'a RemoteProcedure<T>);
103	type IntoIter = Iter<'a, String, RemoteProcedure<T>>;
104
105	fn into_iter(self) -> Self::IntoIter {
106		self.methods.iter()
107	}
108}
109
110impl<T: Metadata> MetaIoHandler<T> {
111	/// Creates new `MetaIoHandler` compatible with specified protocol version.
112	pub fn with_compatibility(compatibility: Compatibility) -> Self {
113		MetaIoHandler {
114			compatibility,
115			middleware: Default::default(),
116			methods: Default::default(),
117		}
118	}
119}
120
121impl<T: Metadata, S: Middleware<T>> MetaIoHandler<T, S> {
122	/// Creates new `MetaIoHandler`
123	pub fn new(compatibility: Compatibility, middleware: S) -> Self {
124		MetaIoHandler {
125			compatibility,
126			middleware,
127			methods: Default::default(),
128		}
129	}
130
131	/// Creates new `MetaIoHandler` with specified middleware.
132	pub fn with_middleware(middleware: S) -> Self {
133		MetaIoHandler {
134			compatibility: Default::default(),
135			middleware,
136			methods: Default::default(),
137		}
138	}
139
140	/// Adds an alias to a method.
141	pub fn add_alias(&mut self, alias: &str, other: &str) {
142		self.methods.insert(alias.into(), RemoteProcedure::Alias(other.into()));
143	}
144
145	/// Adds new supported synchronous method.
146	///
147	/// A backward-compatible wrapper.
148	pub fn add_sync_method<F>(&mut self, name: &str, method: F)
149	where
150		F: RpcMethodSync,
151	{
152		self.add_method(name, move |params| method.call(params))
153	}
154
155	/// Adds new supported asynchronous method.
156	pub fn add_method<F>(&mut self, name: &str, method: F)
157	where
158		F: RpcMethodSimple,
159	{
160		self.add_method_with_meta(name, move |params, _meta| method.call(params))
161	}
162
163	/// Adds new supported notification
164	pub fn add_notification<F>(&mut self, name: &str, notification: F)
165	where
166		F: RpcNotificationSimple,
167	{
168		self.add_notification_with_meta(name, move |params, _meta| notification.execute(params))
169	}
170
171	/// Adds new supported asynchronous method with metadata support.
172	pub fn add_method_with_meta<F>(&mut self, name: &str, method: F)
173	where
174		F: RpcMethod<T>,
175	{
176		self.methods
177			.insert(name.into(), RemoteProcedure::Method(Arc::new(method)));
178	}
179
180	/// Adds new supported notification with metadata support.
181	pub fn add_notification_with_meta<F>(&mut self, name: &str, notification: F)
182	where
183		F: RpcNotification<T>,
184	{
185		self.methods
186			.insert(name.into(), RemoteProcedure::Notification(Arc::new(notification)));
187	}
188
189	/// Extend this `MetaIoHandler` with methods defined elsewhere.
190	pub fn extend_with<F>(&mut self, methods: F)
191	where
192		F: IntoIterator<Item = (String, RemoteProcedure<T>)>,
193	{
194		self.methods.extend(methods)
195	}
196
197	/// Handle given request synchronously - will block until response is available.
198	/// If you have any asynchronous methods in your RPC it is much wiser to use
199	/// `handle_request` instead and deal with asynchronous requests in a non-blocking fashion.
200	#[cfg(feature = "futures-executor")]
201	pub fn handle_request_sync(&self, request: &str, meta: T) -> Option<String> {
202		futures_executor::block_on(self.handle_request(request, meta))
203	}
204
205	/// Handle given request asynchronously.
206	pub fn handle_request(&self, request: &str, meta: T) -> FutureResult<S::Future, S::CallFuture> {
207		use self::future::Either::{Left, Right};
208		fn as_string(response: Option<Response>) -> Option<String> {
209			let res = response.map(write_response);
210			debug!(target: "rpc", "Response: {}.", res.as_ref().unwrap_or(&"None".to_string()));
211			res
212		}
213
214		trace!(target: "rpc", "Request: {}.", request);
215		let request = read_request(request);
216		let result = match request {
217			Err(error) => Left(future::ready(Some(Response::from(
218				error,
219				self.compatibility.default_version(),
220			)))),
221			Ok(request) => Right(self.handle_rpc_request(request, meta)),
222		};
223
224		result.map(as_string)
225	}
226
227	/// Handle deserialized RPC request.
228	pub fn handle_rpc_request(&self, request: Request, meta: T) -> FutureRpcResult<S::Future, S::CallFuture> {
229		use self::future::Either::{Left, Right};
230
231		fn output_as_response(output: Option<Output>) -> Option<Response> {
232			output.map(Response::Single)
233		}
234
235		fn outputs_as_batch(outs: Vec<Option<Output>>) -> Option<Response> {
236			let outs: Vec<_> = outs.into_iter().flatten().collect();
237			if outs.is_empty() {
238				None
239			} else {
240				Some(Response::Batch(outs))
241			}
242		}
243
244		self.middleware
245			.on_request(request, meta, |request, meta| match request {
246				Request::Single(call) => Left(
247					self.handle_call(call, meta)
248						.map(output_as_response as fn(Option<Output>) -> Option<Response>),
249				),
250				Request::Batch(calls) => {
251					let futures: Vec<_> = calls
252						.into_iter()
253						.map(move |call| self.handle_call(call, meta.clone()))
254						.collect();
255					Right(
256						future::join_all(futures).map(outputs_as_batch as fn(Vec<Option<Output>>) -> Option<Response>),
257					)
258				}
259			})
260	}
261
262	/// Handle single call asynchronously.
263	pub fn handle_call(&self, call: Call, meta: T) -> FutureRpcOutput<S::CallFuture> {
264		use self::future::Either::{Left, Right};
265
266		self.middleware.on_call(call, meta, |call, meta| match call {
267			Call::MethodCall(method) => {
268				let params = method.params;
269				let id = method.id;
270				let jsonrpc = method.jsonrpc;
271				let valid_version = self.compatibility.is_version_valid(jsonrpc);
272
273				let call_method = |method: &Arc<dyn RpcMethod<T>>| method.call(params, meta);
274
275				let result = match (valid_version, self.methods.get(&method.method)) {
276					(false, _) => Err(Error::invalid_version()),
277					(true, Some(&RemoteProcedure::Method(ref method))) => Ok(call_method(method)),
278					(true, Some(&RemoteProcedure::Alias(ref alias))) => match self.methods.get(alias) {
279						Some(&RemoteProcedure::Method(ref method)) => Ok(call_method(method)),
280						_ => Err(Error::method_not_found()),
281					},
282					(true, _) => Err(Error::method_not_found()),
283				};
284
285				match result {
286					Ok(result) => Left(Box::pin(
287						result.then(move |result| future::ready(Some(Output::from(result, id, jsonrpc)))),
288					) as _),
289					Err(err) => Right(future::ready(Some(Output::from(Err(err), id, jsonrpc)))),
290				}
291			}
292			Call::Notification(notification) => {
293				let params = notification.params;
294				let jsonrpc = notification.jsonrpc;
295				if !self.compatibility.is_version_valid(jsonrpc) {
296					return Right(future::ready(None));
297				}
298
299				match self.methods.get(&notification.method) {
300					Some(&RemoteProcedure::Notification(ref notification)) => {
301						notification.execute(params, meta);
302					}
303					Some(&RemoteProcedure::Alias(ref alias)) => {
304						if let Some(&RemoteProcedure::Notification(ref notification)) = self.methods.get(alias) {
305							notification.execute(params, meta);
306						}
307					}
308					_ => {}
309				}
310
311				Right(future::ready(None))
312			}
313			Call::Invalid { id } => Right(future::ready(Some(Output::invalid_request(
314				id,
315				self.compatibility.default_version(),
316			)))),
317		})
318	}
319
320	/// Returns an iterator visiting all methods in arbitrary order.
321	pub fn iter(&self) -> impl Iterator<Item = (&String, &RemoteProcedure<T>)> {
322		self.methods.iter()
323	}
324}
325
326/// A type that can augment `MetaIoHandler`.
327///
328/// This allows your code to accept generic extensions for `IoHandler`
329/// and compose them to create the RPC server.
330pub trait IoHandlerExtension<M: Metadata = ()> {
331	/// Extend given `handler` with additional methods.
332	fn augment<S: Middleware<M>>(self, handler: &mut MetaIoHandler<M, S>);
333}
334
335macro_rules! impl_io_handler_extension {
336	($( $x:ident, )*) => {
337		impl<M, $( $x, )*> IoHandlerExtension<M> for ($( $x, )*) where
338			M: Metadata,
339			$(
340				$x: IoHandlerExtension<M>,
341			)*
342			{
343				#[allow(unused)]
344				fn augment<S: Middleware<M>>(self, handler: &mut MetaIoHandler<M, S>) {
345					#[allow(non_snake_case)]
346					let (
347						$( $x, )*
348					) = self;
349					$(
350						$x.augment(handler);
351					)*
352				}
353			}
354	}
355}
356
357impl_io_handler_extension!();
358impl_io_handler_extension!(A,);
359impl_io_handler_extension!(A, B,);
360impl_io_handler_extension!(A, B, C,);
361impl_io_handler_extension!(A, B, C, D,);
362impl_io_handler_extension!(A, B, C, D, E,);
363impl_io_handler_extension!(A, B, C, D, E, F,);
364impl_io_handler_extension!(A, B, C, D, E, F, G,);
365impl_io_handler_extension!(A, B, C, D, E, F, G, H,);
366impl_io_handler_extension!(A, B, C, D, E, F, G, H, I,);
367impl_io_handler_extension!(A, B, C, D, E, F, G, H, I, J,);
368impl_io_handler_extension!(A, B, C, D, E, F, G, H, I, J, K,);
369impl_io_handler_extension!(A, B, C, D, E, F, G, H, I, J, K, L,);
370
371impl<M: Metadata> IoHandlerExtension<M> for Vec<(String, RemoteProcedure<M>)> {
372	fn augment<S: Middleware<M>>(self, handler: &mut MetaIoHandler<M, S>) {
373		handler.methods.extend(self)
374	}
375}
376
377impl<M: Metadata> IoHandlerExtension<M> for HashMap<String, RemoteProcedure<M>> {
378	fn augment<S: Middleware<M>>(self, handler: &mut MetaIoHandler<M, S>) {
379		handler.methods.extend(self)
380	}
381}
382
383impl<M: Metadata, S2: Middleware<M>> IoHandlerExtension<M> for MetaIoHandler<M, S2> {
384	fn augment<S: Middleware<M>>(self, handler: &mut MetaIoHandler<M, S>) {
385		handler.methods.extend(self.methods)
386	}
387}
388
389impl<M: Metadata, T: IoHandlerExtension<M>> IoHandlerExtension<M> for Option<T> {
390	fn augment<S: Middleware<M>>(self, handler: &mut MetaIoHandler<M, S>) {
391		if let Some(x) = self {
392			x.augment(handler)
393		}
394	}
395}
396
397/// Simplified `IoHandler` with no `Metadata` associated with each request.
398#[derive(Clone, Debug, Default)]
399pub struct IoHandler<M: Metadata = ()>(MetaIoHandler<M>);
400
401impl<T: Metadata> IntoIterator for IoHandler<T> {
402	type Item = <MetaIoHandler<T> as IntoIterator>::Item;
403	type IntoIter = <MetaIoHandler<T> as IntoIterator>::IntoIter;
404
405	fn into_iter(self) -> Self::IntoIter {
406		self.0.into_iter()
407	}
408}
409
410// Type inference helper
411impl IoHandler {
412	/// Creates new `IoHandler` without any metadata.
413	pub fn new() -> Self {
414		IoHandler::default()
415	}
416
417	/// Creates new `IoHandler` without any metadata compatible with specified protocol version.
418	pub fn with_compatibility(compatibility: Compatibility) -> Self {
419		IoHandler(MetaIoHandler::with_compatibility(compatibility))
420	}
421}
422
423impl<M: Metadata + Default> IoHandler<M> {
424	/// Handle given string request asynchronously.
425	pub fn handle_request(&self, request: &str) -> FutureResult<FutureResponse, FutureOutput> {
426		self.0.handle_request(request, M::default())
427	}
428
429	/// Handle deserialized RPC request asynchronously.
430	pub fn handle_rpc_request(&self, request: Request) -> FutureRpcResult<FutureResponse, FutureOutput> {
431		self.0.handle_rpc_request(request, M::default())
432	}
433
434	/// Handle single Call asynchronously.
435	pub fn handle_call(&self, call: Call) -> FutureRpcOutput<FutureOutput> {
436		self.0.handle_call(call, M::default())
437	}
438
439	/// Handle given request synchronously - will block until response is available.
440	/// If you have any asynchronous methods in your RPC it is much wiser to use
441	/// `handle_request` instead and deal with asynchronous requests in a non-blocking fashion.
442	#[cfg(feature = "futures-executor")]
443	pub fn handle_request_sync(&self, request: &str) -> Option<String> {
444		self.0.handle_request_sync(request, M::default())
445	}
446}
447
448impl<M: Metadata> Deref for IoHandler<M> {
449	type Target = MetaIoHandler<M>;
450
451	fn deref(&self) -> &Self::Target {
452		&self.0
453	}
454}
455
456impl<M: Metadata> DerefMut for IoHandler<M> {
457	fn deref_mut(&mut self) -> &mut Self::Target {
458		&mut self.0
459	}
460}
461
462impl From<IoHandler> for MetaIoHandler<()> {
463	fn from(io: IoHandler) -> Self {
464		io.0
465	}
466}
467
468impl<M: Metadata> IoHandlerExtension<M> for IoHandler<M> {
469	fn augment<S: Middleware<M>>(self, handler: &mut MetaIoHandler<M, S>) {
470		handler.methods.extend(self.0.methods)
471	}
472}
473
474fn read_request(request_str: &str) -> Result<Request, Error> {
475	crate::serde_from_str(request_str).map_err(|_| Error::new(ErrorCode::ParseError))
476}
477
478fn write_response(response: Response) -> String {
479	// this should never fail
480	serde_json::to_string(&response).unwrap()
481}
482
483#[cfg(test)]
484mod tests {
485	use super::{Compatibility, IoHandler};
486	use crate::types::Value;
487
488	#[test]
489	fn test_io_handler() {
490		let mut io = IoHandler::new();
491
492		io.add_method("say_hello", |_| async { Ok(Value::String("hello".to_string())) });
493
494		let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
495		let response = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
496
497		assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
498	}
499
500	#[test]
501	fn test_io_handler_1dot0() {
502		let mut io = IoHandler::with_compatibility(Compatibility::Both);
503
504		io.add_method("say_hello", |_| async { Ok(Value::String("hello".to_string())) });
505
506		let request = r#"{"method": "say_hello", "params": [42, 23], "id": 1}"#;
507		let response = r#"{"result":"hello","id":1}"#;
508
509		assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
510	}
511
512	#[test]
513	fn test_async_io_handler() {
514		let mut io = IoHandler::new();
515
516		io.add_method("say_hello", |_| async { Ok(Value::String("hello".to_string())) });
517
518		let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
519		let response = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
520
521		assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
522	}
523
524	#[test]
525	fn test_notification() {
526		use std::sync::atomic;
527		use std::sync::Arc;
528
529		let mut io = IoHandler::new();
530
531		let called = Arc::new(atomic::AtomicBool::new(false));
532		let c = called.clone();
533		io.add_notification("say_hello", move |_| {
534			c.store(true, atomic::Ordering::SeqCst);
535		});
536		let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23]}"#;
537
538		assert_eq!(io.handle_request_sync(request), None);
539		assert_eq!(called.load(atomic::Ordering::SeqCst), true);
540	}
541
542	#[test]
543	fn test_method_not_found() {
544		let io = IoHandler::new();
545
546		let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
547		let response = r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":1}"#;
548
549		assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
550	}
551
552	#[test]
553	fn test_method_alias() {
554		let mut io = IoHandler::new();
555		io.add_method("say_hello", |_| async { Ok(Value::String("hello".to_string())) });
556		io.add_alias("say_hello_alias", "say_hello");
557
558		let request = r#"{"jsonrpc": "2.0", "method": "say_hello_alias", "params": [42, 23], "id": 1}"#;
559		let response = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
560
561		assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
562	}
563
564	#[test]
565	fn test_notification_alias() {
566		use std::sync::atomic;
567		use std::sync::Arc;
568
569		let mut io = IoHandler::new();
570
571		let called = Arc::new(atomic::AtomicBool::new(false));
572		let c = called.clone();
573		io.add_notification("say_hello", move |_| {
574			c.store(true, atomic::Ordering::SeqCst);
575		});
576		io.add_alias("say_hello_alias", "say_hello");
577
578		let request = r#"{"jsonrpc": "2.0", "method": "say_hello_alias", "params": [42, 23]}"#;
579		assert_eq!(io.handle_request_sync(request), None);
580		assert_eq!(called.load(atomic::Ordering::SeqCst), true);
581	}
582
583	#[test]
584	fn test_batch_notification() {
585		use std::sync::atomic;
586		use std::sync::Arc;
587
588		let mut io = IoHandler::new();
589
590		let called = Arc::new(atomic::AtomicBool::new(false));
591		let c = called.clone();
592		io.add_notification("say_hello", move |_| {
593			c.store(true, atomic::Ordering::SeqCst);
594		});
595
596		let request = r#"[{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23]}]"#;
597		assert_eq!(io.handle_request_sync(request), None);
598		assert_eq!(called.load(atomic::Ordering::SeqCst), true);
599	}
600
601	#[test]
602	fn test_send_sync() {
603		fn is_send_sync<T>(_obj: T) -> bool
604		where
605			T: Send + Sync,
606		{
607			true
608		}
609
610		let io = IoHandler::new();
611
612		assert!(is_send_sync(io))
613	}
614
615	#[test]
616	fn test_extending_by_multiple_delegates() {
617		use super::IoHandlerExtension;
618		use crate::delegates::IoDelegate;
619		use std::sync::Arc;
620
621		struct Test;
622		impl Test {
623			fn abc(&self, _p: crate::Params) -> crate::BoxFuture<crate::Result<Value>> {
624				Box::pin(async { Ok(5.into()) })
625			}
626		}
627
628		let mut io = IoHandler::new();
629		let mut del1 = IoDelegate::new(Arc::new(Test));
630		del1.add_method("rpc_test", Test::abc);
631		let mut del2 = IoDelegate::new(Arc::new(Test));
632		del2.add_method("rpc_test", Test::abc);
633
634		fn augment<X: IoHandlerExtension>(x: X, io: &mut IoHandler) {
635			x.augment(io);
636		}
637
638		augment((del1, del2), &mut io);
639	}
640}