jsonrpc_client_transports/transports/
duplex.rs1use futures::channel::{mpsc, oneshot};
4use futures::{
5 task::{Context, Poll},
6 Future, Sink, Stream, StreamExt,
7};
8use jsonrpc_core::Id;
9use jsonrpc_pubsub::SubscriptionId;
10use log::debug;
11use serde_json::Value;
12use std::collections::HashMap;
13use std::collections::VecDeque;
14use std::pin::Pin;
15
16use super::RequestBuilder;
17use crate::{RpcChannel, RpcError, RpcMessage, RpcResult};
18
19struct Subscription {
20 id: Option<SubscriptionId>,
22 notification: String,
24 unsubscribe: String,
26 channel: mpsc::UnboundedSender<RpcResult<Value>>,
28}
29
30impl Subscription {
31 fn new(channel: mpsc::UnboundedSender<RpcResult<Value>>, notification: String, unsubscribe: String) -> Self {
32 Subscription {
33 id: None,
34 notification,
35 unsubscribe,
36 channel,
37 }
38 }
39}
40
41enum PendingRequest {
42 Call(oneshot::Sender<RpcResult<Value>>),
43 Subscription(Subscription),
44}
45
46pub struct Duplex<TSink, TStream> {
49 request_builder: RequestBuilder,
50 channel: Option<mpsc::UnboundedReceiver<RpcMessage>>,
52 pending_requests: HashMap<Id, PendingRequest>,
54 subscriptions: HashMap<(SubscriptionId, String), Subscription>,
56 stream: Pin<Box<TStream>>,
58 incoming: VecDeque<(Id, RpcResult<Value>, Option<String>, Option<SubscriptionId>)>,
60 outgoing: VecDeque<String>,
62 sink: Pin<Box<TSink>>,
64}
65
66impl<TSink, TStream> Duplex<TSink, TStream> {
67 fn new(sink: Pin<Box<TSink>>, stream: Pin<Box<TStream>>, channel: mpsc::UnboundedReceiver<RpcMessage>) -> Self {
69 log::debug!("open");
70 Duplex {
71 request_builder: RequestBuilder::new(),
72 channel: Some(channel),
73 pending_requests: Default::default(),
74 subscriptions: Default::default(),
75 stream,
76 incoming: Default::default(),
77 outgoing: Default::default(),
78 sink,
79 }
80 }
81}
82
83pub fn duplex<TSink, TStream>(sink: Pin<Box<TSink>>, stream: Pin<Box<TStream>>) -> (Duplex<TSink, TStream>, RpcChannel)
85where
86 TSink: Sink<String>,
87 TStream: Stream<Item = String>,
88{
89 let (sender, receiver) = mpsc::unbounded();
90 let client = Duplex::new(sink, stream, receiver);
91 (client, sender.into())
92}
93
94impl<TSink, TStream> Future for Duplex<TSink, TStream>
95where
96 TSink: Sink<String>,
97 TStream: Stream<Item = String>,
98{
99 type Output = RpcResult<()>;
100
101 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
102 log::debug!("handle requests from client");
104 loop {
105 let channel = match self.channel.as_mut() {
107 Some(channel) => channel,
108 None => break,
109 };
110 let msg = match channel.poll_next_unpin(cx) {
111 Poll::Ready(Some(msg)) => msg,
112 Poll::Ready(None) => {
113 self.channel.take();
116 break;
117 }
118 Poll::Pending => break,
119 };
120 let request_str = match msg {
121 RpcMessage::Call(msg) => {
122 let (id, request_str) = self.request_builder.call_request(&msg);
123
124 if self
125 .pending_requests
126 .insert(id.clone(), PendingRequest::Call(msg.sender))
127 .is_some()
128 {
129 log::error!("reuse of request id {:?}", id);
130 }
131 request_str
132 }
133 RpcMessage::Subscribe(msg) => {
134 let crate::Subscription {
135 subscribe,
136 subscribe_params,
137 notification,
138 unsubscribe,
139 } = msg.subscription;
140 let (id, request_str) = self.request_builder.subscribe_request(subscribe, subscribe_params);
141 log::debug!("subscribing to {}", notification);
142 let subscription = Subscription::new(msg.sender, notification, unsubscribe);
143 if self
144 .pending_requests
145 .insert(id.clone(), PendingRequest::Subscription(subscription))
146 .is_some()
147 {
148 log::error!("reuse of request id {:?}", id);
149 }
150 request_str
151 }
152 RpcMessage::Notify(msg) => self.request_builder.notification(&msg),
153 };
154 log::debug!("outgoing: {}", request_str);
155 self.outgoing.push_back(request_str);
156 }
157
158 log::debug!("handle stream");
161 loop {
162 let response_str = match self.stream.as_mut().poll_next(cx) {
163 Poll::Ready(Some(response_str)) => response_str,
164 Poll::Ready(None) => {
165 debug!("connection closed");
169 return Poll::Ready(Ok(()));
170 }
171 Poll::Pending => break,
172 };
173 log::debug!("incoming: {}", response_str);
174 let (id, result, method, sid) = super::parse_response(&response_str)?;
176 log::debug!(
177 "id: {:?} (sid: {:?}) result: {:?} method: {:?}",
178 id,
179 sid,
180 result,
181 method
182 );
183 self.incoming.push_back((id, result, method, sid));
184 }
185
186 log::debug!("handle incoming");
188 loop {
189 match self.incoming.pop_front() {
190 Some((id, result, method, sid)) => {
191 let sid_and_method = sid.and_then(|sid| method.map(|method| (sid, method)));
192 match self.pending_requests.remove(&id) {
194 Some(PendingRequest::Call(tx)) => {
196 tx.send(result)
197 .map_err(|_| RpcError::Client("oneshot channel closed".into()))?;
198 continue;
199 }
200 Some(PendingRequest::Subscription(mut subscription)) => {
203 let sid = result.as_ref().ok().and_then(|res| SubscriptionId::parse_value(res));
204 let method = subscription.notification.clone();
205
206 if let Some(sid) = sid {
207 subscription.id = Some(sid.clone());
208 if self
209 .subscriptions
210 .insert((sid.clone(), method.clone()), subscription)
211 .is_some()
212 {
213 log::warn!(
214 "Overwriting existing subscription under {:?} ({:?}). \
215 Seems that server returned the same subscription id.",
216 sid,
217 method,
218 );
219 }
220 } else {
221 let err = RpcError::Client(format!(
222 "Subscription {:?} ({:?}) rejected: {:?}",
223 id, method, result,
224 ));
225
226 if subscription.channel.unbounded_send(result).is_err() {
227 log::warn!("{}, but the reply channel has closed.", err);
228 }
229 }
230 continue;
231 }
232 None if sid_and_method.is_none() => {
234 log::warn!("Got unexpected response with id {:?} ({:?})", id, sid_and_method);
235 continue;
236 }
237 None => {}
239 };
240
241 let sid_and_method = if let Some(x) = sid_and_method {
242 x
243 } else {
244 continue;
245 };
246
247 if let Some(subscription) = self.subscriptions.get_mut(&sid_and_method) {
248 let res = subscription.channel.unbounded_send(result);
249 if res.is_err() {
250 let subscription = self
251 .subscriptions
252 .remove(&sid_and_method)
253 .expect("Subscription was just polled; qed");
254 let sid = subscription.id.expect(
255 "Every subscription that ends up in `self.subscriptions` has id already \
256 assigned; assignment happens during response to subscribe request.",
257 );
258 let (_id, request_str) =
259 self.request_builder.unsubscribe_request(subscription.unsubscribe, sid);
260 log::debug!("outgoing: {}", request_str);
261 self.outgoing.push_back(request_str);
262 log::debug!("unsubscribed from {:?}", sid_and_method);
263 }
264 } else {
265 log::warn!("Received unexpected subscription notification: {:?}", sid_and_method);
266 }
267 }
268 None => break,
269 }
270 }
271
272 log::debug!("handle outgoing");
275 loop {
276 let err = || Err(RpcError::Client("closing".into()));
277 match self.sink.as_mut().poll_ready(cx) {
278 Poll::Ready(Ok(())) => {}
279 Poll::Ready(Err(_)) => return err().into(),
280 _ => break,
281 }
282 match self.outgoing.pop_front() {
283 Some(request) => {
284 if self.sink.as_mut().start_send(request).is_err() {
285 return err().into();
287 }
288 }
289 None => break,
290 }
291 }
292 log::debug!("handle sink");
293 let sink_empty = match self.sink.as_mut().poll_flush(cx) {
294 Poll::Ready(Ok(())) => true,
295 Poll::Ready(Err(_)) => true,
296 Poll::Pending => false,
297 };
298
299 log::debug!("{:?}", self);
300 if self.channel.is_none()
302 && self.outgoing.is_empty()
303 && self.incoming.is_empty()
304 && self.pending_requests.is_empty()
305 && self.subscriptions.is_empty()
306 && sink_empty
307 {
308 log::debug!("close");
309 Poll::Ready(Ok(()))
310 } else {
311 Poll::Pending
312 }
313 }
314}
315
316impl<TSink, TStream> std::fmt::Debug for Duplex<TSink, TStream> {
317 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
318 writeln!(f, "channel is none: {}", self.channel.is_none())?;
319 writeln!(f, "outgoing: {}", self.outgoing.len())?;
320 writeln!(f, "incoming: {}", self.incoming.len())?;
321 writeln!(f, "pending_requests: {}", self.pending_requests.len())?;
322 writeln!(f, "subscriptions: {}", self.subscriptions.len())?;
323 Ok(())
324 }
325}