jsonrpc_server_utils/
reactor.rs

1//! Event Loop Executor
2//!
3//! Either spawns a new event loop, or re-uses provided one.
4//! Spawned event loop is always single threaded (mostly for
5//! historical/backward compatibility reasons) despite the fact
6//! that `tokio::runtime` can be multi-threaded.
7
8use std::io;
9
10use tokio::runtime;
11/// Task executor for Tokio 0.2 runtime.
12pub type TaskExecutor = tokio::runtime::Handle;
13
14/// Possibly uninitialized event loop executor.
15#[derive(Debug)]
16pub enum UninitializedExecutor {
17	/// Shared instance of executor.
18	Shared(TaskExecutor),
19	/// Event Loop should be spawned by the transport.
20	Unspawned,
21}
22
23impl UninitializedExecutor {
24	/// Initializes executor.
25	/// In case there is no shared executor, will spawn a new event loop.
26	/// Dropping `Executor` closes the loop.
27	pub fn initialize(self) -> io::Result<Executor> {
28		self.init_with_name("event.loop")
29	}
30
31	/// Initializes executor.
32	/// In case there is no shared executor, will spawn a new event loop.
33	/// Dropping `Executor` closes the loop.
34	pub fn init_with_name<T: Into<String>>(self, name: T) -> io::Result<Executor> {
35		match self {
36			UninitializedExecutor::Shared(executor) => Ok(Executor::Shared(executor)),
37			UninitializedExecutor::Unspawned => RpcEventLoop::with_name(Some(name.into())).map(Executor::Spawned),
38		}
39	}
40}
41
42/// Initialized Executor
43#[derive(Debug)]
44pub enum Executor {
45	/// Shared instance
46	Shared(TaskExecutor),
47	/// Spawned Event Loop
48	Spawned(RpcEventLoop),
49}
50
51impl Executor {
52	/// Get tokio executor associated with this event loop.
53	pub fn executor(&self) -> TaskExecutor {
54		match self {
55			Executor::Shared(ref executor) => executor.clone(),
56			Executor::Spawned(ref eloop) => eloop.executor(),
57		}
58	}
59
60	/// Closes underlying event loop (if any!).
61	pub fn close(self) {
62		if let Executor::Spawned(eloop) = self {
63			eloop.close()
64		}
65	}
66
67	/// Wait for underlying event loop to finish (if any!).
68	pub fn wait(self) {
69		if let Executor::Spawned(eloop) = self {
70			let _ = eloop.wait();
71		}
72	}
73}
74
75/// A handle to running event loop. Dropping the handle will cause event loop to finish.
76#[derive(Debug)]
77pub struct RpcEventLoop {
78	executor: TaskExecutor,
79	close: Option<futures::channel::oneshot::Sender<()>>,
80	runtime: Option<runtime::Runtime>,
81}
82
83impl Drop for RpcEventLoop {
84	fn drop(&mut self) {
85		self.close.take().map(|v| v.send(()));
86	}
87}
88
89impl RpcEventLoop {
90	/// Spawns a new thread with the `EventLoop`.
91	pub fn spawn() -> io::Result<Self> {
92		RpcEventLoop::with_name(None)
93	}
94
95	/// Spawns a new named thread with the `EventLoop`.
96	pub fn with_name(name: Option<String>) -> io::Result<Self> {
97		let (stop, stopped) = futures::channel::oneshot::channel();
98
99		let mut tb = runtime::Builder::new_multi_thread();
100		tb.worker_threads(1);
101		tb.enable_all();
102
103		if let Some(name) = name {
104			tb.thread_name(name);
105		}
106
107		let runtime = tb.build()?;
108		let executor = runtime.handle().to_owned();
109
110		runtime.spawn(async {
111			let _ = stopped.await;
112		});
113
114		Ok(RpcEventLoop {
115			executor,
116			close: Some(stop),
117			runtime: Some(runtime),
118		})
119	}
120
121	/// Get executor for this event loop.
122	pub fn executor(&self) -> runtime::Handle {
123		self.runtime
124			.as_ref()
125			.expect("Runtime is only None if we're being dropped; qed")
126			.handle()
127			.clone()
128	}
129
130	/// Blocks current thread and waits until the event loop is finished.
131	pub fn wait(mut self) -> Result<(), ()> {
132		// Dropping Tokio 0.2 runtime waits for all spawned tasks to terminate
133		let runtime = self.runtime.take().ok_or(())?;
134		drop(runtime);
135		Ok(())
136	}
137
138	/// Finishes this event loop.
139	pub fn close(mut self) {
140		let _ = self
141			.close
142			.take()
143			.expect("Close is always set before self is consumed.")
144			.send(())
145			.map_err(|e| {
146				warn!("Event Loop is already finished. {:?}", e);
147			});
148	}
149}
150
#[cfg(test)]
mod tests {
	use super::RpcEventLoop;

	/// Compile-time check: the test only builds if `RpcEventLoop` implements
	/// both `Send` and `Sync`.
	#[test]
	fn make_sure_rpc_event_loop_is_send_and_sync() {
		fn assert_send<T: Send>() {}
		fn assert_sync<T: Sync>() {}

		assert_send::<RpcEventLoop>();
		assert_sync::<RpcEventLoop>();
	}
}