jsonrpc_http_server/
lib.rs

1//! jsonrpc http server.
2//!
3//! ```no_run
4//! use jsonrpc_core::*;
5//! use jsonrpc_http_server::*;
6//!
7//! fn main() {
8//!     let mut io = IoHandler::new();
9//!     io.add_sync_method("say_hello", |_: Params| {
10//!         Ok(Value::String("hello".to_string()))
11//!     });
12//!
13//!     let _server = ServerBuilder::new(io)
14//!     .start_http(&"127.0.0.1:3030".parse().unwrap())
15//!     .expect("Unable to start RPC server");
16//!
17//! _server.wait();
18//! }
19//! ```
20
21#![deny(missing_docs)]
22
23use jsonrpc_server_utils as server_utils;
24
25pub use hyper;
26pub use jsonrpc_core;
27
28#[macro_use]
29extern crate log;
30
31mod handler;
32mod response;
33#[cfg(test)]
34mod tests;
35mod utils;
36
37use std::convert::Infallible;
38use std::future::Future;
39use std::io;
40use std::net::SocketAddr;
41use std::pin::Pin;
42use std::sync::{mpsc, Arc, Weak};
43use std::thread;
44
45use parking_lot::Mutex;
46
47use crate::jsonrpc::MetaIoHandler;
48use crate::server_utils::reactor::{Executor, UninitializedExecutor};
49use futures::{channel::oneshot, future};
50use hyper::Body;
51use jsonrpc_core as jsonrpc;
52
53pub use crate::handler::ServerHandler;
54pub use crate::response::Response;
55pub use crate::server_utils::cors::{self, AccessControlAllowOrigin, AllowCors, Origin};
56pub use crate::server_utils::hosts::{DomainsValidation, Host};
57pub use crate::server_utils::reactor::TaskExecutor;
58pub use crate::server_utils::{tokio, SuspendableStream};
59pub use crate::utils::{cors_allow_headers, cors_allow_origin, is_host_allowed};
60
61/// Action undertaken by a middleware.
62pub enum RequestMiddlewareAction {
63	/// Proceed with standard RPC handling
64	Proceed {
65		/// Should the request be processed even if invalid CORS headers are detected?
66		/// This allows for side effects to take place.
67		should_continue_on_invalid_cors: bool,
68		/// The request object returned
69		request: hyper::Request<Body>,
70	},
71	/// Intercept the request and respond differently.
72	Respond {
73		/// Should standard hosts validation be performed?
74		should_validate_hosts: bool,
75		/// a future for server response
76		response: Pin<Box<dyn Future<Output = hyper::Result<hyper::Response<Body>>> + Send>>,
77	},
78}
79
80impl From<Response> for RequestMiddlewareAction {
81	fn from(o: Response) -> Self {
82		RequestMiddlewareAction::Respond {
83			should_validate_hosts: true,
84			response: Box::pin(async { Ok(o.into()) }),
85		}
86	}
87}
88
89impl From<hyper::Response<Body>> for RequestMiddlewareAction {
90	fn from(response: hyper::Response<Body>) -> Self {
91		RequestMiddlewareAction::Respond {
92			should_validate_hosts: true,
93			response: Box::pin(async { Ok(response) }),
94		}
95	}
96}
97
98impl From<hyper::Request<Body>> for RequestMiddlewareAction {
99	fn from(request: hyper::Request<Body>) -> Self {
100		RequestMiddlewareAction::Proceed {
101			should_continue_on_invalid_cors: false,
102			request,
103		}
104	}
105}
106
107/// Allows to intercept request and handle it differently.
108pub trait RequestMiddleware: Send + Sync + 'static {
109	/// Takes a request and decides how to proceed with it.
110	fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction;
111}
112
113impl<F> RequestMiddleware for F
114where
115	F: Fn(hyper::Request<Body>) -> RequestMiddlewareAction + Sync + Send + 'static,
116{
117	fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction {
118		(*self)(request)
119	}
120}
121
122#[derive(Default)]
123struct NoopRequestMiddleware;
124impl RequestMiddleware for NoopRequestMiddleware {
125	fn on_request(&self, request: hyper::Request<Body>) -> RequestMiddlewareAction {
126		RequestMiddlewareAction::Proceed {
127			should_continue_on_invalid_cors: false,
128			request,
129		}
130	}
131}
132
133/// Extracts metadata from the HTTP request.
134pub trait MetaExtractor<M: jsonrpc::Metadata>: Sync + Send + 'static {
135	/// Read the metadata from the request
136	fn read_metadata(&self, _: &hyper::Request<Body>) -> M;
137}
138
139impl<M, F> MetaExtractor<M> for F
140where
141	M: jsonrpc::Metadata,
142	F: Fn(&hyper::Request<Body>) -> M + Sync + Send + 'static,
143{
144	fn read_metadata(&self, req: &hyper::Request<Body>) -> M {
145		(*self)(req)
146	}
147}
148
149#[derive(Default)]
150struct NoopExtractor;
151impl<M: jsonrpc::Metadata + Default> MetaExtractor<M> for NoopExtractor {
152	fn read_metadata(&self, _: &hyper::Request<Body>) -> M {
153		M::default()
154	}
155}
156//
157/// RPC Handler bundled with metadata extractor.
158pub struct Rpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
159	/// RPC Handler
160	pub handler: Arc<MetaIoHandler<M, S>>,
161	/// Metadata extractor
162	pub extractor: Arc<dyn MetaExtractor<M>>,
163}
164
165impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for Rpc<M, S> {
166	fn clone(&self) -> Self {
167		Rpc {
168			handler: self.handler.clone(),
169			extractor: self.extractor.clone(),
170		}
171	}
172}
173
174impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Rpc<M, S> {
175	/// Downgrade the `Rpc` to `WeakRpc`.
176	///
177	/// Downgrades internal `Arc`s to `Weak` references.
178	pub fn downgrade(&self) -> WeakRpc<M, S> {
179		WeakRpc {
180			handler: Arc::downgrade(&self.handler),
181			extractor: Arc::downgrade(&self.extractor),
182		}
183	}
184}
185/// A weak handle to the RPC server.
186///
187/// Since request handling futures are spawned directly on the executor,
188/// whenever the server is closed we want to make sure that existing
189/// tasks are not blocking the server and are dropped as soon as the server stops.
190pub struct WeakRpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
191	handler: Weak<MetaIoHandler<M, S>>,
192	extractor: Weak<dyn MetaExtractor<M>>,
193}
194
195impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for WeakRpc<M, S> {
196	fn clone(&self) -> Self {
197		WeakRpc {
198			handler: self.handler.clone(),
199			extractor: self.extractor.clone(),
200		}
201	}
202}
203
204impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> WeakRpc<M, S> {
205	/// Upgrade the handle to a strong one (`Rpc`) if  possible.
206	pub fn upgrade(&self) -> Option<Rpc<M, S>> {
207		let handler = self.handler.upgrade()?;
208		let extractor = self.extractor.upgrade()?;
209
210		Some(Rpc { handler, extractor })
211	}
212}
213
214type AllowedHosts = Option<Vec<Host>>;
215type CorsDomains = Option<Vec<AccessControlAllowOrigin>>;
216
/// REST -> RPC converter state.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum RestApi {
	/// The REST -> RPC converter is enabled
	/// and requires the `Content-Type: application/json` header
	/// (even though the body should be empty).
	/// This protects against RPC calls being submitted
	/// from unwanted origins.
	Secure,
	/// The REST -> RPC converter is enabled
	/// and does not require any `Content-Type` header.
	/// NOTE: This allows sending RPCs via HTTP forms
	/// from any website.
	Unsecure,
	/// The REST -> RPC converter is disabled.
	Disabled,
}
234
235/// Convenient JSON-RPC HTTP Server builder.
236pub struct ServerBuilder<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
237	handler: Arc<MetaIoHandler<M, S>>,
238	executor: UninitializedExecutor,
239	meta_extractor: Arc<dyn MetaExtractor<M>>,
240	request_middleware: Arc<dyn RequestMiddleware>,
241	cors_domains: CorsDomains,
242	cors_max_age: Option<u32>,
243	allowed_headers: cors::AccessControlAllowHeaders,
244	allowed_hosts: AllowedHosts,
245	rest_api: RestApi,
246	health_api: Option<(String, String)>,
247	keep_alive: bool,
248	threads: usize,
249	max_request_body_size: usize,
250}
251
252impl<M: jsonrpc::Metadata + Default, S: jsonrpc::Middleware<M>> ServerBuilder<M, S>
253where
254	S::Future: Unpin,
255	S::CallFuture: Unpin,
256	M: Unpin,
257{
258	/// Creates new `ServerBuilder` for given `IoHandler`.
259	///
260	/// By default:
261	/// 1. Server is not sending any CORS headers.
262	/// 2. Server is validating `Host` header.
263	pub fn new<T>(handler: T) -> Self
264	where
265		T: Into<MetaIoHandler<M, S>>,
266	{
267		Self::with_meta_extractor(handler, NoopExtractor)
268	}
269}
270
271impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S>
272where
273	S::Future: Unpin,
274	S::CallFuture: Unpin,
275	M: Unpin,
276{
277	/// Creates new `ServerBuilder` for given `IoHandler`.
278	///
279	/// By default:
280	/// 1. Server is not sending any CORS headers.
281	/// 2. Server is validating `Host` header.
282	pub fn with_meta_extractor<T, E>(handler: T, extractor: E) -> Self
283	where
284		T: Into<MetaIoHandler<M, S>>,
285		E: MetaExtractor<M>,
286	{
287		ServerBuilder {
288			handler: Arc::new(handler.into()),
289			executor: UninitializedExecutor::Unspawned,
290			meta_extractor: Arc::new(extractor),
291			request_middleware: Arc::new(NoopRequestMiddleware::default()),
292			cors_domains: None,
293			cors_max_age: None,
294			allowed_headers: cors::AccessControlAllowHeaders::Any,
295			allowed_hosts: None,
296			rest_api: RestApi::Disabled,
297			health_api: None,
298			keep_alive: true,
299			threads: 1,
300			max_request_body_size: 5 * 1024 * 1024,
301		}
302	}
303
304	/// Utilize existing event loop executor to poll RPC results.
305	///
306	/// Applies only to 1 of the threads. Other threads will spawn their own Event Loops.
307	pub fn event_loop_executor(mut self, executor: TaskExecutor) -> Self {
308		self.executor = UninitializedExecutor::Shared(executor);
309		self
310	}
311
312	/// Enable the REST -> RPC converter.
313	///
314	/// Allows you to invoke RPCs by sending `POST /<method>/<param1>/<param2>` requests
315	/// (with no body). Disabled by default.
316	pub fn rest_api(mut self, rest_api: RestApi) -> Self {
317		self.rest_api = rest_api;
318		self
319	}
320
321	/// Enable health endpoint.
322	///
323	/// Allows you to expose one of the methods under `GET /<path>`
324	/// The method will be invoked with no parameters.
325	/// Error returned from the method will be converted to status `500` response.
326	///
327	/// Expects a tuple with `(<path>, <rpc-method-name>)`.
328	pub fn health_api<A, B, T>(mut self, health_api: T) -> Self
329	where
330		T: Into<Option<(A, B)>>,
331		A: Into<String>,
332		B: Into<String>,
333	{
334		self.health_api = health_api.into().map(|(a, b)| (a.into(), b.into()));
335		self
336	}
337
338	/// Enables or disables HTTP keep-alive.
339	///
340	/// Default is true.
341	pub fn keep_alive(mut self, val: bool) -> Self {
342		self.keep_alive = val;
343		self
344	}
345
346	/// Sets number of threads of the server to run.
347	///
348	/// Panics when set to `0`.
349	#[cfg(not(unix))]
350	#[allow(unused_mut)]
351	pub fn threads(mut self, _threads: usize) -> Self {
352		warn!("Multi-threaded server is not available on Windows. Falling back to single thread.");
353		self
354	}
355
356	/// Sets number of threads of the server to run.
357	///
358	/// Panics when set to `0`.
359	/// The first thread will use provided `Executor` instance
360	/// and all other threads will use `UninitializedExecutor` to spawn
361	/// a new runtime for futures.
362	/// So it's also possible to run a multi-threaded server by
363	/// passing the default `tokio::runtime` executor to this builder
364	/// and setting `threads` to 1.
365	#[cfg(unix)]
366	pub fn threads(mut self, threads: usize) -> Self {
367		self.threads = threads;
368		self
369	}
370
371	/// Configures a list of allowed CORS origins.
372	pub fn cors(mut self, cors_domains: DomainsValidation<AccessControlAllowOrigin>) -> Self {
373		self.cors_domains = cors_domains.into();
374		self
375	}
376
377	/// Configure CORS `AccessControlMaxAge` header returned.
378	///
379	/// Informs the client that the CORS preflight request is not necessary for `cors_max_age` seconds.
380	/// Disabled by default.
381	pub fn cors_max_age<T: Into<Option<u32>>>(mut self, cors_max_age: T) -> Self {
382		self.cors_max_age = cors_max_age.into();
383		self
384	}
385
386	/// Configure the CORS `AccessControlAllowHeaders` header which are allowed.
387	pub fn cors_allow_headers(mut self, allowed_headers: cors::AccessControlAllowHeaders) -> Self {
388		self.allowed_headers = allowed_headers;
389		self
390	}
391
392	/// Configures request middleware
393	pub fn request_middleware<T: RequestMiddleware>(mut self, middleware: T) -> Self {
394		self.request_middleware = Arc::new(middleware);
395		self
396	}
397
398	/// Configures metadata extractor
399	pub fn meta_extractor<T: MetaExtractor<M>>(mut self, extractor: T) -> Self {
400		self.meta_extractor = Arc::new(extractor);
401		self
402	}
403
404	/// Allow connections only with `Host` header set to binding address.
405	pub fn allow_only_bind_host(mut self) -> Self {
406		self.allowed_hosts = Some(Vec::new());
407		self
408	}
409
410	/// Specify a list of valid `Host` headers. Binding address is allowed automatically.
411	pub fn allowed_hosts(mut self, allowed_hosts: DomainsValidation<Host>) -> Self {
412		self.allowed_hosts = allowed_hosts.into();
413		self
414	}
415
416	/// Sets the maximum size of a request body in bytes (default is 5 MiB).
417	pub fn max_request_body_size(mut self, val: usize) -> Self {
418		self.max_request_body_size = val;
419		self
420	}
421
422	/// Start this JSON-RPC HTTP server trying to bind to specified `SocketAddr`.
423	pub fn start_http(self, addr: &SocketAddr) -> io::Result<Server> {
424		let cors_domains = self.cors_domains;
425		let cors_max_age = self.cors_max_age;
426		let allowed_headers = self.allowed_headers;
427		let request_middleware = self.request_middleware;
428		let allowed_hosts = self.allowed_hosts;
429		let jsonrpc_handler = Rpc {
430			handler: self.handler,
431			extractor: self.meta_extractor,
432		};
433		let rest_api = self.rest_api;
434		let health_api = self.health_api;
435		let keep_alive = self.keep_alive;
436		let reuse_port = self.threads > 1;
437
438		let (local_addr_tx, local_addr_rx) = mpsc::channel();
439		let (close, shutdown_signal) = oneshot::channel();
440		let (done_tx, done_rx) = oneshot::channel();
441		let eloop = self.executor.init_with_name("http.worker0")?;
442		let req_max_size = self.max_request_body_size;
443		// The first threads `Executor` is initialised differently from the others
444		serve(
445			(shutdown_signal, local_addr_tx, done_tx),
446			eloop.executor(),
447			addr.to_owned(),
448			cors_domains.clone(),
449			cors_max_age,
450			allowed_headers.clone(),
451			request_middleware.clone(),
452			allowed_hosts.clone(),
453			jsonrpc_handler.clone(),
454			rest_api,
455			health_api.clone(),
456			keep_alive,
457			reuse_port,
458			req_max_size,
459		);
460		let handles = (0..self.threads - 1)
461			.map(|i| {
462				let (local_addr_tx, local_addr_rx) = mpsc::channel();
463				let (close, shutdown_signal) = oneshot::channel();
464				let (done_tx, done_rx) = oneshot::channel();
465				let eloop = UninitializedExecutor::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
466				serve(
467					(shutdown_signal, local_addr_tx, done_tx),
468					eloop.executor(),
469					addr.to_owned(),
470					cors_domains.clone(),
471					cors_max_age,
472					allowed_headers.clone(),
473					request_middleware.clone(),
474					allowed_hosts.clone(),
475					jsonrpc_handler.clone(),
476					rest_api,
477					health_api.clone(),
478					keep_alive,
479					reuse_port,
480					req_max_size,
481				);
482				Ok((eloop, close, local_addr_rx, done_rx))
483			})
484			.collect::<io::Result<Vec<_>>>()?;
485
486		// Wait for server initialization
487		let local_addr = recv_address(local_addr_rx);
488		// Wait for other threads as well.
489		let mut handles: Vec<(Executor, oneshot::Sender<()>, oneshot::Receiver<()>)> = handles
490			.into_iter()
491			.map(|(eloop, close, local_addr_rx, done_rx)| {
492				let _ = recv_address(local_addr_rx)?;
493				Ok((eloop, close, done_rx))
494			})
495			.collect::<io::Result<Vec<_>>>()?;
496		handles.push((eloop, close, done_rx));
497
498		let (executors, done_rxs) = handles
499			.into_iter()
500			.fold((vec![], vec![]), |mut acc, (eloop, closer, done_rx)| {
501				acc.0.push((eloop, closer));
502				acc.1.push(done_rx);
503				acc
504			});
505
506		Ok(Server {
507			address: local_addr?,
508			executors: Arc::new(Mutex::new(Some(executors))),
509			done: Some(done_rxs),
510		})
511	}
512}
513
/// Blocks until a worker reports the outcome of binding its listener.
///
/// Returns the address (or the bind error) sent by the worker. When the
/// sending half is dropped without sending anything, an error of kind
/// `io::ErrorKind::Interrupted` is returned.
fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Result<SocketAddr> {
	local_addr_rx.recv().unwrap_or_else(|_| {
		// A non-empty message keeps the failure diagnosable for callers that
		// only print the error.
		Err(io::Error::new(
			io::ErrorKind::Interrupted,
			"server worker stopped before reporting its bind address",
		))
	})
}
519
520fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
521	signals: (
522		oneshot::Receiver<()>,
523		mpsc::Sender<io::Result<SocketAddr>>,
524		oneshot::Sender<()>,
525	),
526	executor: TaskExecutor,
527	addr: SocketAddr,
528	cors_domains: CorsDomains,
529	cors_max_age: Option<u32>,
530	allowed_headers: cors::AccessControlAllowHeaders,
531	request_middleware: Arc<dyn RequestMiddleware>,
532	allowed_hosts: AllowedHosts,
533	jsonrpc_handler: Rpc<M, S>,
534	rest_api: RestApi,
535	health_api: Option<(String, String)>,
536	keep_alive: bool,
537	reuse_port: bool,
538	max_request_body_size: usize,
539) where
540	S::Future: Unpin,
541	S::CallFuture: Unpin,
542	M: Unpin,
543{
544	let (shutdown_signal, local_addr_tx, done_tx) = signals;
545	executor.spawn(async move {
546		let bind = move || {
547			let listener = match addr {
548				SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
549				SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
550			};
551			configure_port(reuse_port, &listener)?;
552			listener.reuse_address(true)?;
553			listener.bind(&addr)?;
554			let listener = listener.listen(1024)?;
555			let local_addr = listener.local_addr()?;
556
557			// NOTE: Future-proof by explicitly setting the listener socket to
558			// non-blocking mode of operation (future Tokio/Hyper versions
559			// require for the callers to do that manually)
560			listener.set_nonblocking(true)?;
561			// HACK: See below.
562			#[cfg(windows)]
563			let raw_socket = std::os::windows::io::AsRawSocket::as_raw_socket(&listener);
564			#[cfg(not(windows))]
565			let raw_socket = ();
566
567			let server_builder =
568				hyper::Server::from_tcp(listener).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
569			// Add current host to allowed headers.
570			// NOTE: we need to use `l.local_addr()` instead of `addr`
571			// it might be different!
572			Ok((server_builder, local_addr, raw_socket))
573		};
574
575		let bind_result = match bind() {
576			Ok((server_builder, local_addr, raw_socket)) => {
577				// Send local address
578				match local_addr_tx.send(Ok(local_addr)) {
579					Ok(_) => Ok((server_builder, local_addr, raw_socket)),
580					Err(_) => {
581						warn!(
582							"Thread {:?} unable to reach receiver, closing server",
583							thread::current().name()
584						);
585						Err(())
586					}
587				}
588			}
589			Err(err) => {
590				// Send error
591				let _send_result = local_addr_tx.send(Err(err));
592
593				Err(())
594			}
595		};
596
597		let (server_builder, local_addr, _raw_socket) = bind_result?;
598
599		let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);
600
601		let server_builder = server_builder
602			.http1_keepalive(keep_alive)
603			.tcp_nodelay(true)
604			// Explicitly attempt to recover from accept errors (e.g. too many
605			// files opened) instead of erroring out the entire server.
606			.tcp_sleep_on_accept_errors(true);
607
608		let service_fn = hyper::service::make_service_fn(move |_addr_stream| {
609			let service = ServerHandler::new(
610				jsonrpc_handler.downgrade(),
611				cors_domains.clone(),
612				cors_max_age,
613				allowed_headers.clone(),
614				allowed_hosts.clone(),
615				request_middleware.clone(),
616				rest_api,
617				health_api.clone(),
618				max_request_body_size,
619				keep_alive,
620			);
621			async { Ok::<_, Infallible>(service) }
622		});
623
624		let server = server_builder.serve(service_fn).with_graceful_shutdown(async {
625			if let Err(err) = shutdown_signal.await {
626				debug!("Shutdown signaller dropped, closing server: {:?}", err);
627			}
628		});
629
630		if let Err(err) = server.await {
631			error!("Error running HTTP server: {:?}", err);
632		}
633
634		// FIXME: Work around TCP listener socket not being properly closed
635		// in mio v0.6. This runs the std::net::TcpListener's destructor,
636		// which closes the underlying OS socket.
637		// Remove this once we migrate to Tokio 1.0.
638		#[cfg(windows)]
639		let _: std::net::TcpListener = unsafe { std::os::windows::io::FromRawSocket::from_raw_socket(_raw_socket) };
640
641		done_tx.send(())
642	});
643}
644
645#[cfg(unix)]
646fn configure_port(reuse: bool, tcp: &net2::TcpBuilder) -> io::Result<()> {
647	use net2::unix::*;
648
649	if reuse {
650		tcp.reuse_port(true)?;
651	}
652
653	Ok(())
654}
655
// Non-unix variant: nothing is configured and the call always succeeds.
#[cfg(not(unix))]
fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
	io::Result::Ok(())
}
660
661/// Handle used to close the server. Can be cloned and passed around to different threads and be used
662/// to close a server that is `wait()`ing.
663
664#[derive(Clone)]
665pub struct CloseHandle(Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>);
666
667impl CloseHandle {
668	/// Shutdown a running server
669	pub fn close(self) {
670		if let Some(executors) = self.0.lock().take() {
671			for (executor, closer) in executors {
672				// First send shutdown signal so we can proceed with underlying select
673				let _ = closer.send(());
674				executor.close();
675			}
676		}
677	}
678}
679
680type Executors = Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>;
681/// jsonrpc http server instance
682pub struct Server {
683	address: SocketAddr,
684	executors: Executors,
685	done: Option<Vec<oneshot::Receiver<()>>>,
686}
687
688impl Server {
689	/// Returns address of this server
690	pub fn address(&self) -> &SocketAddr {
691		&self.address
692	}
693
694	/// Closes the server.
695	pub fn close(self) {
696		self.close_handle().close()
697	}
698
699	/// Will block, waiting for the server to finish.
700	pub fn wait(mut self) {
701		self.wait_internal();
702	}
703
704	/// Get a handle that allows us to close the server from a different thread and/or while the
705	/// server is `wait()`ing.
706	pub fn close_handle(&self) -> CloseHandle {
707		CloseHandle(self.executors.clone())
708	}
709
710	fn wait_internal(&mut self) {
711		if let Some(receivers) = self.done.take() {
712			// NOTE: Gracefully handle the case where we may wait on a *nested*
713			// local task pool (for now, wait on a dedicated, spawned thread)
714			let _ = std::thread::spawn(move || futures::executor::block_on(future::try_join_all(receivers))).join();
715		}
716	}
717}
718
719impl Drop for Server {
720	fn drop(&mut self) {
721		self.close_handle().close();
722		self.wait_internal();
723	}
724}