1#![deny(missing_docs)]
4
5use jsonrpc_core::futures::channel::{mpsc, oneshot};
6use jsonrpc_core::futures::{
7 self,
8 task::{Context, Poll},
9 Future, Stream, StreamExt,
10};
11use jsonrpc_core::{Error, Params};
12use serde::de::DeserializeOwned;
13use serde::Serialize;
14use serde_json::Value;
15use std::marker::PhantomData;
16use std::pin::Pin;
17
18pub mod transports;
19
20#[cfg(test)]
21mod logger;
22
/// Errors that the client can report for a call, notification or subscription.
#[derive(Debug, derive_more::Display)]
pub enum RpcError {
    /// The server answered with a JSON-RPC error.
    #[display(fmt = "Server returned rpc error {}", _0)]
    JsonRpcError(Error),
    /// The server response could not be parsed; carries the name of the expected
    /// type and the underlying error.
    #[display(fmt = "Failed to parse server response as {}: {}", _0, _1)]
    ParseError(String, Box<dyn std::error::Error + Send>),
    /// The request timed out.
    #[display(fmt = "Request timed out")]
    Timeout,
    /// A client-side error, described by the contained message.
    #[display(fmt = "Client error: {}", _0)]
    Client(String),
    /// Any other error; displayed using the wrapped error's own message.
    #[display(fmt = "{}", _0)]
    Other(Box<dyn std::error::Error + Send>),
}
42
43impl std::error::Error for RpcError {
44 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
45 match *self {
46 Self::JsonRpcError(ref e) => Some(e),
47 Self::ParseError(_, ref e) => Some(&**e),
48 Self::Other(ref e) => Some(&**e),
49 _ => None,
50 }
51 }
52}
53
54impl From<Error> for RpcError {
55 fn from(error: Error) -> Self {
56 RpcError::JsonRpcError(error)
57 }
58}
59
/// A `Result` whose error type is [`RpcError`].
pub type RpcResult<T> = Result<T, RpcError>;
62
/// Message describing a single method call.
struct CallMessage {
    /// Name of the method to call.
    method: String,
    /// Parameters passed to the method.
    params: Params,
    /// Sending half of the oneshot channel for the call's result;
    /// `RawClient::call_method` awaits the receiving half.
    sender: oneshot::Sender<RpcResult<Value>>,
}
73
/// Message describing a notification; unlike `CallMessage` it has no result channel.
struct NotifyMessage {
    /// Name of the notification method.
    method: String,
    /// Parameters passed with the notification.
    params: Params,
}
81
/// Names and parameters that describe a subscription.
///
/// NOTE(review): how these values are used on the wire is decided by the transport
/// code, which is not part of this file.
struct Subscription {
    /// Name of the subscribe method.
    subscribe: String,
    /// Parameters passed to the subscribe method.
    subscribe_params: Params,
    /// Name of the notification (topic) associated with the subscription.
    notification: String,
    /// Name of the unsubscribe method.
    unsubscribe: String,
}
93
/// Message requesting a new subscription.
struct SubscribeMessage {
    /// Description of the subscription to open.
    subscription: Subscription,
    /// Sending half of the unbounded channel for subscription items;
    /// `RawClient::subscribe` returns the receiving half to the caller.
    sender: mpsc::UnboundedSender<RpcResult<Value>>,
}
101
/// A message sent through an `RpcChannel`.
enum RpcMessage {
    /// A method call that expects a result.
    Call(CallMessage),
    /// A notification, which carries no result channel.
    Notify(NotifyMessage),
    /// A request to open a subscription.
    Subscribe(SubscribeMessage),
}
111
112impl From<CallMessage> for RpcMessage {
113 fn from(msg: CallMessage) -> Self {
114 RpcMessage::Call(msg)
115 }
116}
117
118impl From<NotifyMessage> for RpcMessage {
119 fn from(msg: NotifyMessage) -> Self {
120 RpcMessage::Notify(msg)
121 }
122}
123
124impl From<SubscribeMessage> for RpcMessage {
125 fn from(msg: SubscribeMessage) -> Self {
126 RpcMessage::Subscribe(msg)
127 }
128}
129
/// A cloneable handle around the unbounded sender of RPC messages.
#[derive(Clone)]
pub struct RpcChannel(mpsc::UnboundedSender<RpcMessage>);
133
134impl RpcChannel {
135 fn send(&self, msg: RpcMessage) -> Result<(), mpsc::TrySendError<RpcMessage>> {
136 self.0.unbounded_send(msg)
137 }
138}
139
140impl From<mpsc::UnboundedSender<RpcMessage>> for RpcChannel {
141 fn from(sender: mpsc::UnboundedSender<RpcMessage>) -> Self {
142 RpcChannel(sender)
143 }
144}
145
/// Receiving half of a oneshot channel carrying the raw JSON result of a request
/// (the counterpart type of `CallMessage::sender`).
pub type RpcFuture = oneshot::Receiver<Result<Value, RpcError>>;
148
/// Stream of raw JSON subscription items; this is what `RawClient::subscribe` returns.
pub type SubscriptionStream = mpsc::UnboundedReceiver<Result<Value, RpcError>>;
151
/// A subscription stream whose items are deserialized into `T`.
pub struct TypedSubscriptionStream<T> {
    /// Ties the stream to its item type without storing a `T`.
    _marker: PhantomData<T>,
    /// Name of the item type, reported in `RpcError::ParseError` when deserialization fails.
    returns: &'static str,
    /// The underlying stream of raw JSON values.
    stream: SubscriptionStream,
}
158
159impl<T> TypedSubscriptionStream<T> {
160 pub fn new(stream: SubscriptionStream, returns: &'static str) -> Self {
162 TypedSubscriptionStream {
163 _marker: PhantomData,
164 returns,
165 stream,
166 }
167 }
168}
169
170impl<T: DeserializeOwned + Unpin + 'static> Stream for TypedSubscriptionStream<T> {
171 type Item = RpcResult<T>;
172
173 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
174 let result = futures::ready!(self.stream.poll_next_unpin(cx));
175 match result {
176 Some(Ok(value)) => Some(
177 serde_json::from_value::<T>(value)
178 .map_err(|error| RpcError::ParseError(self.returns.into(), Box::new(error))),
179 ),
180 None => None,
181 Some(Err(err)) => Some(Err(err)),
182 }
183 .into()
184 }
185}
186
/// Client that works with untyped `Params` and raw JSON `Value`s.
#[derive(Clone)]
pub struct RawClient(RpcChannel);
190
191impl From<RpcChannel> for RawClient {
192 fn from(channel: RpcChannel) -> Self {
193 RawClient(channel)
194 }
195}
196
197impl RawClient {
198 pub fn call_method(&self, method: &str, params: Params) -> impl Future<Output = RpcResult<Value>> {
200 let (sender, receiver) = oneshot::channel();
201 let msg = CallMessage {
202 method: method.into(),
203 params,
204 sender,
205 };
206 let result = self.0.send(msg.into());
207 async move {
208 let () = result.map_err(|e| RpcError::Other(Box::new(e)))?;
209
210 receiver.await.map_err(|e| RpcError::Other(Box::new(e)))?
211 }
212 }
213
214 pub fn notify(&self, method: &str, params: Params) -> RpcResult<()> {
216 let msg = NotifyMessage {
217 method: method.into(),
218 params,
219 };
220 match self.0.send(msg.into()) {
221 Ok(()) => Ok(()),
222 Err(error) => Err(RpcError::Other(Box::new(error))),
223 }
224 }
225
226 pub fn subscribe(
228 &self,
229 subscribe: &str,
230 subscribe_params: Params,
231 notification: &str,
232 unsubscribe: &str,
233 ) -> RpcResult<SubscriptionStream> {
234 let (sender, receiver) = mpsc::unbounded();
235 let msg = SubscribeMessage {
236 subscription: Subscription {
237 subscribe: subscribe.into(),
238 subscribe_params,
239 notification: notification.into(),
240 unsubscribe: unsubscribe.into(),
241 },
242 sender,
243 };
244
245 self.0
246 .send(msg.into())
247 .map(|()| receiver)
248 .map_err(|e| RpcError::Other(Box::new(e)))
249 }
250}
251
/// Client that serializes arguments and deserializes results on top of a [`RawClient`].
#[derive(Clone)]
pub struct TypedClient(RawClient);
255
256impl From<RpcChannel> for TypedClient {
257 fn from(channel: RpcChannel) -> Self {
258 TypedClient(channel.into())
259 }
260}
261
262impl TypedClient {
263 pub fn new(raw_cli: RawClient) -> Self {
265 TypedClient(raw_cli)
266 }
267
268 pub fn call_method<T: Serialize, R: DeserializeOwned>(
270 &self,
271 method: &str,
272 returns: &str,
273 args: T,
274 ) -> impl Future<Output = RpcResult<R>> {
275 let returns = returns.to_owned();
276 let args =
277 serde_json::to_value(args).expect("Only types with infallible serialisation can be used for JSON-RPC");
278 let params = match args {
279 Value::Array(vec) => Ok(Params::Array(vec)),
280 Value::Null => Ok(Params::None),
281 Value::Object(map) => Ok(Params::Map(map)),
282 _ => Err(RpcError::Client(
283 "RPC params should serialize to a JSON array, JSON object or null".into(),
284 )),
285 };
286 let result = params.map(|params| self.0.call_method(method, params));
287
288 async move {
289 let value: Value = result?.await?;
290
291 log::debug!("response: {:?}", value);
292
293 serde_json::from_value::<R>(value).map_err(|error| RpcError::ParseError(returns, Box::new(error)))
294 }
295 }
296
297 pub fn notify<T: Serialize>(&self, method: &str, args: T) -> RpcResult<()> {
299 let args =
300 serde_json::to_value(args).expect("Only types with infallible serialisation can be used for JSON-RPC");
301 let params = match args {
302 Value::Array(vec) => Params::Array(vec),
303 Value::Null => Params::None,
304 _ => {
305 return Err(RpcError::Client(
306 "RPC params should serialize to a JSON array, or null".into(),
307 ))
308 }
309 };
310
311 self.0.notify(method, params)
312 }
313
314 pub fn subscribe<T: Serialize, R: DeserializeOwned + 'static>(
316 &self,
317 subscribe: &str,
318 subscribe_params: T,
319 topic: &str,
320 unsubscribe: &str,
321 returns: &'static str,
322 ) -> RpcResult<TypedSubscriptionStream<R>> {
323 let args = serde_json::to_value(subscribe_params)
324 .expect("Only types with infallible serialisation can be used for JSON-RPC");
325
326 let params = match args {
327 Value::Array(vec) => Params::Array(vec),
328 Value::Null => Params::None,
329 _ => {
330 return Err(RpcError::Client(
331 "RPC params should serialize to a JSON array, or null".into(),
332 ))
333 }
334 };
335
336 self.0
337 .subscribe(subscribe, params, topic, unsubscribe)
338 .map(move |stream| TypedSubscriptionStream::new(stream, returns))
339 }
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transports::local;
    use crate::{RpcChannel, TypedClient};
    use jsonrpc_core::futures::{future, FutureExt};
    use jsonrpc_core::{self as core, IoHandler};
    use jsonrpc_pubsub::{PubSubHandler, Subscriber, SubscriptionId};
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    /// Minimal typed client used by the tests: one call (`add`) and one
    /// notification (`completed`).
    #[derive(Clone)]
    struct AddClient(TypedClient);

    impl From<RpcChannel> for AddClient {
        fn from(channel: RpcChannel) -> Self {
            AddClient(channel.into())
        }
    }

    impl AddClient {
        /// Calls the `add` method with positional params `(a, b)`.
        fn add(&self, a: u64, b: u64) -> impl Future<Output = RpcResult<u64>> {
            self.0.call_method("add", "u64", (a, b))
        }

        /// Sends the `completed` notification with a single boolean param.
        fn completed(&self, success: bool) -> RpcResult<()> {
            self.0.notify("completed", (success,))
        }
    }

    /// Two chained `add` calls against a local handler must produce the right sum.
    #[test]
    fn test_client_terminates() {
        crate::logger::init_log();
        let mut handler = IoHandler::new();
        handler.add_sync_method("add", |params: Params| {
            let (a, b) = params.parse::<(u64, u64)>()?;
            let res = a + b;
            Ok(jsonrpc_core::to_value(res).unwrap())
        });

        // `tx` signals the test thread once the client future has finished its assertions.
        let (tx, rx) = std::sync::mpsc::channel();
        let (client, rpc_client) = local::connect::<AddClient, _, _>(handler);
        let fut = async move {
            let res = client.add(3, 4).await?;
            let res = client.add(res, 5).await?;
            assert_eq!(res, 12);
            tx.send(()).unwrap();
            Ok(()) as RpcResult<_>
        };
        let pool = futures::executor::ThreadPool::builder().pool_size(1).create().unwrap();
        pool.spawn_ok(rpc_client.map(|x| x.unwrap()));
        pool.spawn_ok(fut.map(|x| x.unwrap()));
        rx.recv().unwrap()
    }

    /// A notification sent by the client must reach the handler with its param intact.
    #[test]
    fn should_send_notification() {
        crate::logger::init_log();
        let (tx, rx) = std::sync::mpsc::sync_channel(1);
        let mut handler = IoHandler::new();
        handler.add_notification("completed", move |params: Params| {
            let (success,) = params.parse::<(bool,)>().expect("expected to receive one boolean");
            assert!(success);
            tx.send(()).unwrap();
        });

        let (client, rpc_client) = local::connect::<AddClient, _, _>(handler);
        // The notification is queued before the transport future is spawned.
        client.completed(true).unwrap();
        let pool = futures::executor::ThreadPool::builder().pool_size(1).create().unwrap();
        pool.spawn_ok(rpc_client.map(|x| x.unwrap()));
        rx.recv().unwrap()
    }

    /// Subscribing yields at least one item, and the unsubscribe handler is called.
    #[test]
    fn should_handle_subscription() {
        crate::logger::init_log();
        // `finish` is signalled by the publisher thread after it has sent all its items.
        let (finish, finished) = std::sync::mpsc::sync_channel(1);
        let mut handler = PubSubHandler::<local::LocalMeta, _>::default();
        // Set by the unsubscribe handler so the test can verify it ran.
        let called = Arc::new(AtomicBool::new(false));
        let called2 = called.clone();
        handler.add_subscription(
            "hello",
            ("subscribe_hello", move |params, _meta, subscriber: Subscriber| {
                assert_eq!(params, core::Params::None);
                let sink = subscriber
                    .assign_id(SubscriptionId::Number(5))
                    .expect("assigned subscription id");
                let finish = finish.clone();
                // Publish three items from a separate thread, 100 ms apart.
                std::thread::spawn(move || {
                    for i in 0..3 {
                        std::thread::sleep(std::time::Duration::from_millis(100));
                        let value = serde_json::json!({
                            "subscription": 5,
                            "result": vec![i],
                        });
                        let _ = sink.notify(serde_json::from_value(value).unwrap());
                    }
                    finish.send(()).unwrap();
                });
            }),
            ("unsubscribe_hello", move |id, _meta| {
                called2.store(true, Ordering::SeqCst);
                assert_eq!(id, SubscriptionId::Number(5));
                future::ready(Ok(core::Value::Bool(true)))
            }),
        );

        // `tx` signals the test thread once the client has received its first item.
        let (tx, rx) = std::sync::mpsc::channel();
        let (client, rpc_client) = local::connect_with_pubsub::<TypedClient, _>(handler);
        let received = Arc::new(std::sync::Mutex::new(vec![]));
        let r2 = received.clone();
        let fut = async move {
            let mut stream =
                client.subscribe::<_, (u32,)>("subscribe_hello", (), "hello", "unsubscribe_hello", "u32")?;
            let result = stream.next().await;
            r2.lock().unwrap().push(result.expect("Expected at least one item."));
            tx.send(()).unwrap();
            // `stream` is dropped when this block returns, after a single item.
            Ok(()) as RpcResult<_>
        };

        let pool = futures::executor::ThreadPool::builder().pool_size(1).create().unwrap();
        pool.spawn_ok(rpc_client.map(|_| ()));
        pool.spawn_ok(fut.map(|x| x.unwrap()));

        rx.recv().unwrap();
        assert!(
            !received.lock().unwrap().is_empty(),
            "Expected at least one received item."
        );
        // Wait for the publisher thread to finish before checking the unsubscribe flag.
        finished.recv().unwrap();
        assert!(called.load(Ordering::SeqCst), "Unsubscribe not called.");
    }
}