use crate::{RpcChannel, RpcError};
use failure::format_err;
use futures::prelude::*;
use futures::sync::mpsc;
use jsonrpc_core::{MetaIoHandler, Metadata, Middleware};
use jsonrpc_pubsub::Session;
use std::ops::Deref;
use std::sync::Arc;
/// In-process RPC transport that feeds request strings straight into a handler.
///
/// Requests written through the `Sink` implementation are handled synchronously
/// and any resulting response string is queued on an internal unbounded channel,
/// from which the `Stream` implementation yields it.
pub struct LocalRpc<THandler, TMetadata> {
/// Handler that every incoming request is dispatched to.
handler: THandler,
/// Metadata value; a clone of it accompanies each request passed to the handler.
meta: TMetadata,
/// Sending half of the internal response queue (written by `start_send`).
sender: mpsc::UnboundedSender<String>,
/// Receiving half of the internal response queue (drained by `poll`).
receiver: mpsc::UnboundedReceiver<String>,
}
impl<TMetadata, THandler, TMiddleware> LocalRpc<THandler, TMetadata>
where
TMetadata: Metadata,
TMiddleware: Middleware<TMetadata>,
THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>>,
{
pub fn new(handler: THandler) -> Self
where
TMetadata: Default,
{
Self::with_metadata(handler, Default::default())
}
pub fn with_metadata(handler: THandler, meta: TMetadata) -> Self {
let (sender, receiver) = mpsc::unbounded();
Self {
handler,
meta,
sender,
receiver,
}
}
}
/// Stream side of the transport: yields the response strings queued by the
/// sink side.
impl<TMetadata, THandler, TMiddleware> Stream for LocalRpc<THandler, TMetadata>
where
    TMetadata: Metadata,
    TMiddleware: Middleware<TMetadata>,
    THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>>,
{
    type Item = String;
    type Error = RpcError;

    /// Polls the internal response queue, translating the receiver's unit
    /// error into an `RpcError::Other`.
    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
        match self.receiver.poll() {
            Ok(state) => Ok(state),
            Err(()) => Err(RpcError::Other(format_err!("Sender dropped."))),
        }
    }
}
/// Sink side of the transport: accepts request strings and dispatches them to
/// the wrapped handler.
impl<TMetadata, THandler, TMiddleware> Sink for LocalRpc<THandler, TMetadata>
where
    TMetadata: Metadata,
    TMiddleware: Middleware<TMetadata>,
    THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>>,
{
    type SinkItem = String;
    type SinkError = RpcError;

    /// Handles `request` synchronously and, if the handler yields a response,
    /// queues that response for the stream side.
    ///
    /// Always reports the item as accepted (`AsyncSink::Ready`).
    ///
    /// # Errors
    ///
    /// Returns `RpcError::Other` if the response cannot be queued on the
    /// internal channel.
    fn start_send(&mut self, request: Self::SinkItem) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
        let reply = self.handler.handle_request_sync(&request, self.meta.clone());
        // The handler may yield no response at all; then there is nothing to queue.
        if let Some(response) = reply {
            self.sender
                .unbounded_send(response)
                .map_err(|_| RpcError::Other(format_err!("Receiver dropped.")))?;
        }
        Ok(AsyncSink::Ready)
    }

    /// Nothing is ever left pending: `start_send` does all of its work before
    /// returning, so completion is reported immediately.
    fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
        Ok(Async::Ready(()))
    }
}
/// Connects a client to `handler` through an in-process [`LocalRpc`] transport,
/// supplying `meta` as the request metadata.
///
/// Returns the client (built from the channel produced by
/// `crate::transports::duplex`) together with the future returned by `duplex`.
/// NOTE(review): that future presumably has to be polled/spawned for the
/// client to make progress — confirm against `transports::duplex`.
pub fn connect_with_metadata_and_middleware<TClient, THandler, TMetadata, TMiddleware>(
    handler: THandler,
    meta: TMetadata,
) -> (TClient, impl Future<Item = (), Error = RpcError>)
where
    TClient: From<RpcChannel>,
    TMiddleware: Middleware<TMetadata>,
    THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>>,
    TMetadata: Metadata,
{
    let transport = LocalRpc::with_metadata(handler, meta);
    // Split the transport into its request (sink) and response (stream) halves.
    let (sink, stream) = transport.split();
    let (rpc_client, sender) = crate::transports::duplex(sink, stream);
    (TClient::from(sender), rpc_client)
}
/// Connects a client to a handler that uses the default middleware of
/// `MetaIoHandler`, supplying `meta` as the request metadata.
///
/// Delegates to [`connect_with_metadata_and_middleware`]; see that function
/// for the meaning of the returned pair.
pub fn connect_with_metadata<TClient, THandler, TMetadata>(
    handler: THandler,
    meta: TMetadata,
) -> (TClient, impl Future<Item = (), Error = RpcError>)
where
    TClient: From<RpcChannel>,
    THandler: Deref<Target = MetaIoHandler<TMetadata>>,
    TMetadata: Metadata,
{
    // The middleware type is left to inference from `THandler`'s deref target.
    connect_with_metadata_and_middleware::<TClient, THandler, TMetadata, _>(handler, meta)
}
/// Connects a client to `handler`, using the default value of the metadata
/// type as the request metadata.
///
/// Delegates to [`connect_with_metadata_and_middleware`]; see that function
/// for the meaning of the returned pair.
pub fn connect_with_middleware<TClient, THandler, TMetadata, TMiddleware>(
    handler: THandler,
) -> (TClient, impl Future<Item = (), Error = RpcError>)
where
    TClient: From<RpcChannel>,
    TMiddleware: Middleware<TMetadata>,
    THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>>,
    TMetadata: Metadata + Default,
{
    let default_meta = TMetadata::default();
    connect_with_metadata_and_middleware(handler, default_meta)
}
/// Connects a client to a handler that uses the default middleware of
/// `MetaIoHandler` and the default value of the metadata type.
///
/// Delegates to [`connect_with_middleware`].
pub fn connect<TClient, THandler, TMetadata>(handler: THandler) -> (TClient, impl Future<Item = (), Error = RpcError>)
where
    TClient: From<RpcChannel>,
    THandler: Deref<Target = MetaIoHandler<TMetadata>>,
    TMetadata: Metadata + Default,
{
    // The middleware type is left to inference from `THandler`'s deref target.
    connect_with_middleware::<TClient, THandler, TMetadata, _>(handler)
}
/// Metadata type used by the pub/sub connect functions in this module: a
/// reference-counted `jsonrpc_pubsub::Session`.
pub type LocalMeta = Arc<Session>;
pub fn connect_with_pubsub_and_middleware<TClient, THandler, TMiddleware>(
handler: THandler,
) -> (TClient, impl Future<Item = (), Error = RpcError>)
where
TClient: From<RpcChannel>,
TMiddleware: Middleware<LocalMeta>,
THandler: Deref<Target = MetaIoHandler<LocalMeta, TMiddleware>>,
{
let (tx, rx) = mpsc::channel(0);
let meta = Arc::new(Session::new(tx));
let (sink, stream) = LocalRpc::with_metadata(handler, meta).split();
let stream = stream.select(rx.map_err(|_| RpcError::Other(format_err!("Pubsub channel returned an error"))));
let (rpc_client, sender) = crate::transports::duplex(sink, stream);
let client = TClient::from(sender);
(client, rpc_client)
}
/// Connects a client to a pub/sub handler that uses the default middleware of
/// `MetaIoHandler`.
///
/// Delegates to [`connect_with_pubsub_and_middleware`].
pub fn connect_with_pubsub<TClient, THandler>(handler: THandler) -> (TClient, impl Future<Item = (), Error = RpcError>)
where
    TClient: From<RpcChannel>,
    THandler: Deref<Target = MetaIoHandler<LocalMeta>>,
{
    // The middleware type is left to inference from `THandler`'s deref target.
    connect_with_pubsub_and_middleware::<TClient, THandler, _>(handler)
}