jsonrpc_client_transports/transports/
duplex.rs

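//! Duplex transport: drives JSON-RPC calls, notifications and subscriptions
//! over a paired sink (outgoing messages) and stream (incoming messages).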
2
3use futures::channel::{mpsc, oneshot};
4use futures::{
5	task::{Context, Poll},
6	Future, Sink, Stream, StreamExt,
7};
8use jsonrpc_core::Id;
9use jsonrpc_pubsub::SubscriptionId;
10use log::debug;
11use serde_json::Value;
12use std::collections::HashMap;
13use std::collections::VecDeque;
14use std::pin::Pin;
15
16use super::RequestBuilder;
17use crate::{RpcChannel, RpcError, RpcMessage, RpcResult};
18
19struct Subscription {
20	/// Subscription id received when subscribing.
21	id: Option<SubscriptionId>,
22	/// A method name used for notification.
23	notification: String,
24	/// Rpc method to unsubscribe.
25	unsubscribe: String,
26	/// Where to send messages to.
27	channel: mpsc::UnboundedSender<RpcResult<Value>>,
28}
29
30impl Subscription {
31	fn new(channel: mpsc::UnboundedSender<RpcResult<Value>>, notification: String, unsubscribe: String) -> Self {
32		Subscription {
33			id: None,
34			notification,
35			unsubscribe,
36			channel,
37		}
38	}
39}
40
41enum PendingRequest {
42	Call(oneshot::Sender<RpcResult<Value>>),
43	Subscription(Subscription),
44}
45
46/// The Duplex handles sending and receiving asynchronous
47/// messages through an underlying transport.
48pub struct Duplex<TSink, TStream> {
49	request_builder: RequestBuilder,
50	/// Channel from the client.
51	channel: Option<mpsc::UnboundedReceiver<RpcMessage>>,
52	/// Requests that haven't received a response yet.
53	pending_requests: HashMap<Id, PendingRequest>,
54	/// A map from the subscription name to the subscription.
55	subscriptions: HashMap<(SubscriptionId, String), Subscription>,
56	/// Incoming messages from the underlying transport.
57	stream: Pin<Box<TStream>>,
58	/// Unprocessed incoming messages.
59	incoming: VecDeque<(Id, RpcResult<Value>, Option<String>, Option<SubscriptionId>)>,
60	/// Unprocessed outgoing messages.
61	outgoing: VecDeque<String>,
62	/// Outgoing messages from the underlying transport.
63	sink: Pin<Box<TSink>>,
64}
65
66impl<TSink, TStream> Duplex<TSink, TStream> {
67	/// Creates a new `Duplex`.
68	fn new(sink: Pin<Box<TSink>>, stream: Pin<Box<TStream>>, channel: mpsc::UnboundedReceiver<RpcMessage>) -> Self {
69		log::debug!("open");
70		Duplex {
71			request_builder: RequestBuilder::new(),
72			channel: Some(channel),
73			pending_requests: Default::default(),
74			subscriptions: Default::default(),
75			stream,
76			incoming: Default::default(),
77			outgoing: Default::default(),
78			sink,
79		}
80	}
81}
82
83/// Creates a new `Duplex`, along with a channel to communicate
84pub fn duplex<TSink, TStream>(sink: Pin<Box<TSink>>, stream: Pin<Box<TStream>>) -> (Duplex<TSink, TStream>, RpcChannel)
85where
86	TSink: Sink<String>,
87	TStream: Stream<Item = String>,
88{
89	let (sender, receiver) = mpsc::unbounded();
90	let client = Duplex::new(sink, stream, receiver);
91	(client, sender.into())
92}
93
94impl<TSink, TStream> Future for Duplex<TSink, TStream>
95where
96	TSink: Sink<String>,
97	TStream: Stream<Item = String>,
98{
99	type Output = RpcResult<()>;
100
101	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
102		// Handle requests from the client.
103		log::debug!("handle requests from client");
104		loop {
105			// Check that the client channel is open
106			let channel = match self.channel.as_mut() {
107				Some(channel) => channel,
108				None => break,
109			};
110			let msg = match channel.poll_next_unpin(cx) {
111				Poll::Ready(Some(msg)) => msg,
112				Poll::Ready(None) => {
113					// When the channel is dropped we still need to finish
114					// outstanding requests.
115					self.channel.take();
116					break;
117				}
118				Poll::Pending => break,
119			};
120			let request_str = match msg {
121				RpcMessage::Call(msg) => {
122					let (id, request_str) = self.request_builder.call_request(&msg);
123
124					if self
125						.pending_requests
126						.insert(id.clone(), PendingRequest::Call(msg.sender))
127						.is_some()
128					{
129						log::error!("reuse of request id {:?}", id);
130					}
131					request_str
132				}
133				RpcMessage::Subscribe(msg) => {
134					let crate::Subscription {
135						subscribe,
136						subscribe_params,
137						notification,
138						unsubscribe,
139					} = msg.subscription;
140					let (id, request_str) = self.request_builder.subscribe_request(subscribe, subscribe_params);
141					log::debug!("subscribing to {}", notification);
142					let subscription = Subscription::new(msg.sender, notification, unsubscribe);
143					if self
144						.pending_requests
145						.insert(id.clone(), PendingRequest::Subscription(subscription))
146						.is_some()
147					{
148						log::error!("reuse of request id {:?}", id);
149					}
150					request_str
151				}
152				RpcMessage::Notify(msg) => self.request_builder.notification(&msg),
153			};
154			log::debug!("outgoing: {}", request_str);
155			self.outgoing.push_back(request_str);
156		}
157
158		// Handle stream.
159		// Reads from stream and queues to incoming queue.
160		log::debug!("handle stream");
161		loop {
162			let response_str = match self.stream.as_mut().poll_next(cx) {
163				Poll::Ready(Some(response_str)) => response_str,
164				Poll::Ready(None) => {
165					// The websocket connection was closed so the client
166					// can be shutdown. Reopening closed connections must
167					// be handled by the transport.
168					debug!("connection closed");
169					return Poll::Ready(Ok(()));
170				}
171				Poll::Pending => break,
172			};
173			log::debug!("incoming: {}", response_str);
174			// we only send one request at the time, so there can only be one response.
175			let (id, result, method, sid) = super::parse_response(&response_str)?;
176			log::debug!(
177				"id: {:?} (sid: {:?}) result: {:?} method: {:?}",
178				id,
179				sid,
180				result,
181				method
182			);
183			self.incoming.push_back((id, result, method, sid));
184		}
185
186		// Handle incoming queue.
187		log::debug!("handle incoming");
188		loop {
189			match self.incoming.pop_front() {
190				Some((id, result, method, sid)) => {
191					let sid_and_method = sid.and_then(|sid| method.map(|method| (sid, method)));
192					// Handle the response to a pending request.
193					match self.pending_requests.remove(&id) {
194						// It's a regular Req-Res call, so just answer.
195						Some(PendingRequest::Call(tx)) => {
196							tx.send(result)
197								.map_err(|_| RpcError::Client("oneshot channel closed".into()))?;
198							continue;
199						}
200						// It was a subscription request,
201						// turn it into a proper subscription.
202						Some(PendingRequest::Subscription(mut subscription)) => {
203							let sid = result.as_ref().ok().and_then(|res| SubscriptionId::parse_value(res));
204							let method = subscription.notification.clone();
205
206							if let Some(sid) = sid {
207								subscription.id = Some(sid.clone());
208								if self
209									.subscriptions
210									.insert((sid.clone(), method.clone()), subscription)
211									.is_some()
212								{
213									log::warn!(
214										"Overwriting existing subscription under {:?} ({:?}). \
215										 Seems that server returned the same subscription id.",
216										sid,
217										method,
218									);
219								}
220							} else {
221								let err = RpcError::Client(format!(
222									"Subscription {:?} ({:?}) rejected: {:?}",
223									id, method, result,
224								));
225
226								if subscription.channel.unbounded_send(result).is_err() {
227									log::warn!("{}, but the reply channel has closed.", err);
228								}
229							}
230							continue;
231						}
232						// It's not a pending request nor a notification
233						None if sid_and_method.is_none() => {
234							log::warn!("Got unexpected response with id {:?} ({:?})", id, sid_and_method);
235							continue;
236						}
237						// just fall-through in case it's a notification
238						None => {}
239					};
240
241					let sid_and_method = if let Some(x) = sid_and_method {
242						x
243					} else {
244						continue;
245					};
246
247					if let Some(subscription) = self.subscriptions.get_mut(&sid_and_method) {
248						let res = subscription.channel.unbounded_send(result);
249						if res.is_err() {
250							let subscription = self
251								.subscriptions
252								.remove(&sid_and_method)
253								.expect("Subscription was just polled; qed");
254							let sid = subscription.id.expect(
255								"Every subscription that ends up in `self.subscriptions` has id already \
256								 assigned; assignment happens during response to subscribe request.",
257							);
258							let (_id, request_str) =
259								self.request_builder.unsubscribe_request(subscription.unsubscribe, sid);
260							log::debug!("outgoing: {}", request_str);
261							self.outgoing.push_back(request_str);
262							log::debug!("unsubscribed from {:?}", sid_and_method);
263						}
264					} else {
265						log::warn!("Received unexpected subscription notification: {:?}", sid_and_method);
266					}
267				}
268				None => break,
269			}
270		}
271
272		// Handle outgoing queue.
273		// Writes queued messages to sink.
274		log::debug!("handle outgoing");
275		loop {
276			let err = || Err(RpcError::Client("closing".into()));
277			match self.sink.as_mut().poll_ready(cx) {
278				Poll::Ready(Ok(())) => {}
279				Poll::Ready(Err(_)) => return err().into(),
280				_ => break,
281			}
282			match self.outgoing.pop_front() {
283				Some(request) => {
284					if self.sink.as_mut().start_send(request).is_err() {
285						// the channel is disconnected.
286						return err().into();
287					}
288				}
289				None => break,
290			}
291		}
292		log::debug!("handle sink");
293		let sink_empty = match self.sink.as_mut().poll_flush(cx) {
294			Poll::Ready(Ok(())) => true,
295			Poll::Ready(Err(_)) => true,
296			Poll::Pending => false,
297		};
298
299		log::debug!("{:?}", self);
300		// Return ready when the future is complete
301		if self.channel.is_none()
302			&& self.outgoing.is_empty()
303			&& self.incoming.is_empty()
304			&& self.pending_requests.is_empty()
305			&& self.subscriptions.is_empty()
306			&& sink_empty
307		{
308			log::debug!("close");
309			Poll::Ready(Ok(()))
310		} else {
311			Poll::Pending
312		}
313	}
314}
315
316impl<TSink, TStream> std::fmt::Debug for Duplex<TSink, TStream> {
317	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
318		writeln!(f, "channel is none: {}", self.channel.is_none())?;
319		writeln!(f, "outgoing: {}", self.outgoing.len())?;
320		writeln!(f, "incoming: {}", self.incoming.len())?;
321		writeln!(f, "pending_requests: {}", self.pending_requests.len())?;
322		writeln!(f, "subscriptions: {}", self.subscriptions.len())?;
323		Ok(())
324	}
325}