// jsonrpc_client_transports/transports/http.rs
use super::RequestBuilder;
6use crate::{RpcChannel, RpcError, RpcMessage, RpcResult};
7use futures::{future, Future, FutureExt, StreamExt, TryFutureExt};
8use hyper::{http, Client, Request, Uri};
9
10pub async fn connect<TClient>(url: &str) -> RpcResult<TClient>
12where
13 TClient: From<RpcChannel>,
14{
15 let url: Uri = url.parse().map_err(|e| RpcError::Other(Box::new(e)))?;
16
17 let (client_api, client_worker) = do_connect(url).await;
18 tokio::spawn(client_worker);
19
20 Ok(TClient::from(client_api))
21}
22
23async fn do_connect(url: Uri) -> (RpcChannel, impl Future<Output = ()>) {
24 let max_parallel = 8;
25
26 #[cfg(feature = "tls")]
27 let connector = hyper_tls::HttpsConnector::new();
28 #[cfg(feature = "tls")]
29 let client = Client::builder().build::<_, hyper::Body>(connector);
30
31 #[cfg(not(feature = "tls"))]
32 let client = Client::new();
33 let mut request_builder = RequestBuilder::new();
35
36 let (sender, receiver) = futures::channel::mpsc::unbounded();
37
38 let fut = receiver
39 .filter_map(move |msg: RpcMessage| {
40 future::ready(match msg {
41 RpcMessage::Call(call) => {
42 let (_, request) = request_builder.call_request(&call);
43 Some((request, Some(call.sender)))
44 }
45 RpcMessage::Notify(notify) => Some((request_builder.notification(¬ify), None)),
46 RpcMessage::Subscribe(_) => {
47 log::warn!("Unsupported `RpcMessage` type `Subscribe`.");
48 None
49 }
50 })
51 })
52 .map(move |(request, sender)| {
53 let request = Request::post(&url)
54 .header(
55 http::header::CONTENT_TYPE,
56 http::header::HeaderValue::from_static("application/json"),
57 )
58 .header(
59 http::header::ACCEPT,
60 http::header::HeaderValue::from_static("application/json"),
61 )
62 .body(request.into())
63 .expect("Uri and request headers are valid; qed");
64
65 client
66 .request(request)
67 .then(|response| async move { (response, sender) })
68 })
69 .buffer_unordered(max_parallel)
70 .for_each(|(response, sender)| async {
71 let result = match response {
72 Ok(ref res) if !res.status().is_success() => {
73 log::trace!("http result status {}", res.status());
74 Err(RpcError::Client(format!(
75 "Unexpected response status code: {}",
76 res.status()
77 )))
78 }
79 Err(err) => Err(RpcError::Other(Box::new(err))),
80 Ok(res) => {
81 hyper::body::to_bytes(res.into_body())
82 .map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e)))
83 .await
84 }
85 };
86
87 if let Some(sender) = sender {
88 let response = result
89 .and_then(|response| {
90 let response_str = String::from_utf8_lossy(response.as_ref()).into_owned();
91 super::parse_response(&response_str)
92 })
93 .and_then(|r| r.1);
94 if let Err(err) = sender.send(response) {
95 log::warn!("Error resuming asynchronous request: {:?}", err);
96 }
97 }
98 });
99
100 (sender.into(), fut)
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use crate::*;
    use assert_matches::assert_matches;
    use jsonrpc_core::{Error, ErrorCode, IoHandler, Params, Value};
    use jsonrpc_http_server::*;

    /// Identity function, passed to `TestServer::serve` when the default
    /// builder configuration needs no adjustment.
    fn id<T>(t: T) -> T {
        t
    }

    /// Local HTTP JSON-RPC server acting as the peer of the client under test.
    struct TestServer {
        /// `http://` URI built from the address the server reported after starting.
        uri: String,
        /// Handle of the running server; `None` once `stop` has been called.
        server: Option<Server>,
    }

    impl TestServer {
        /// Starts a server exposing the handlers from `io()` on `127.0.0.1`,
        /// port 0 (letting the OS choose a free port). `alter` may tweak the
        /// builder before the server is started.
        fn serve<F: FnOnce(ServerBuilder) -> ServerBuilder>(alter: F) -> Self {
            let builder = ServerBuilder::new(io()).rest_api(RestApi::Unsecure);

            let server = alter(builder).start_http(&"127.0.0.1:0".parse().unwrap()).unwrap();
            let uri = format!("http://{}", server.address());

            TestServer {
                uri,
                server: Some(server),
            }
        }

        /// Closes the server if it is still running; later calls do nothing.
        fn stop(&mut self) {
            if let Some(server) = self.server.take() {
                server.close();
            }
        }
    }

    /// Builds the RPC handler used by the test server:
    ///
    /// * `hello` — replies `"hello <msg>"` for a single string parameter and
    ///   `"world"` for anything else;
    /// * `fail` — always fails with `ServerError(-34)`;
    /// * `notify` — notification that asserts its single `u64` parameter is `12`.
    fn io() -> IoHandler {
        let mut io = IoHandler::default();
        io.add_sync_method("hello", |params: Params| match params.parse::<(String,)>() {
            Ok((msg,)) => Ok(Value::String(format!("hello {}", msg))),
            _ => Ok(Value::String("world".into())),
        });
        io.add_sync_method("fail", |_: Params| Err(Error::new(ErrorCode::ServerError(-34))));
        io.add_notification("notify", |params: Params| {
            let (value,) = params.parse::<(u64,)>().expect("expected one u64 as param");
            assert_eq!(value, 12);
        });

        io
    }

    /// Typed wrapper around the raw channel, mirroring the server's test API.
    #[derive(Clone)]
    struct TestClient(TypedClient);

    impl From<RpcChannel> for TestClient {
        fn from(channel: RpcChannel) -> Self {
            TestClient(channel.into())
        }
    }

    impl TestClient {
        /// Calls the `hello` method with `msg` as its only parameter.
        fn hello(&self, msg: &'static str) -> impl Future<Output = RpcResult<String>> {
            self.0.call_method("hello", "String", (msg,))
        }
        /// Calls the `fail` method, which the test server always rejects.
        fn fail(&self) -> impl Future<Output = RpcResult<()>> {
            self.0.call_method("fail", "()", ())
        }
        /// Sends the `notify` notification with `value` as its only parameter.
        fn notify(&self, value: u64) -> RpcResult<()> {
            self.0.notify("notify", (value,))
        }
    }

    #[test]
    fn should_work() {
        crate::logger::init_log();

        // given
        let server = TestServer::serve(id);

        // when
        let run = async {
            let client: TestClient = connect(&server.uri).await?;
            let result = client.hello("http").await?;

            // then
            assert_eq!("hello http", result);

            Ok(()) as RpcResult<_>
        };

        tokio::runtime::Runtime::new().unwrap().block_on(run).unwrap();
    }

    #[test]
    fn should_send_notification() {
        crate::logger::init_log();

        // given
        let server = TestServer::serve(id);

        // when
        let run = async {
            let client: TestClient = connect(&server.uri).await.unwrap();
            client.notify(12).unwrap();
        };

        tokio::runtime::Runtime::new().unwrap().block_on(run);
        // Keep the server alive until the runtime has finished.
        drop(server);
    }

    #[test]
    fn handles_invalid_uri() {
        crate::logger::init_log();

        // given
        let invalid_uri = "invalid uri";

        // when
        let fut = connect(invalid_uri);
        let res: RpcResult<TestClient> = tokio::runtime::Runtime::new().unwrap().block_on(fut);

        // then
        // The `Ok` side is mapped away, presumably because `TestClient` has no
        // `Debug` impl for `assert_matches!` to print on failure.
        assert_matches!(
            res.map(|_cli| unreachable!()), Err(RpcError::Other(err)) => {
                assert_eq!(err.to_string(), "invalid uri character");
            }
        );
    }

    #[test]
    fn handles_server_error() {
        crate::logger::init_log();

        // given
        let server = TestServer::serve(id);

        // when
        let run = async {
            let client: TestClient = connect(&server.uri).await?;
            client.fail().await
        };
        let res = tokio::runtime::Runtime::new().unwrap().block_on(run);

        // then
        if let Err(RpcError::JsonRpcError(err)) = res {
            assert_eq!(
                err,
                Error {
                    code: ErrorCode::ServerError(-34),
                    message: "Server error".into(),
                    data: None
                }
            )
        } else {
            panic!("Expected JsonRpcError. Received {:?}", res)
        }
    }

    #[test]
    fn handles_connection_refused_error() {
        crate::logger::init_log();

        // given
        let mut server = TestServer::serve(id);
        // Stop the server right away so the request below cannot connect.
        server.stop();

        let run = async {
            let client: TestClient = connect(&server.uri).await?;
            let res = client.hello("http").await;

            // Transport failures are expected as `RpcError::Other` wrapping a `hyper::Error`.
            if let Err(RpcError::Other(err)) = res {
                if let Some(err) = err.downcast_ref::<hyper::Error>() {
                    assert!(err.is_connect(), "Expected Connection Error, got {:?}", err)
                } else {
                    panic!("Expected a hyper::Error")
                }
            } else {
                panic!("Expected RpcError::Other. Received {:?}", res)
            }

            Ok(()) as RpcResult<_>
        };

        tokio::runtime::Runtime::new().unwrap().block_on(run).unwrap();
    }
}