jsonrpc_pubsub/
handler.rs1use crate::core;
2use crate::core::futures::Future;
3
4use crate::subscription::{new_subscription, Subscriber};
5use crate::types::{PubSubMetadata, SubscriptionId};
6
7pub trait SubscribeRpcMethod<M: PubSubMetadata>: Send + Sync + 'static {
9 fn call(&self, params: core::Params, meta: M, subscriber: Subscriber);
11}
12
13impl<M, F> SubscribeRpcMethod<M> for F
14where
15 F: Fn(core::Params, M, Subscriber) + Send + Sync + 'static,
16 M: PubSubMetadata,
17{
18 fn call(&self, params: core::Params, meta: M, subscriber: Subscriber) {
19 (*self)(params, meta, subscriber)
20 }
21}
22
23pub trait UnsubscribeRpcMethod<M>: Send + Sync + 'static {
25 type Out: Future<Output = core::Result<core::Value>> + Send + 'static;
27 fn call(&self, id: SubscriptionId, meta: Option<M>) -> Self::Out;
31}
32
33impl<M, F, I> UnsubscribeRpcMethod<M> for F
34where
35 F: Fn(SubscriptionId, Option<M>) -> I + Send + Sync + 'static,
36 I: Future<Output = core::Result<core::Value>> + Send + 'static,
37{
38 type Out = I;
39 fn call(&self, id: SubscriptionId, meta: Option<M>) -> Self::Out {
40 (*self)(id, meta)
41 }
42}
43
44pub struct PubSubHandler<T: PubSubMetadata, S: core::Middleware<T> = core::middleware::Noop> {
46 handler: core::MetaIoHandler<T, S>,
47}
48
49impl<T: PubSubMetadata> Default for PubSubHandler<T> {
50 fn default() -> Self {
51 PubSubHandler {
52 handler: Default::default(),
53 }
54 }
55}
56
57impl<T: PubSubMetadata, S: core::Middleware<T>> PubSubHandler<T, S> {
58 pub fn new(handler: core::MetaIoHandler<T, S>) -> Self {
60 PubSubHandler { handler }
61 }
62
63 pub fn add_subscription<F, G>(&mut self, notification: &str, subscribe: (&str, F), unsubscribe: (&str, G))
65 where
66 F: SubscribeRpcMethod<T>,
67 G: UnsubscribeRpcMethod<T>,
68 {
69 let (sub, unsub) = new_subscription(notification, subscribe.1, unsubscribe.1);
70 self.handler.add_method_with_meta(subscribe.0, sub);
71 self.handler.add_method_with_meta(unsubscribe.0, unsub);
72 }
73}
74
75impl<T: PubSubMetadata, S: core::Middleware<T>> ::std::ops::Deref for PubSubHandler<T, S> {
76 type Target = core::MetaIoHandler<T, S>;
77
78 fn deref(&self) -> &Self::Target {
79 &self.handler
80 }
81}
82
83impl<T: PubSubMetadata, S: core::Middleware<T>> ::std::ops::DerefMut for PubSubHandler<T, S> {
84 fn deref_mut(&mut self) -> &mut Self::Target {
85 &mut self.handler
86 }
87}
88
89impl<T: PubSubMetadata, S: core::Middleware<T>> Into<core::MetaIoHandler<T, S>> for PubSubHandler<T, S> {
90 fn into(self) -> core::MetaIoHandler<T, S> {
91 self.handler
92 }
93}
94
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    use crate::core;
    use crate::core::futures::channel::mpsc;
    use crate::core::futures::future;
    use crate::subscription::{Session, Subscriber};
    use crate::types::{PubSubMetadata, SubscriptionId};

    use super::PubSubHandler;

    /// Test metadata that always carries a session.
    #[derive(Clone)]
    struct Metadata(Arc<Session>);
    impl core::Metadata for Metadata {}
    impl PubSubMetadata for Metadata {
        fn session(&self) -> Option<Arc<Session>> {
            Some(Arc::clone(&self.0))
        }
    }

    /// Registers a subscription, sends one subscribe request and checks both
    /// the response and that the unsubscribe handler ran.
    #[test]
    fn should_handle_subscription() {
        // given
        let mut handler = PubSubHandler::default();
        // Flag flipped by the unsubscribe handler so the test can observe it.
        let called = Arc::new(AtomicBool::new(false));
        let called2 = Arc::clone(&called);
        handler.add_subscription(
            "hello",
            ("subscribe_hello", |params, _meta, subscriber: Subscriber| {
                assert_eq!(params, core::Params::None);
                let _sink = subscriber.assign_id(SubscriptionId::Number(5));
            }),
            ("unsubscribe_hello", move |id, _meta| {
                // NOTE(review): no unsubscribe request is sent below, so this
                // presumably runs when the session is dropped - confirm
                // against `Session`.
                called2.store(true, Ordering::SeqCst);
                assert_eq!(id, SubscriptionId::Number(5));
                future::ok(core::Value::Bool(true))
            }),
        );

        // when
        let (tx, _rx) = mpsc::unbounded();
        let meta = Metadata(Arc::new(Session::new(tx)));
        let req = r#"{"jsonrpc":"2.0","id":1,"method":"subscribe_hello","params":null}"#;
        let res = handler.handle_request_sync(req, meta);

        // then
        let response = r#"{"jsonrpc":"2.0","result":5,"id":1}"#;
        assert_eq!(res, Some(response.into()));
        assert!(
            called.load(Ordering::SeqCst),
            "unsubscribe handler should have been called"
        );
    }
}