1#![deny(missing_docs)]
22
23use jsonrpc_server_utils as server_utils;
24
25pub use hyper;
26pub use jsonrpc_core;
27
28#[macro_use]
29extern crate log;
30
31mod handler;
32mod response;
33#[cfg(test)]
34mod tests;
35mod utils;
36
37use std::convert::Infallible;
38use std::future::Future;
39use std::io;
40use std::net::SocketAddr;
41use std::pin::Pin;
42use std::sync::{mpsc, Arc, Weak};
43use std::thread;
44
45use parking_lot::Mutex;
46
47use crate::jsonrpc::MetaIoHandler;
48use crate::server_utils::reactor::{Executor, UninitializedExecutor};
49use futures::{channel::oneshot, future};
50use hyper::Body;
51use jsonrpc_core as jsonrpc;
52
53pub use crate::handler::ServerHandler;
54pub use crate::response::Response;
55pub use crate::server_utils::cors::{self, AccessControlAllowOrigin, AllowCors, Origin};
56pub use crate::server_utils::hosts::{DomainsValidation, Host};
57pub use crate::server_utils::reactor::TaskExecutor;
58pub use crate::server_utils::{tokio, SuspendableStream};
59pub use crate::utils::{cors_allow_headers, cors_allow_origin, is_host_allowed};
60
61pub enum RequestMiddlewareAction {
63 Proceed {
65 should_continue_on_invalid_cors: bool,
68 request: hyper::Request<Body>,
70 },
71 Respond {
73 should_validate_hosts: bool,
75 response: Pin<Box<dyn Future<Output = hyper::Result<hyper::Response<Body>>> + Send>>,
77 },
78}
79
80impl From<Response> for RequestMiddlewareAction {
81 fn from(o: Response) -> Self {
82 RequestMiddlewareAction::Respond {
83 should_validate_hosts: true,
84 response: Box::pin(async { Ok(o.into()) }),
85 }
86 }
87}
88
89impl From<hyper::Response<Body>> for RequestMiddlewareAction {
90 fn from(response: hyper::Response<Body>) -> Self {
91 RequestMiddlewareAction::Respond {
92 should_validate_hosts: true,
93 response: Box::pin(async { Ok(response) }),
94 }
95 }
96}
97
98impl From<hyper::Request<Body>> for RequestMiddlewareAction {
99 fn from(request: hyper::Request<Body>) -> Self {
100 RequestMiddlewareAction::Proceed {
101 should_continue_on_invalid_cors: false,
102 request,
103 }
104 }
105}
106
107pub trait RequestMiddleware: Send + Sync + 'static {
109 fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction;
111}
112
113impl<F> RequestMiddleware for F
114where
115 F: Fn(hyper::Request<Body>) -> RequestMiddlewareAction + Sync + Send + 'static,
116{
117 fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction {
118 (*self)(request)
119 }
120}
121
122#[derive(Default)]
123struct NoopRequestMiddleware;
124impl RequestMiddleware for NoopRequestMiddleware {
125 fn on_request(&self, request: hyper::Request<Body>) -> RequestMiddlewareAction {
126 RequestMiddlewareAction::Proceed {
127 should_continue_on_invalid_cors: false,
128 request,
129 }
130 }
131}
132
133pub trait MetaExtractor<M: jsonrpc::Metadata>: Sync + Send + 'static {
135 fn read_metadata(&self, _: &hyper::Request<Body>) -> M;
137}
138
139impl<M, F> MetaExtractor<M> for F
140where
141 M: jsonrpc::Metadata,
142 F: Fn(&hyper::Request<Body>) -> M + Sync + Send + 'static,
143{
144 fn read_metadata(&self, req: &hyper::Request<Body>) -> M {
145 (*self)(req)
146 }
147}
148
149#[derive(Default)]
150struct NoopExtractor;
151impl<M: jsonrpc::Metadata + Default> MetaExtractor<M> for NoopExtractor {
152 fn read_metadata(&self, _: &hyper::Request<Body>) -> M {
153 M::default()
154 }
155}
156pub struct Rpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
159 pub handler: Arc<MetaIoHandler<M, S>>,
161 pub extractor: Arc<dyn MetaExtractor<M>>,
163}
164
165impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for Rpc<M, S> {
166 fn clone(&self) -> Self {
167 Rpc {
168 handler: self.handler.clone(),
169 extractor: self.extractor.clone(),
170 }
171 }
172}
173
174impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Rpc<M, S> {
175 pub fn downgrade(&self) -> WeakRpc<M, S> {
179 WeakRpc {
180 handler: Arc::downgrade(&self.handler),
181 extractor: Arc::downgrade(&self.extractor),
182 }
183 }
184}
185pub struct WeakRpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
191 handler: Weak<MetaIoHandler<M, S>>,
192 extractor: Weak<dyn MetaExtractor<M>>,
193}
194
195impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for WeakRpc<M, S> {
196 fn clone(&self) -> Self {
197 WeakRpc {
198 handler: self.handler.clone(),
199 extractor: self.extractor.clone(),
200 }
201 }
202}
203
204impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> WeakRpc<M, S> {
205 pub fn upgrade(&self) -> Option<Rpc<M, S>> {
207 let handler = self.handler.upgrade()?;
208 let extractor = self.extractor.upgrade()?;
209
210 Some(Rpc { handler, extractor })
211 }
212}
213
214type AllowedHosts = Option<Vec<Host>>;
215type CorsDomains = Option<Vec<AccessControlAllowOrigin>>;
216
217#[derive(Debug, PartialEq, Clone, Copy)]
219pub enum RestApi {
220 Secure,
226 Unsecure,
231 Disabled,
233}
234
235pub struct ServerBuilder<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
237 handler: Arc<MetaIoHandler<M, S>>,
238 executor: UninitializedExecutor,
239 meta_extractor: Arc<dyn MetaExtractor<M>>,
240 request_middleware: Arc<dyn RequestMiddleware>,
241 cors_domains: CorsDomains,
242 cors_max_age: Option<u32>,
243 allowed_headers: cors::AccessControlAllowHeaders,
244 allowed_hosts: AllowedHosts,
245 rest_api: RestApi,
246 health_api: Option<(String, String)>,
247 keep_alive: bool,
248 threads: usize,
249 max_request_body_size: usize,
250}
251
252impl<M: jsonrpc::Metadata + Default, S: jsonrpc::Middleware<M>> ServerBuilder<M, S>
253where
254 S::Future: Unpin,
255 S::CallFuture: Unpin,
256 M: Unpin,
257{
258 pub fn new<T>(handler: T) -> Self
264 where
265 T: Into<MetaIoHandler<M, S>>,
266 {
267 Self::with_meta_extractor(handler, NoopExtractor)
268 }
269}
270
271impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S>
272where
273 S::Future: Unpin,
274 S::CallFuture: Unpin,
275 M: Unpin,
276{
277 pub fn with_meta_extractor<T, E>(handler: T, extractor: E) -> Self
283 where
284 T: Into<MetaIoHandler<M, S>>,
285 E: MetaExtractor<M>,
286 {
287 ServerBuilder {
288 handler: Arc::new(handler.into()),
289 executor: UninitializedExecutor::Unspawned,
290 meta_extractor: Arc::new(extractor),
291 request_middleware: Arc::new(NoopRequestMiddleware::default()),
292 cors_domains: None,
293 cors_max_age: None,
294 allowed_headers: cors::AccessControlAllowHeaders::Any,
295 allowed_hosts: None,
296 rest_api: RestApi::Disabled,
297 health_api: None,
298 keep_alive: true,
299 threads: 1,
300 max_request_body_size: 5 * 1024 * 1024,
301 }
302 }
303
304 pub fn event_loop_executor(mut self, executor: TaskExecutor) -> Self {
308 self.executor = UninitializedExecutor::Shared(executor);
309 self
310 }
311
312 pub fn rest_api(mut self, rest_api: RestApi) -> Self {
317 self.rest_api = rest_api;
318 self
319 }
320
321 pub fn health_api<A, B, T>(mut self, health_api: T) -> Self
329 where
330 T: Into<Option<(A, B)>>,
331 A: Into<String>,
332 B: Into<String>,
333 {
334 self.health_api = health_api.into().map(|(a, b)| (a.into(), b.into()));
335 self
336 }
337
338 pub fn keep_alive(mut self, val: bool) -> Self {
342 self.keep_alive = val;
343 self
344 }
345
346 #[cfg(not(unix))]
350 #[allow(unused_mut)]
351 pub fn threads(mut self, _threads: usize) -> Self {
352 warn!("Multi-threaded server is not available on Windows. Falling back to single thread.");
353 self
354 }
355
356 #[cfg(unix)]
366 pub fn threads(mut self, threads: usize) -> Self {
367 self.threads = threads;
368 self
369 }
370
371 pub fn cors(mut self, cors_domains: DomainsValidation<AccessControlAllowOrigin>) -> Self {
373 self.cors_domains = cors_domains.into();
374 self
375 }
376
377 pub fn cors_max_age<T: Into<Option<u32>>>(mut self, cors_max_age: T) -> Self {
382 self.cors_max_age = cors_max_age.into();
383 self
384 }
385
386 pub fn cors_allow_headers(mut self, allowed_headers: cors::AccessControlAllowHeaders) -> Self {
388 self.allowed_headers = allowed_headers;
389 self
390 }
391
392 pub fn request_middleware<T: RequestMiddleware>(mut self, middleware: T) -> Self {
394 self.request_middleware = Arc::new(middleware);
395 self
396 }
397
398 pub fn meta_extractor<T: MetaExtractor<M>>(mut self, extractor: T) -> Self {
400 self.meta_extractor = Arc::new(extractor);
401 self
402 }
403
404 pub fn allow_only_bind_host(mut self) -> Self {
406 self.allowed_hosts = Some(Vec::new());
407 self
408 }
409
410 pub fn allowed_hosts(mut self, allowed_hosts: DomainsValidation<Host>) -> Self {
412 self.allowed_hosts = allowed_hosts.into();
413 self
414 }
415
416 pub fn max_request_body_size(mut self, val: usize) -> Self {
418 self.max_request_body_size = val;
419 self
420 }
421
422 pub fn start_http(self, addr: &SocketAddr) -> io::Result<Server> {
424 let cors_domains = self.cors_domains;
425 let cors_max_age = self.cors_max_age;
426 let allowed_headers = self.allowed_headers;
427 let request_middleware = self.request_middleware;
428 let allowed_hosts = self.allowed_hosts;
429 let jsonrpc_handler = Rpc {
430 handler: self.handler,
431 extractor: self.meta_extractor,
432 };
433 let rest_api = self.rest_api;
434 let health_api = self.health_api;
435 let keep_alive = self.keep_alive;
436 let reuse_port = self.threads > 1;
437
438 let (local_addr_tx, local_addr_rx) = mpsc::channel();
439 let (close, shutdown_signal) = oneshot::channel();
440 let (done_tx, done_rx) = oneshot::channel();
441 let eloop = self.executor.init_with_name("http.worker0")?;
442 let req_max_size = self.max_request_body_size;
443 serve(
445 (shutdown_signal, local_addr_tx, done_tx),
446 eloop.executor(),
447 addr.to_owned(),
448 cors_domains.clone(),
449 cors_max_age,
450 allowed_headers.clone(),
451 request_middleware.clone(),
452 allowed_hosts.clone(),
453 jsonrpc_handler.clone(),
454 rest_api,
455 health_api.clone(),
456 keep_alive,
457 reuse_port,
458 req_max_size,
459 );
460 let handles = (0..self.threads - 1)
461 .map(|i| {
462 let (local_addr_tx, local_addr_rx) = mpsc::channel();
463 let (close, shutdown_signal) = oneshot::channel();
464 let (done_tx, done_rx) = oneshot::channel();
465 let eloop = UninitializedExecutor::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
466 serve(
467 (shutdown_signal, local_addr_tx, done_tx),
468 eloop.executor(),
469 addr.to_owned(),
470 cors_domains.clone(),
471 cors_max_age,
472 allowed_headers.clone(),
473 request_middleware.clone(),
474 allowed_hosts.clone(),
475 jsonrpc_handler.clone(),
476 rest_api,
477 health_api.clone(),
478 keep_alive,
479 reuse_port,
480 req_max_size,
481 );
482 Ok((eloop, close, local_addr_rx, done_rx))
483 })
484 .collect::<io::Result<Vec<_>>>()?;
485
486 let local_addr = recv_address(local_addr_rx);
488 let mut handles: Vec<(Executor, oneshot::Sender<()>, oneshot::Receiver<()>)> = handles
490 .into_iter()
491 .map(|(eloop, close, local_addr_rx, done_rx)| {
492 let _ = recv_address(local_addr_rx)?;
493 Ok((eloop, close, done_rx))
494 })
495 .collect::<io::Result<Vec<_>>>()?;
496 handles.push((eloop, close, done_rx));
497
498 let (executors, done_rxs) = handles
499 .into_iter()
500 .fold((vec![], vec![]), |mut acc, (eloop, closer, done_rx)| {
501 acc.0.push((eloop, closer));
502 acc.1.push(done_rx);
503 acc
504 });
505
506 Ok(Server {
507 address: local_addr?,
508 executors: Arc::new(Mutex::new(Some(executors))),
509 done: Some(done_rxs),
510 })
511 }
512}
513
514fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Result<SocketAddr> {
515 local_addr_rx
516 .recv()
517 .map_err(|_| io::Error::new(io::ErrorKind::Interrupted, ""))?
518}
519
520fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
521 signals: (
522 oneshot::Receiver<()>,
523 mpsc::Sender<io::Result<SocketAddr>>,
524 oneshot::Sender<()>,
525 ),
526 executor: TaskExecutor,
527 addr: SocketAddr,
528 cors_domains: CorsDomains,
529 cors_max_age: Option<u32>,
530 allowed_headers: cors::AccessControlAllowHeaders,
531 request_middleware: Arc<dyn RequestMiddleware>,
532 allowed_hosts: AllowedHosts,
533 jsonrpc_handler: Rpc<M, S>,
534 rest_api: RestApi,
535 health_api: Option<(String, String)>,
536 keep_alive: bool,
537 reuse_port: bool,
538 max_request_body_size: usize,
539) where
540 S::Future: Unpin,
541 S::CallFuture: Unpin,
542 M: Unpin,
543{
544 let (shutdown_signal, local_addr_tx, done_tx) = signals;
545 executor.spawn(async move {
546 let bind = move || {
547 let listener = match addr {
548 SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
549 SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
550 };
551 configure_port(reuse_port, &listener)?;
552 listener.reuse_address(true)?;
553 listener.bind(&addr)?;
554 let listener = listener.listen(1024)?;
555 let local_addr = listener.local_addr()?;
556
557 listener.set_nonblocking(true)?;
561 #[cfg(windows)]
563 let raw_socket = std::os::windows::io::AsRawSocket::as_raw_socket(&listener);
564 #[cfg(not(windows))]
565 let raw_socket = ();
566
567 let server_builder =
568 hyper::Server::from_tcp(listener).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
569 Ok((server_builder, local_addr, raw_socket))
573 };
574
575 let bind_result = match bind() {
576 Ok((server_builder, local_addr, raw_socket)) => {
577 match local_addr_tx.send(Ok(local_addr)) {
579 Ok(_) => Ok((server_builder, local_addr, raw_socket)),
580 Err(_) => {
581 warn!(
582 "Thread {:?} unable to reach receiver, closing server",
583 thread::current().name()
584 );
585 Err(())
586 }
587 }
588 }
589 Err(err) => {
590 let _send_result = local_addr_tx.send(Err(err));
592
593 Err(())
594 }
595 };
596
597 let (server_builder, local_addr, _raw_socket) = bind_result?;
598
599 let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);
600
601 let server_builder = server_builder
602 .http1_keepalive(keep_alive)
603 .tcp_nodelay(true)
604 .tcp_sleep_on_accept_errors(true);
607
608 let service_fn = hyper::service::make_service_fn(move |_addr_stream| {
609 let service = ServerHandler::new(
610 jsonrpc_handler.downgrade(),
611 cors_domains.clone(),
612 cors_max_age,
613 allowed_headers.clone(),
614 allowed_hosts.clone(),
615 request_middleware.clone(),
616 rest_api,
617 health_api.clone(),
618 max_request_body_size,
619 keep_alive,
620 );
621 async { Ok::<_, Infallible>(service) }
622 });
623
624 let server = server_builder.serve(service_fn).with_graceful_shutdown(async {
625 if let Err(err) = shutdown_signal.await {
626 debug!("Shutdown signaller dropped, closing server: {:?}", err);
627 }
628 });
629
630 if let Err(err) = server.await {
631 error!("Error running HTTP server: {:?}", err);
632 }
633
634 #[cfg(windows)]
639 let _: std::net::TcpListener = unsafe { std::os::windows::io::FromRawSocket::from_raw_socket(_raw_socket) };
640
641 done_tx.send(())
642 });
643}
644
645#[cfg(unix)]
646fn configure_port(reuse: bool, tcp: &net2::TcpBuilder) -> io::Result<()> {
647 use net2::unix::*;
648
649 if reuse {
650 tcp.reuse_port(true)?;
651 }
652
653 Ok(())
654}
655
656#[cfg(not(unix))]
657fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
658 Ok(())
659}
660
661#[derive(Clone)]
665pub struct CloseHandle(Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>);
666
667impl CloseHandle {
668 pub fn close(self) {
670 if let Some(executors) = self.0.lock().take() {
671 for (executor, closer) in executors {
672 let _ = closer.send(());
674 executor.close();
675 }
676 }
677 }
678}
679
680type Executors = Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>;
681pub struct Server {
683 address: SocketAddr,
684 executors: Executors,
685 done: Option<Vec<oneshot::Receiver<()>>>,
686}
687
688impl Server {
689 pub fn address(&self) -> &SocketAddr {
691 &self.address
692 }
693
694 pub fn close(self) {
696 self.close_handle().close()
697 }
698
699 pub fn wait(mut self) {
701 self.wait_internal();
702 }
703
704 pub fn close_handle(&self) -> CloseHandle {
707 CloseHandle(self.executors.clone())
708 }
709
710 fn wait_internal(&mut self) {
711 if let Some(receivers) = self.done.take() {
712 let _ = std::thread::spawn(move || futures::executor::block_on(future::try_join_all(receivers))).join();
715 }
716 }
717}
718
719impl Drop for Server {
720 fn drop(&mut self) {
721 self.close_handle().close();
722 self.wait_internal();
723 }
724}