1use std::collections::{
2 hash_map::{IntoIter, Iter},
3 HashMap,
4};
5use std::future::Future;
6use std::ops::{Deref, DerefMut};
7use std::pin::Pin;
8use std::sync::Arc;
9
10use futures_util::{self, future, FutureExt};
11
12use crate::calls::{
13 Metadata, RemoteProcedure, RpcMethod, RpcMethodSimple, RpcMethodSync, RpcNotification, RpcNotificationSimple,
14};
15use crate::middleware::{self, Middleware};
16use crate::types::{Call, Output, Request, Response};
17use crate::types::{Error, ErrorCode, Version};
18
19pub type FutureResponse = Pin<Box<dyn Future<Output = Option<Response>> + Send>>;
21
22pub type FutureOutput = Pin<Box<dyn Future<Output = Option<Output>> + Send>>;
24
25pub type FutureResult<F, G> = future::Map<
27 future::Either<future::Ready<Option<Response>>, FutureRpcResult<F, G>>,
28 fn(Option<Response>) -> Option<String>,
29>;
30
31pub type FutureRpcOutput<F> = future::Either<F, future::Either<FutureOutput, future::Ready<Option<Output>>>>;
33
34pub type FutureRpcResult<F, G> = future::Either<
36 F,
37 future::Either<
38 future::Map<FutureRpcOutput<G>, fn(Option<Output>) -> Option<Response>>,
39 future::Map<future::JoinAll<FutureRpcOutput<G>>, fn(Vec<Option<Output>>) -> Option<Response>>,
40 >,
41>;
42
43#[derive(Debug, Clone, Copy)]
45pub enum Compatibility {
46 V1,
48 V2,
50 Both,
52}
53
54impl Default for Compatibility {
55 fn default() -> Self {
56 Compatibility::V2
57 }
58}
59
60impl Compatibility {
61 fn is_version_valid(self, version: Option<Version>) -> bool {
62 matches!(
63 (self, version),
64 (Compatibility::V1, None) | (Compatibility::V2, Some(Version::V2)) | (Compatibility::Both, _)
65 )
66 }
67
68 fn default_version(self) -> Option<Version> {
69 match self {
70 Compatibility::V1 => None,
71 Compatibility::V2 | Compatibility::Both => Some(Version::V2),
72 }
73 }
74}
75
76#[derive(Clone, Debug)]
80pub struct MetaIoHandler<T: Metadata, S: Middleware<T> = middleware::Noop> {
81 middleware: S,
82 compatibility: Compatibility,
83 methods: HashMap<String, RemoteProcedure<T>>,
84}
85
86impl<T: Metadata> Default for MetaIoHandler<T> {
87 fn default() -> Self {
88 MetaIoHandler::with_compatibility(Default::default())
89 }
90}
91
92impl<T: Metadata, S: Middleware<T>> IntoIterator for MetaIoHandler<T, S> {
93 type Item = (String, RemoteProcedure<T>);
94 type IntoIter = IntoIter<String, RemoteProcedure<T>>;
95
96 fn into_iter(self) -> Self::IntoIter {
97 self.methods.into_iter()
98 }
99}
100
101impl<'a, T: Metadata, S: Middleware<T>> IntoIterator for &'a MetaIoHandler<T, S> {
102 type Item = (&'a String, &'a RemoteProcedure<T>);
103 type IntoIter = Iter<'a, String, RemoteProcedure<T>>;
104
105 fn into_iter(self) -> Self::IntoIter {
106 self.methods.iter()
107 }
108}
109
110impl<T: Metadata> MetaIoHandler<T> {
111 pub fn with_compatibility(compatibility: Compatibility) -> Self {
113 MetaIoHandler {
114 compatibility,
115 middleware: Default::default(),
116 methods: Default::default(),
117 }
118 }
119}
120
121impl<T: Metadata, S: Middleware<T>> MetaIoHandler<T, S> {
122 pub fn new(compatibility: Compatibility, middleware: S) -> Self {
124 MetaIoHandler {
125 compatibility,
126 middleware,
127 methods: Default::default(),
128 }
129 }
130
131 pub fn with_middleware(middleware: S) -> Self {
133 MetaIoHandler {
134 compatibility: Default::default(),
135 middleware,
136 methods: Default::default(),
137 }
138 }
139
140 pub fn add_alias(&mut self, alias: &str, other: &str) {
142 self.methods.insert(alias.into(), RemoteProcedure::Alias(other.into()));
143 }
144
145 pub fn add_sync_method<F>(&mut self, name: &str, method: F)
149 where
150 F: RpcMethodSync,
151 {
152 self.add_method(name, move |params| method.call(params))
153 }
154
155 pub fn add_method<F>(&mut self, name: &str, method: F)
157 where
158 F: RpcMethodSimple,
159 {
160 self.add_method_with_meta(name, move |params, _meta| method.call(params))
161 }
162
163 pub fn add_notification<F>(&mut self, name: &str, notification: F)
165 where
166 F: RpcNotificationSimple,
167 {
168 self.add_notification_with_meta(name, move |params, _meta| notification.execute(params))
169 }
170
171 pub fn add_method_with_meta<F>(&mut self, name: &str, method: F)
173 where
174 F: RpcMethod<T>,
175 {
176 self.methods
177 .insert(name.into(), RemoteProcedure::Method(Arc::new(method)));
178 }
179
180 pub fn add_notification_with_meta<F>(&mut self, name: &str, notification: F)
182 where
183 F: RpcNotification<T>,
184 {
185 self.methods
186 .insert(name.into(), RemoteProcedure::Notification(Arc::new(notification)));
187 }
188
189 pub fn extend_with<F>(&mut self, methods: F)
191 where
192 F: IntoIterator<Item = (String, RemoteProcedure<T>)>,
193 {
194 self.methods.extend(methods)
195 }
196
197 #[cfg(feature = "futures-executor")]
201 pub fn handle_request_sync(&self, request: &str, meta: T) -> Option<String> {
202 futures_executor::block_on(self.handle_request(request, meta))
203 }
204
205 pub fn handle_request(&self, request: &str, meta: T) -> FutureResult<S::Future, S::CallFuture> {
207 use self::future::Either::{Left, Right};
208 fn as_string(response: Option<Response>) -> Option<String> {
209 let res = response.map(write_response);
210 debug!(target: "rpc", "Response: {}.", res.as_ref().unwrap_or(&"None".to_string()));
211 res
212 }
213
214 trace!(target: "rpc", "Request: {}.", request);
215 let request = read_request(request);
216 let result = match request {
217 Err(error) => Left(future::ready(Some(Response::from(
218 error,
219 self.compatibility.default_version(),
220 )))),
221 Ok(request) => Right(self.handle_rpc_request(request, meta)),
222 };
223
224 result.map(as_string)
225 }
226
227 pub fn handle_rpc_request(&self, request: Request, meta: T) -> FutureRpcResult<S::Future, S::CallFuture> {
229 use self::future::Either::{Left, Right};
230
231 fn output_as_response(output: Option<Output>) -> Option<Response> {
232 output.map(Response::Single)
233 }
234
235 fn outputs_as_batch(outs: Vec<Option<Output>>) -> Option<Response> {
236 let outs: Vec<_> = outs.into_iter().flatten().collect();
237 if outs.is_empty() {
238 None
239 } else {
240 Some(Response::Batch(outs))
241 }
242 }
243
244 self.middleware
245 .on_request(request, meta, |request, meta| match request {
246 Request::Single(call) => Left(
247 self.handle_call(call, meta)
248 .map(output_as_response as fn(Option<Output>) -> Option<Response>),
249 ),
250 Request::Batch(calls) => {
251 let futures: Vec<_> = calls
252 .into_iter()
253 .map(move |call| self.handle_call(call, meta.clone()))
254 .collect();
255 Right(
256 future::join_all(futures).map(outputs_as_batch as fn(Vec<Option<Output>>) -> Option<Response>),
257 )
258 }
259 })
260 }
261
262 pub fn handle_call(&self, call: Call, meta: T) -> FutureRpcOutput<S::CallFuture> {
264 use self::future::Either::{Left, Right};
265
266 self.middleware.on_call(call, meta, |call, meta| match call {
267 Call::MethodCall(method) => {
268 let params = method.params;
269 let id = method.id;
270 let jsonrpc = method.jsonrpc;
271 let valid_version = self.compatibility.is_version_valid(jsonrpc);
272
273 let call_method = |method: &Arc<dyn RpcMethod<T>>| method.call(params, meta);
274
275 let result = match (valid_version, self.methods.get(&method.method)) {
276 (false, _) => Err(Error::invalid_version()),
277 (true, Some(&RemoteProcedure::Method(ref method))) => Ok(call_method(method)),
278 (true, Some(&RemoteProcedure::Alias(ref alias))) => match self.methods.get(alias) {
279 Some(&RemoteProcedure::Method(ref method)) => Ok(call_method(method)),
280 _ => Err(Error::method_not_found()),
281 },
282 (true, _) => Err(Error::method_not_found()),
283 };
284
285 match result {
286 Ok(result) => Left(Box::pin(
287 result.then(move |result| future::ready(Some(Output::from(result, id, jsonrpc)))),
288 ) as _),
289 Err(err) => Right(future::ready(Some(Output::from(Err(err), id, jsonrpc)))),
290 }
291 }
292 Call::Notification(notification) => {
293 let params = notification.params;
294 let jsonrpc = notification.jsonrpc;
295 if !self.compatibility.is_version_valid(jsonrpc) {
296 return Right(future::ready(None));
297 }
298
299 match self.methods.get(¬ification.method) {
300 Some(&RemoteProcedure::Notification(ref notification)) => {
301 notification.execute(params, meta);
302 }
303 Some(&RemoteProcedure::Alias(ref alias)) => {
304 if let Some(&RemoteProcedure::Notification(ref notification)) = self.methods.get(alias) {
305 notification.execute(params, meta);
306 }
307 }
308 _ => {}
309 }
310
311 Right(future::ready(None))
312 }
313 Call::Invalid { id } => Right(future::ready(Some(Output::invalid_request(
314 id,
315 self.compatibility.default_version(),
316 )))),
317 })
318 }
319
320 pub fn iter(&self) -> impl Iterator<Item = (&String, &RemoteProcedure<T>)> {
322 self.methods.iter()
323 }
324}
325
326pub trait IoHandlerExtension<M: Metadata = ()> {
331 fn augment<S: Middleware<M>>(self, handler: &mut MetaIoHandler<M, S>);
333}
334
335macro_rules! impl_io_handler_extension {
336 ($( $x:ident, )*) => {
337 impl<M, $( $x, )*> IoHandlerExtension<M> for ($( $x, )*) where
338 M: Metadata,
339 $(
340 $x: IoHandlerExtension<M>,
341 )*
342 {
343 #[allow(unused)]
344 fn augment<S: Middleware<M>>(self, handler: &mut MetaIoHandler<M, S>) {
345 #[allow(non_snake_case)]
346 let (
347 $( $x, )*
348 ) = self;
349 $(
350 $x.augment(handler);
351 )*
352 }
353 }
354 }
355}
356
357impl_io_handler_extension!();
358impl_io_handler_extension!(A,);
359impl_io_handler_extension!(A, B,);
360impl_io_handler_extension!(A, B, C,);
361impl_io_handler_extension!(A, B, C, D,);
362impl_io_handler_extension!(A, B, C, D, E,);
363impl_io_handler_extension!(A, B, C, D, E, F,);
364impl_io_handler_extension!(A, B, C, D, E, F, G,);
365impl_io_handler_extension!(A, B, C, D, E, F, G, H,);
366impl_io_handler_extension!(A, B, C, D, E, F, G, H, I,);
367impl_io_handler_extension!(A, B, C, D, E, F, G, H, I, J,);
368impl_io_handler_extension!(A, B, C, D, E, F, G, H, I, J, K,);
369impl_io_handler_extension!(A, B, C, D, E, F, G, H, I, J, K, L,);
370
371impl<M: Metadata> IoHandlerExtension<M> for Vec<(String, RemoteProcedure<M>)> {
372 fn augment<S: Middleware<M>>(self, handler: &mut MetaIoHandler<M, S>) {
373 handler.methods.extend(self)
374 }
375}
376
377impl<M: Metadata> IoHandlerExtension<M> for HashMap<String, RemoteProcedure<M>> {
378 fn augment<S: Middleware<M>>(self, handler: &mut MetaIoHandler<M, S>) {
379 handler.methods.extend(self)
380 }
381}
382
383impl<M: Metadata, S2: Middleware<M>> IoHandlerExtension<M> for MetaIoHandler<M, S2> {
384 fn augment<S: Middleware<M>>(self, handler: &mut MetaIoHandler<M, S>) {
385 handler.methods.extend(self.methods)
386 }
387}
388
389impl<M: Metadata, T: IoHandlerExtension<M>> IoHandlerExtension<M> for Option<T> {
390 fn augment<S: Middleware<M>>(self, handler: &mut MetaIoHandler<M, S>) {
391 if let Some(x) = self {
392 x.augment(handler)
393 }
394 }
395}
396
397#[derive(Clone, Debug, Default)]
399pub struct IoHandler<M: Metadata = ()>(MetaIoHandler<M>);
400
401impl<T: Metadata> IntoIterator for IoHandler<T> {
402 type Item = <MetaIoHandler<T> as IntoIterator>::Item;
403 type IntoIter = <MetaIoHandler<T> as IntoIterator>::IntoIter;
404
405 fn into_iter(self) -> Self::IntoIter {
406 self.0.into_iter()
407 }
408}
409
410impl IoHandler {
412 pub fn new() -> Self {
414 IoHandler::default()
415 }
416
417 pub fn with_compatibility(compatibility: Compatibility) -> Self {
419 IoHandler(MetaIoHandler::with_compatibility(compatibility))
420 }
421}
422
423impl<M: Metadata + Default> IoHandler<M> {
424 pub fn handle_request(&self, request: &str) -> FutureResult<FutureResponse, FutureOutput> {
426 self.0.handle_request(request, M::default())
427 }
428
429 pub fn handle_rpc_request(&self, request: Request) -> FutureRpcResult<FutureResponse, FutureOutput> {
431 self.0.handle_rpc_request(request, M::default())
432 }
433
434 pub fn handle_call(&self, call: Call) -> FutureRpcOutput<FutureOutput> {
436 self.0.handle_call(call, M::default())
437 }
438
439 #[cfg(feature = "futures-executor")]
443 pub fn handle_request_sync(&self, request: &str) -> Option<String> {
444 self.0.handle_request_sync(request, M::default())
445 }
446}
447
448impl<M: Metadata> Deref for IoHandler<M> {
449 type Target = MetaIoHandler<M>;
450
451 fn deref(&self) -> &Self::Target {
452 &self.0
453 }
454}
455
456impl<M: Metadata> DerefMut for IoHandler<M> {
457 fn deref_mut(&mut self) -> &mut Self::Target {
458 &mut self.0
459 }
460}
461
462impl From<IoHandler> for MetaIoHandler<()> {
463 fn from(io: IoHandler) -> Self {
464 io.0
465 }
466}
467
468impl<M: Metadata> IoHandlerExtension<M> for IoHandler<M> {
469 fn augment<S: Middleware<M>>(self, handler: &mut MetaIoHandler<M, S>) {
470 handler.methods.extend(self.0.methods)
471 }
472}
473
474fn read_request(request_str: &str) -> Result<Request, Error> {
475 crate::serde_from_str(request_str).map_err(|_| Error::new(ErrorCode::ParseError))
476}
477
478fn write_response(response: Response) -> String {
479 serde_json::to_string(&response).unwrap()
481}
482
483#[cfg(test)]
484mod tests {
485 use super::{Compatibility, IoHandler};
486 use crate::types::Value;
487
488 #[test]
489 fn test_io_handler() {
490 let mut io = IoHandler::new();
491
492 io.add_method("say_hello", |_| async { Ok(Value::String("hello".to_string())) });
493
494 let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
495 let response = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
496
497 assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
498 }
499
500 #[test]
501 fn test_io_handler_1dot0() {
502 let mut io = IoHandler::with_compatibility(Compatibility::Both);
503
504 io.add_method("say_hello", |_| async { Ok(Value::String("hello".to_string())) });
505
506 let request = r#"{"method": "say_hello", "params": [42, 23], "id": 1}"#;
507 let response = r#"{"result":"hello","id":1}"#;
508
509 assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
510 }
511
512 #[test]
513 fn test_async_io_handler() {
514 let mut io = IoHandler::new();
515
516 io.add_method("say_hello", |_| async { Ok(Value::String("hello".to_string())) });
517
518 let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
519 let response = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
520
521 assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
522 }
523
524 #[test]
525 fn test_notification() {
526 use std::sync::atomic;
527 use std::sync::Arc;
528
529 let mut io = IoHandler::new();
530
531 let called = Arc::new(atomic::AtomicBool::new(false));
532 let c = called.clone();
533 io.add_notification("say_hello", move |_| {
534 c.store(true, atomic::Ordering::SeqCst);
535 });
536 let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23]}"#;
537
538 assert_eq!(io.handle_request_sync(request), None);
539 assert_eq!(called.load(atomic::Ordering::SeqCst), true);
540 }
541
542 #[test]
543 fn test_method_not_found() {
544 let io = IoHandler::new();
545
546 let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
547 let response = r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":1}"#;
548
549 assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
550 }
551
552 #[test]
553 fn test_method_alias() {
554 let mut io = IoHandler::new();
555 io.add_method("say_hello", |_| async { Ok(Value::String("hello".to_string())) });
556 io.add_alias("say_hello_alias", "say_hello");
557
558 let request = r#"{"jsonrpc": "2.0", "method": "say_hello_alias", "params": [42, 23], "id": 1}"#;
559 let response = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
560
561 assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
562 }
563
564 #[test]
565 fn test_notification_alias() {
566 use std::sync::atomic;
567 use std::sync::Arc;
568
569 let mut io = IoHandler::new();
570
571 let called = Arc::new(atomic::AtomicBool::new(false));
572 let c = called.clone();
573 io.add_notification("say_hello", move |_| {
574 c.store(true, atomic::Ordering::SeqCst);
575 });
576 io.add_alias("say_hello_alias", "say_hello");
577
578 let request = r#"{"jsonrpc": "2.0", "method": "say_hello_alias", "params": [42, 23]}"#;
579 assert_eq!(io.handle_request_sync(request), None);
580 assert_eq!(called.load(atomic::Ordering::SeqCst), true);
581 }
582
583 #[test]
584 fn test_batch_notification() {
585 use std::sync::atomic;
586 use std::sync::Arc;
587
588 let mut io = IoHandler::new();
589
590 let called = Arc::new(atomic::AtomicBool::new(false));
591 let c = called.clone();
592 io.add_notification("say_hello", move |_| {
593 c.store(true, atomic::Ordering::SeqCst);
594 });
595
596 let request = r#"[{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23]}]"#;
597 assert_eq!(io.handle_request_sync(request), None);
598 assert_eq!(called.load(atomic::Ordering::SeqCst), true);
599 }
600
601 #[test]
602 fn test_send_sync() {
603 fn is_send_sync<T>(_obj: T) -> bool
604 where
605 T: Send + Sync,
606 {
607 true
608 }
609
610 let io = IoHandler::new();
611
612 assert!(is_send_sync(io))
613 }
614
615 #[test]
616 fn test_extending_by_multiple_delegates() {
617 use super::IoHandlerExtension;
618 use crate::delegates::IoDelegate;
619 use std::sync::Arc;
620
621 struct Test;
622 impl Test {
623 fn abc(&self, _p: crate::Params) -> crate::BoxFuture<crate::Result<Value>> {
624 Box::pin(async { Ok(5.into()) })
625 }
626 }
627
628 let mut io = IoHandler::new();
629 let mut del1 = IoDelegate::new(Arc::new(Test));
630 del1.add_method("rpc_test", Test::abc);
631 let mut del2 = IoDelegate::new(Arc::new(Test));
632 del2.add_method("rpc_test", Test::abc);
633
634 fn augment<X: IoHandlerExtension>(x: X, io: &mut IoHandler) {
635 x.augment(io);
636 }
637
638 augment((del1, del2), &mut io);
639 }
640}