// jsonrpc_client_transports/transports/duplex.rs
use futures::channel::{mpsc, oneshot};
use futures::{
task::{Context, Poll},
Future, Sink, Stream, StreamExt,
};
use jsonrpc_core::Id;
use jsonrpc_pubsub::SubscriptionId;
use log::debug;
use serde_json::Value;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::pin::Pin;
use super::RequestBuilder;
use crate::{RpcChannel, RpcError, RpcMessage, RpcResult};
/// Bookkeeping for one subscription, from the moment the subscribe request is
/// sent until the subscription is removed again.
struct Subscription {
    /// Subscription id taken from the server's reply to the subscribe request.
    /// `None` while that reply is still outstanding.
    id: Option<SubscriptionId>,
    /// Name of the notification method; together with the id it forms the key
    /// under which the subscription is stored.
    notification: String,
    /// Name of the method used to build the unsubscribe request.
    unsubscribe: String,
    /// Sender through which results and notifications are handed to the
    /// subscriber.
    channel: mpsc::UnboundedSender<RpcResult<Value>>,
}
impl Subscription {
    /// Creates the record for a subscription whose subscribe request has not
    /// been answered yet, so no id is known (`id` is `None`).
    fn new(channel: mpsc::UnboundedSender<RpcResult<Value>>, notification: String, unsubscribe: String) -> Self {
        Self {
            channel,
            notification,
            unsubscribe,
            id: None,
        }
    }
}
/// A request that has been handed to the transport and whose response has not
/// been processed yet.
enum PendingRequest {
    /// A plain method call; the response is delivered through the oneshot
    /// sender.
    Call(oneshot::Sender<RpcResult<Value>>),
    /// A subscribe request; the response either activates the subscription or
    /// is forwarded to the subscriber as a rejection.
    Subscription(Subscription),
}
/// Future that drives a client connection over a sink/stream pair.
///
/// Polling it turns messages from the client channel into requests written to
/// the sink, and dispatches responses read from the stream to the waiting
/// callers and subscribers.
pub struct Duplex<TSink, TStream> {
    /// Produces the request ids and the serialized request strings.
    request_builder: RequestBuilder,
    /// Messages coming from the client side. Set to `None` once the channel
    /// has reported that it is closed.
    channel: Option<mpsc::UnboundedReceiver<RpcMessage>>,
    /// Requests that were issued and are waiting for their response, by id.
    pending_requests: HashMap<Id, PendingRequest>,
    /// Active subscriptions, keyed by (subscription id, notification method).
    subscriptions: HashMap<(SubscriptionId, String), Subscription>,
    /// Source of raw incoming messages.
    stream: Pin<Box<TStream>>,
    /// Parsed incoming messages that still have to be dispatched.
    incoming: VecDeque<(Id, RpcResult<Value>, Option<String>, Option<SubscriptionId>)>,
    /// Serialized requests that still have to be written to the sink.
    outgoing: VecDeque<String>,
    /// Destination of raw outgoing messages.
    sink: Pin<Box<TSink>>,
}
impl<TSink, TStream> Duplex<TSink, TStream> {
fn new(sink: Pin<Box<TSink>>, stream: Pin<Box<TStream>>, channel: mpsc::UnboundedReceiver<RpcMessage>) -> Self {
log::debug!("open");
Duplex {
request_builder: RequestBuilder::new(),
channel: Some(channel),
pending_requests: Default::default(),
subscriptions: Default::default(),
stream,
incoming: Default::default(),
outgoing: Default::default(),
sink,
}
}
}
/// Creates a [`Duplex`] transport over the given sink and stream.
///
/// Returns the transport together with the [`RpcChannel`] whose messages the
/// transport will serve. The transport does nothing until it is polled.
pub fn duplex<TSink, TStream>(sink: Pin<Box<TSink>>, stream: Pin<Box<TStream>>) -> (Duplex<TSink, TStream>, RpcChannel)
where
    TSink: Sink<String>,
    TStream: Stream<Item = String>,
{
    let (tx, rx) = mpsc::unbounded();
    (Duplex::new(sink, stream, rx), tx.into())
}
/// Drives the transport. Every call to `poll` runs five phases in order:
///
/// 1. messages from the client channel are turned into serialized requests and
///    queued in `outgoing`;
/// 2. raw messages are read from the stream, parsed and queued in `incoming`;
/// 3. queued incoming messages are dispatched to pending calls, pending
///    subscribe requests or active subscriptions;
/// 4. queued outgoing requests are written to the sink;
/// 5. the sink is flushed.
///
/// The future resolves with `Ok(())` when the stream ends, or when the client
/// channel is closed and no queued message, pending request or active
/// subscription is left and the sink has been flushed. It resolves with an
/// error when `parse_response` fails or when the sink reports an error from
/// `poll_ready` / `start_send`.
impl<TSink, TStream> Future for Duplex<TSink, TStream>
where
    TSink: Sink<String>,
    TStream: Stream<Item = String>,
{
    type Output = RpcResult<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // Phase 1: client channel -> outgoing queue.
        log::debug!("handle requests from client");
        loop {
            // `None` means the channel was already found closed earlier.
            let channel = match self.channel.as_mut() {
                Some(channel) => channel,
                None => break,
            };
            let msg = match channel.poll_next_unpin(cx) {
                Poll::Ready(Some(msg)) => msg,
                Poll::Ready(None) => {
                    // Forget the closed channel but keep going: requests that
                    // are still outstanding are handled by the phases below.
                    self.channel.take();
                    break;
                }
                Poll::Pending => break,
            };
            let request_str = match msg {
                RpcMessage::Call(msg) => {
                    let (id, request_str) = self.request_builder.call_request(&msg);
                    // Remember where to deliver the response for this id.
                    if self
                        .pending_requests
                        .insert(id.clone(), PendingRequest::Call(msg.sender))
                        .is_some()
                    {
                        log::error!("reuse of request id {:?}", id);
                    }
                    request_str
                }
                RpcMessage::Subscribe(msg) => {
                    let crate::Subscription {
                        subscribe,
                        subscribe_params,
                        notification,
                        unsubscribe,
                    } = msg.subscription;
                    let (id, request_str) = self.request_builder.subscribe_request(subscribe, subscribe_params);
                    log::debug!("subscribing to {}", notification);
                    // The subscription stays pending until the server answers
                    // the subscribe request with a subscription id.
                    let subscription = Subscription::new(msg.sender, notification, unsubscribe);
                    if self
                        .pending_requests
                        .insert(id.clone(), PendingRequest::Subscription(subscription))
                        .is_some()
                    {
                        log::error!("reuse of request id {:?}", id);
                    }
                    request_str
                }
                // Notifications expect no response, so nothing is recorded.
                RpcMessage::Notify(msg) => self.request_builder.notification(&msg),
            };
            log::debug!("outgoing: {}", request_str);
            self.outgoing.push_back(request_str);
        }

        // Phase 2: stream -> incoming queue.
        log::debug!("handle stream");
        loop {
            let response_str = match self.stream.as_mut().poll_next(cx) {
                Poll::Ready(Some(response_str)) => response_str,
                Poll::Ready(None) => {
                    // The stream has ended: finish immediately, without
                    // waiting for outstanding requests.
                    debug!("connection closed");
                    return Poll::Ready(Ok(()));
                }
                Poll::Pending => break,
            };
            log::debug!("incoming: {}", response_str);
            // A message that cannot be parsed terminates the transport.
            let (id, result, method, sid) = super::parse_response(&response_str)?;
            log::debug!(
                "id: {:?} (sid: {:?}) result: {:?} method: {:?}",
                id,
                sid,
                result,
                method
            );
            self.incoming.push_back((id, result, method, sid));
        }

        // Phase 3: dispatch the incoming queue.
        log::debug!("handle incoming");
        while let Some((id, result, method, sid)) = self.incoming.pop_front() {
            // Present only when the message carries both a subscription id
            // and a method, i.e. when it can be a subscription notification.
            let sid_and_method = sid.and_then(|sid| method.map(|method| (sid, method)));

            match self.pending_requests.remove(&id) {
                // Response to a plain call: hand it to the caller.
                Some(PendingRequest::Call(tx)) => {
                    // The caller may have dropped its receiver in the meantime
                    // (for instance a cancelled or timed-out call). That only
                    // concerns this one request, so the response is discarded
                    // instead of failing the transport for everybody else;
                    // closed subscription channels are treated the same way.
                    if tx.send(result).is_err() {
                        log::warn!(
                            "Got response for request {:?}, but the reply channel has closed.",
                            id
                        );
                    }
                    continue;
                }
                // Response to a subscribe request: activate the subscription
                // or report the rejection to the subscriber.
                Some(PendingRequest::Subscription(mut subscription)) => {
                    let sid = result.as_ref().ok().and_then(|res| SubscriptionId::parse_value(res));
                    let method = subscription.notification.clone();
                    if let Some(sid) = sid {
                        subscription.id = Some(sid.clone());
                        if self
                            .subscriptions
                            .insert((sid.clone(), method.clone()), subscription)
                            .is_some()
                        {
                            log::warn!(
                                "Overwriting existing subscription under {:?} ({:?}). \
                                 Seems that server returned the same subscription id.",
                                sid,
                                method,
                            );
                        }
                    } else if let Err(send_err) = subscription.channel.unbounded_send(result) {
                        // No usable subscription id in the reply and nobody is
                        // listening any more. The message is only built on
                        // this path; `into_inner` returns the undelivered
                        // result so it can still be logged.
                        let err = RpcError::Client(format!(
                            "Subscription {:?} ({:?}) rejected: {:?}",
                            id,
                            method,
                            send_err.into_inner(),
                        ));
                        log::warn!("{}, but the reply channel has closed.", err);
                    }
                    continue;
                }
                // Neither a pending request nor a possible notification.
                None if sid_and_method.is_none() => {
                    log::warn!("Got unexpected response with id {:?} ({:?})", id, sid_and_method);
                    continue;
                }
                // Not a pending request: fall through and treat it as a
                // subscription notification.
                None => {}
            }

            let sid_and_method = match sid_and_method {
                Some(key) => key,
                None => continue,
            };

            if let Some(subscription) = self.subscriptions.get_mut(&sid_and_method) {
                if subscription.channel.unbounded_send(result).is_err() {
                    // The subscriber is gone: drop the subscription and ask
                    // the server to stop sending notifications.
                    let subscription = self
                        .subscriptions
                        .remove(&sid_and_method)
                        .expect("Subscription was just polled; qed");
                    let sid = subscription.id.expect(
                        "Every subscription that ends up in `self.subscriptions` has id already \
                         assigned; assignment happens during response to subscribe request.",
                    );
                    let (_id, request_str) =
                        self.request_builder.unsubscribe_request(subscription.unsubscribe, sid);
                    log::debug!("outgoing: {}", request_str);
                    self.outgoing.push_back(request_str);
                    log::debug!("unsubscribed from {:?}", sid_and_method);
                }
            } else {
                log::warn!("Received unexpected subscription notification: {:?}", sid_and_method);
            }
        }

        // Phase 4: outgoing queue -> sink.
        log::debug!("handle outgoing");
        let closing = || Err(RpcError::Client("closing".into()));
        loop {
            // The sink must report readiness before every `start_send`.
            match self.sink.as_mut().poll_ready(cx) {
                Poll::Ready(Ok(())) => {}
                Poll::Ready(Err(_)) => return closing().into(),
                Poll::Pending => break,
            }
            match self.outgoing.pop_front() {
                Some(request) => {
                    if self.sink.as_mut().start_send(request).is_err() {
                        return closing().into();
                    }
                }
                None => break,
            }
        }

        // Phase 5: flush the sink.
        log::debug!("handle sink");
        // A flush error counts as "nothing left to flush", exactly like a
        // successful flush.
        // NOTE(review): unlike `poll_ready`/`start_send` errors, flush errors
        // are not surfaced to the caller - confirm that this is intended.
        let sink_empty = match self.sink.as_mut().poll_flush(cx) {
            Poll::Ready(_) => true,
            Poll::Pending => false,
        };

        log::debug!("{:?}", self);
        // Done once the client side is closed and no work of any kind is left.
        if self.channel.is_none()
            && self.outgoing.is_empty()
            && self.incoming.is_empty()
            && self.pending_requests.is_empty()
            && self.subscriptions.is_empty()
            && sink_empty
        {
            log::debug!("close");
            Poll::Ready(Ok(()))
        } else {
            Poll::Pending
        }
    }
}
/// Debug output is a short, line-per-item summary of the transport's state:
/// whether the client channel is gone and how many entries each queue and
/// table currently holds.
impl<TSink, TStream> std::fmt::Debug for Duplex<TSink, TStream> {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let channel_gone = self.channel.is_none();
        let queued_out = self.outgoing.len();
        let queued_in = self.incoming.len();
        let pending = self.pending_requests.len();
        let active = self.subscriptions.len();

        writeln!(f, "channel is none: {}", channel_gone)?;
        writeln!(f, "outgoing: {}", queued_out)?;
        writeln!(f, "incoming: {}", queued_in)?;
        writeln!(f, "pending_requests: {}", pending)?;
        writeln!(f, "subscriptions: {}", active)
    }
}