jsonrpc_client_transports/transports/
ws.rs

1//! JSON-RPC websocket client implementation.
2use std::collections::VecDeque;
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use crate::{RpcChannel, RpcError};
8use websocket::{ClientBuilder, OwnedMessage};
9
10/// Connect to a JSON-RPC websocket server.
11///
12/// Uses an unbounded channel to queue outgoing rpc messages.
13///
14/// Returns `Err` if the `url` is invalid.
15pub fn try_connect<T>(url: &str) -> Result<impl Future<Output = Result<T, RpcError>>, RpcError>
16where
17	T: From<RpcChannel>,
18{
19	let client_builder = ClientBuilder::new(url).map_err(|e| RpcError::Other(Box::new(e)))?;
20	Ok(do_connect(client_builder))
21}
22
23/// Connect to a JSON-RPC websocket server.
24///
25/// Uses an unbounded channel to queue outgoing rpc messages.
26pub fn connect<T>(url: &url::Url) -> impl Future<Output = Result<T, RpcError>>
27where
28	T: From<RpcChannel>,
29{
30	let client_builder = ClientBuilder::from_url(url);
31	do_connect(client_builder)
32}
33
34fn do_connect<T>(client_builder: ClientBuilder) -> impl Future<Output = Result<T, RpcError>>
35where
36	T: From<RpcChannel>,
37{
38	use futures::compat::{Future01CompatExt, Sink01CompatExt, Stream01CompatExt};
39	use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
40	use websocket::futures::Stream;
41
42	client_builder
43		.async_connect(None)
44		.compat()
45		.map_err(|error| RpcError::Other(Box::new(error)))
46		.map_ok(|(client, _)| {
47			let (sink, stream) = client.split();
48
49			let sink = sink.sink_compat().sink_map_err(|e| RpcError::Other(Box::new(e)));
50			let stream = stream.compat().map_err(|e| RpcError::Other(Box::new(e)));
51			let (sink, stream) = WebsocketClient::new(sink, stream).split();
52			let (sink, stream) = (
53				Box::pin(sink),
54				Box::pin(
55					stream
56						.take_while(|x| futures::future::ready(x.is_ok()))
57						.map(|x| x.expect("Stream is closed upon first error.")),
58				),
59			);
60			let (rpc_client, sender) = super::duplex(sink, stream);
61			let rpc_client = rpc_client.map_err(|error| log::error!("{:?}", error));
62			tokio::spawn(rpc_client);
63
64			sender.into()
65		})
66}
67
68struct WebsocketClient<TSink, TStream> {
69	sink: TSink,
70	stream: TStream,
71	queue: VecDeque<OwnedMessage>,
72}
73
74impl<TSink, TStream, TError> WebsocketClient<TSink, TStream>
75where
76	TSink: futures::Sink<OwnedMessage, Error = TError> + Unpin,
77	TStream: futures::Stream<Item = Result<OwnedMessage, TError>> + Unpin,
78	TError: std::error::Error + Send + 'static,
79{
80	pub fn new(sink: TSink, stream: TStream) -> Self {
81		Self {
82			sink,
83			stream,
84			queue: VecDeque::new(),
85		}
86	}
87
88	// Drains the internal buffer and attempts to forward as much of the items
89	// as possible to the underlying sink
90	fn try_empty_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), TSink::Error>> {
91		let this = Pin::into_inner(self);
92
93		match Pin::new(&mut this.sink).poll_ready(cx) {
94			Poll::Ready(value) => value?,
95			Poll::Pending => return Poll::Pending,
96		}
97
98		while let Some(item) = this.queue.pop_front() {
99			Pin::new(&mut this.sink).start_send(item)?;
100
101			if !this.queue.is_empty() {
102				match Pin::new(&mut this.sink).poll_ready(cx) {
103					Poll::Ready(value) => value?,
104					Poll::Pending => return Poll::Pending,
105				}
106			}
107		}
108
109		Poll::Ready(Ok(()))
110	}
111}
112
113// This mostly forwards to the underlying sink but also adds an unbounded queue
114// for when the underlying sink is incapable of receiving more items.
115// See https://docs.rs/futures-util/0.3.8/futures_util/sink/struct.Buffer.html
116// for the variant with a fixed-size buffer.
117impl<TSink, TStream> futures::Sink<String> for WebsocketClient<TSink, TStream>
118where
119	TSink: futures::Sink<OwnedMessage, Error = RpcError> + Unpin,
120	TStream: futures::Stream<Item = Result<OwnedMessage, RpcError>> + Unpin,
121{
122	type Error = RpcError;
123
124	fn start_send(mut self: Pin<&mut Self>, request: String) -> Result<(), Self::Error> {
125		let request = OwnedMessage::Text(request);
126
127		if self.queue.is_empty() {
128			let this = Pin::into_inner(self);
129			Pin::new(&mut this.sink).start_send(request)
130		} else {
131			self.queue.push_back(request);
132			Ok(())
133		}
134	}
135
136	fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
137		let this = Pin::into_inner(self);
138
139		if this.queue.is_empty() {
140			return Pin::new(&mut this.sink).poll_ready(cx);
141		}
142
143		let _ = Pin::new(this).try_empty_buffer(cx)?;
144
145		Poll::Ready(Ok(()))
146	}
147
148	fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
149		let this = Pin::into_inner(self);
150
151		match Pin::new(&mut *this).try_empty_buffer(cx) {
152			Poll::Ready(value) => value?,
153			Poll::Pending => return Poll::Pending,
154		}
155		debug_assert!(this.queue.is_empty());
156
157		Pin::new(&mut this.sink).poll_flush(cx)
158	}
159
160	fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
161		let this = Pin::into_inner(self);
162
163		match Pin::new(&mut *this).try_empty_buffer(cx) {
164			Poll::Ready(value) => value?,
165			Poll::Pending => return Poll::Pending,
166		}
167		debug_assert!(this.queue.is_empty());
168
169		Pin::new(&mut this.sink).poll_close(cx)
170	}
171}
172
173impl<TSink, TStream> futures::Stream for WebsocketClient<TSink, TStream>
174where
175	TSink: futures::Sink<OwnedMessage, Error = RpcError> + Unpin,
176	TStream: futures::Stream<Item = Result<OwnedMessage, RpcError>> + Unpin,
177{
178	type Item = Result<String, RpcError>;
179
180	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
181		let this = Pin::into_inner(self);
182		loop {
183			match Pin::new(&mut this.stream).poll_next(cx) {
184				Poll::Ready(Some(Ok(message))) => match message {
185					OwnedMessage::Text(data) => return Poll::Ready(Some(Ok(data))),
186					OwnedMessage::Binary(data) => log::info!("server sent binary data {:?}", data),
187					OwnedMessage::Ping(p) => this.queue.push_front(OwnedMessage::Pong(p)),
188					OwnedMessage::Pong(_) => {}
189					OwnedMessage::Close(c) => this.queue.push_front(OwnedMessage::Close(c)),
190				},
191				Poll::Ready(None) => {
192					// TODO try to reconnect (#411).
193					return Poll::Ready(None);
194				}
195				Poll::Pending => return Poll::Pending,
196				Poll::Ready(Some(Err(error))) => return Poll::Ready(Some(Err(RpcError::Other(Box::new(error))))),
197			}
198		}
199	}
200}