jsonrpc_client_transports/transports/
ws.rs1use std::collections::VecDeque;
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use crate::{RpcChannel, RpcError};
8use websocket::{ClientBuilder, OwnedMessage};
9
10pub fn try_connect<T>(url: &str) -> Result<impl Future<Output = Result<T, RpcError>>, RpcError>
16where
17 T: From<RpcChannel>,
18{
19 let client_builder = ClientBuilder::new(url).map_err(|e| RpcError::Other(Box::new(e)))?;
20 Ok(do_connect(client_builder))
21}
22
23pub fn connect<T>(url: &url::Url) -> impl Future<Output = Result<T, RpcError>>
27where
28 T: From<RpcChannel>,
29{
30 let client_builder = ClientBuilder::from_url(url);
31 do_connect(client_builder)
32}
33
34fn do_connect<T>(client_builder: ClientBuilder) -> impl Future<Output = Result<T, RpcError>>
35where
36 T: From<RpcChannel>,
37{
38 use futures::compat::{Future01CompatExt, Sink01CompatExt, Stream01CompatExt};
39 use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
40 use websocket::futures::Stream;
41
42 client_builder
43 .async_connect(None)
44 .compat()
45 .map_err(|error| RpcError::Other(Box::new(error)))
46 .map_ok(|(client, _)| {
47 let (sink, stream) = client.split();
48
49 let sink = sink.sink_compat().sink_map_err(|e| RpcError::Other(Box::new(e)));
50 let stream = stream.compat().map_err(|e| RpcError::Other(Box::new(e)));
51 let (sink, stream) = WebsocketClient::new(sink, stream).split();
52 let (sink, stream) = (
53 Box::pin(sink),
54 Box::pin(
55 stream
56 .take_while(|x| futures::future::ready(x.is_ok()))
57 .map(|x| x.expect("Stream is closed upon first error.")),
58 ),
59 );
60 let (rpc_client, sender) = super::duplex(sink, stream);
61 let rpc_client = rpc_client.map_err(|error| log::error!("{:?}", error));
62 tokio::spawn(rpc_client);
63
64 sender.into()
65 })
66}
67
68struct WebsocketClient<TSink, TStream> {
69 sink: TSink,
70 stream: TStream,
71 queue: VecDeque<OwnedMessage>,
72}
73
74impl<TSink, TStream, TError> WebsocketClient<TSink, TStream>
75where
76 TSink: futures::Sink<OwnedMessage, Error = TError> + Unpin,
77 TStream: futures::Stream<Item = Result<OwnedMessage, TError>> + Unpin,
78 TError: std::error::Error + Send + 'static,
79{
80 pub fn new(sink: TSink, stream: TStream) -> Self {
81 Self {
82 sink,
83 stream,
84 queue: VecDeque::new(),
85 }
86 }
87
88 fn try_empty_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), TSink::Error>> {
91 let this = Pin::into_inner(self);
92
93 match Pin::new(&mut this.sink).poll_ready(cx) {
94 Poll::Ready(value) => value?,
95 Poll::Pending => return Poll::Pending,
96 }
97
98 while let Some(item) = this.queue.pop_front() {
99 Pin::new(&mut this.sink).start_send(item)?;
100
101 if !this.queue.is_empty() {
102 match Pin::new(&mut this.sink).poll_ready(cx) {
103 Poll::Ready(value) => value?,
104 Poll::Pending => return Poll::Pending,
105 }
106 }
107 }
108
109 Poll::Ready(Ok(()))
110 }
111}
112
113impl<TSink, TStream> futures::Sink<String> for WebsocketClient<TSink, TStream>
118where
119 TSink: futures::Sink<OwnedMessage, Error = RpcError> + Unpin,
120 TStream: futures::Stream<Item = Result<OwnedMessage, RpcError>> + Unpin,
121{
122 type Error = RpcError;
123
124 fn start_send(mut self: Pin<&mut Self>, request: String) -> Result<(), Self::Error> {
125 let request = OwnedMessage::Text(request);
126
127 if self.queue.is_empty() {
128 let this = Pin::into_inner(self);
129 Pin::new(&mut this.sink).start_send(request)
130 } else {
131 self.queue.push_back(request);
132 Ok(())
133 }
134 }
135
136 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
137 let this = Pin::into_inner(self);
138
139 if this.queue.is_empty() {
140 return Pin::new(&mut this.sink).poll_ready(cx);
141 }
142
143 let _ = Pin::new(this).try_empty_buffer(cx)?;
144
145 Poll::Ready(Ok(()))
146 }
147
148 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
149 let this = Pin::into_inner(self);
150
151 match Pin::new(&mut *this).try_empty_buffer(cx) {
152 Poll::Ready(value) => value?,
153 Poll::Pending => return Poll::Pending,
154 }
155 debug_assert!(this.queue.is_empty());
156
157 Pin::new(&mut this.sink).poll_flush(cx)
158 }
159
160 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
161 let this = Pin::into_inner(self);
162
163 match Pin::new(&mut *this).try_empty_buffer(cx) {
164 Poll::Ready(value) => value?,
165 Poll::Pending => return Poll::Pending,
166 }
167 debug_assert!(this.queue.is_empty());
168
169 Pin::new(&mut this.sink).poll_close(cx)
170 }
171}
172
173impl<TSink, TStream> futures::Stream for WebsocketClient<TSink, TStream>
174where
175 TSink: futures::Sink<OwnedMessage, Error = RpcError> + Unpin,
176 TStream: futures::Stream<Item = Result<OwnedMessage, RpcError>> + Unpin,
177{
178 type Item = Result<String, RpcError>;
179
180 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
181 let this = Pin::into_inner(self);
182 loop {
183 match Pin::new(&mut this.stream).poll_next(cx) {
184 Poll::Ready(Some(Ok(message))) => match message {
185 OwnedMessage::Text(data) => return Poll::Ready(Some(Ok(data))),
186 OwnedMessage::Binary(data) => log::info!("server sent binary data {:?}", data),
187 OwnedMessage::Ping(p) => this.queue.push_front(OwnedMessage::Pong(p)),
188 OwnedMessage::Pong(_) => {}
189 OwnedMessage::Close(c) => this.queue.push_front(OwnedMessage::Close(c)),
190 },
191 Poll::Ready(None) => {
192 return Poll::Ready(None);
194 }
195 Poll::Pending => return Poll::Pending,
196 Poll::Ready(Some(Err(error))) => return Poll::Ready(Some(Err(RpcError::Other(Box::new(error))))),
197 }
198 }
199 }
200}