jsonrpc_client_transports/transports/
http.rs

1//! HTTP client
2//!
3//! HTTPS support is enabled with the `tls` feature.
4
5use super::RequestBuilder;
6use crate::{RpcChannel, RpcError, RpcMessage, RpcResult};
7use futures::{future, Future, FutureExt, StreamExt, TryFutureExt};
8use hyper::{http, Client, Request, Uri};
9
10/// Create a HTTP Client
11pub async fn connect<TClient>(url: &str) -> RpcResult<TClient>
12where
13	TClient: From<RpcChannel>,
14{
15	let url: Uri = url.parse().map_err(|e| RpcError::Other(Box::new(e)))?;
16
17	let (client_api, client_worker) = do_connect(url).await;
18	tokio::spawn(client_worker);
19
20	Ok(TClient::from(client_api))
21}
22
23async fn do_connect(url: Uri) -> (RpcChannel, impl Future<Output = ()>) {
24	let max_parallel = 8;
25
26	#[cfg(feature = "tls")]
27	let connector = hyper_tls::HttpsConnector::new();
28	#[cfg(feature = "tls")]
29	let client = Client::builder().build::<_, hyper::Body>(connector);
30
31	#[cfg(not(feature = "tls"))]
32	let client = Client::new();
33	// Keep track of internal request IDs when building subsequent requests
34	let mut request_builder = RequestBuilder::new();
35
36	let (sender, receiver) = futures::channel::mpsc::unbounded();
37
38	let fut = receiver
39		.filter_map(move |msg: RpcMessage| {
40			future::ready(match msg {
41				RpcMessage::Call(call) => {
42					let (_, request) = request_builder.call_request(&call);
43					Some((request, Some(call.sender)))
44				}
45				RpcMessage::Notify(notify) => Some((request_builder.notification(&notify), None)),
46				RpcMessage::Subscribe(_) => {
47					log::warn!("Unsupported `RpcMessage` type `Subscribe`.");
48					None
49				}
50			})
51		})
52		.map(move |(request, sender)| {
53			let request = Request::post(&url)
54				.header(
55					http::header::CONTENT_TYPE,
56					http::header::HeaderValue::from_static("application/json"),
57				)
58				.header(
59					http::header::ACCEPT,
60					http::header::HeaderValue::from_static("application/json"),
61				)
62				.body(request.into())
63				.expect("Uri and request headers are valid; qed");
64
65			client
66				.request(request)
67				.then(|response| async move { (response, sender) })
68		})
69		.buffer_unordered(max_parallel)
70		.for_each(|(response, sender)| async {
71			let result = match response {
72				Ok(ref res) if !res.status().is_success() => {
73					log::trace!("http result status {}", res.status());
74					Err(RpcError::Client(format!(
75						"Unexpected response status code: {}",
76						res.status()
77					)))
78				}
79				Err(err) => Err(RpcError::Other(Box::new(err))),
80				Ok(res) => {
81					hyper::body::to_bytes(res.into_body())
82						.map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e)))
83						.await
84				}
85			};
86
87			if let Some(sender) = sender {
88				let response = result
89					.and_then(|response| {
90						let response_str = String::from_utf8_lossy(response.as_ref()).into_owned();
91						super::parse_response(&response_str)
92					})
93					.and_then(|r| r.1);
94				if let Err(err) = sender.send(response) {
95					log::warn!("Error resuming asynchronous request: {:?}", err);
96				}
97			}
98		});
99
100	(sender.into(), fut)
101}
102
103#[cfg(test)]
104mod tests {
105	use super::*;
106	use crate::*;
107	use assert_matches::assert_matches;
108	use jsonrpc_core::{Error, ErrorCode, IoHandler, Params, Value};
109	use jsonrpc_http_server::*;
110
111	fn id<T>(t: T) -> T {
112		t
113	}
114
	/// Handle to a JSON-RPC HTTP server started for a single test.
	struct TestServer {
		/// `http://` URI built from the address the started server reports.
		uri: String,
		/// The running server; becomes `None` once `stop` has taken and closed it.
		server: Option<Server>,
	}
119
120	impl TestServer {
121		fn serve<F: FnOnce(ServerBuilder) -> ServerBuilder>(alter: F) -> Self {
122			let builder = ServerBuilder::new(io()).rest_api(RestApi::Unsecure);
123
124			let server = alter(builder).start_http(&"127.0.0.1:0".parse().unwrap()).unwrap();
125			let uri = format!("http://{}", server.address());
126
127			TestServer {
128				uri,
129				server: Some(server),
130			}
131		}
132
133		fn stop(&mut self) {
134			let server = self.server.take();
135			if let Some(server) = server {
136				server.close();
137			}
138		}
139	}
140
141	fn io() -> IoHandler {
142		let mut io = IoHandler::default();
143		io.add_sync_method("hello", |params: Params| match params.parse::<(String,)>() {
144			Ok((msg,)) => Ok(Value::String(format!("hello {}", msg))),
145			_ => Ok(Value::String("world".into())),
146		});
147		io.add_sync_method("fail", |_: Params| Err(Error::new(ErrorCode::ServerError(-34))));
148		io.add_notification("notify", |params: Params| {
149			let (value,) = params.parse::<(u64,)>().expect("expected one u64 as param");
150			assert_eq!(value, 12);
151		});
152
153		io
154	}
155
	/// Test-only client: a newtype around `TypedClient` exposing the RPC
	/// methods registered by `io()`.
	#[derive(Clone)]
	struct TestClient(TypedClient);
158
159	impl From<RpcChannel> for TestClient {
160		fn from(channel: RpcChannel) -> Self {
161			TestClient(channel.into())
162		}
163	}
164
165	impl TestClient {
166		fn hello(&self, msg: &'static str) -> impl Future<Output = RpcResult<String>> {
167			self.0.call_method("hello", "String", (msg,))
168		}
169		fn fail(&self) -> impl Future<Output = RpcResult<()>> {
170			self.0.call_method("fail", "()", ())
171		}
172		fn notify(&self, value: u64) -> RpcResult<()> {
173			self.0.notify("notify", (value,))
174		}
175	}
176
177	#[test]
178	fn should_work() {
179		crate::logger::init_log();
180
181		// given
182		let server = TestServer::serve(id);
183
184		// when
185		let run = async {
186			let client: TestClient = connect(&server.uri).await?;
187			let result = client.hello("http").await?;
188
189			// then
190			assert_eq!("hello http", result);
191			Ok(()) as RpcResult<_>
192		};
193
194		tokio::runtime::Runtime::new().unwrap().block_on(run).unwrap();
195	}
196
197	#[test]
198	fn should_send_notification() {
199		crate::logger::init_log();
200
201		// given
202		let server = TestServer::serve(id);
203
204		// when
205		let run = async {
206			let client: TestClient = connect(&server.uri).await.unwrap();
207			client.notify(12).unwrap();
208		};
209
210		tokio::runtime::Runtime::new().unwrap().block_on(run);
211		// Ensure that server has not been moved into runtime
212		drop(server);
213	}
214
215	#[test]
216	fn handles_invalid_uri() {
217		crate::logger::init_log();
218
219		// given
220		let invalid_uri = "invalid uri";
221
222		// when
223		let fut = connect(invalid_uri);
224		let res: RpcResult<TestClient> = tokio::runtime::Runtime::new().unwrap().block_on(fut);
225
226		// then
227		assert_matches!(
228			res.map(|_cli| unreachable!()), Err(RpcError::Other(err)) => {
229				assert_eq!(format!("{}", err), "invalid uri character");
230			}
231		);
232	}
233
234	#[test]
235	fn handles_server_error() {
236		crate::logger::init_log();
237
238		// given
239		let server = TestServer::serve(id);
240
241		// when
242		let run = async {
243			let client: TestClient = connect(&server.uri).await?;
244			client.fail().await
245		};
246		let res = tokio::runtime::Runtime::new().unwrap().block_on(run);
247
248		// then
249		if let Err(RpcError::JsonRpcError(err)) = res {
250			assert_eq!(
251				err,
252				Error {
253					code: ErrorCode::ServerError(-34),
254					message: "Server error".into(),
255					data: None
256				}
257			)
258		} else {
259			panic!("Expected JsonRpcError. Received {:?}", res)
260		}
261	}
262
263	#[test]
264	fn handles_connection_refused_error() {
265		// given
266		let mut server = TestServer::serve(id);
267		// stop server so that we get a connection refused
268		server.stop();
269
270		let run = async {
271			let client: TestClient = connect(&server.uri).await?;
272			let res = client.hello("http").await;
273
274			if let Err(RpcError::Other(err)) = res {
275				if let Some(err) = err.downcast_ref::<hyper::Error>() {
276					assert!(err.is_connect(), "Expected Connection Error, got {:?}", err)
277				} else {
278					panic!("Expected a hyper::Error")
279				}
280			} else {
281				panic!("Expected JsonRpcError. Received {:?}", res)
282			}
283
284			Ok(()) as RpcResult<_>
285		};
286
287		tokio::runtime::Runtime::new().unwrap().block_on(run).unwrap();
288	}
289}