jsonrpc_server_utils/
reactor.rs1use std::io;
9
10use tokio::runtime;
11pub type TaskExecutor = tokio::runtime::Handle;
13
14#[derive(Debug)]
16pub enum UninitializedExecutor {
17 Shared(TaskExecutor),
19 Unspawned,
21}
22
23impl UninitializedExecutor {
24 pub fn initialize(self) -> io::Result<Executor> {
28 self.init_with_name("event.loop")
29 }
30
31 pub fn init_with_name<T: Into<String>>(self, name: T) -> io::Result<Executor> {
35 match self {
36 UninitializedExecutor::Shared(executor) => Ok(Executor::Shared(executor)),
37 UninitializedExecutor::Unspawned => RpcEventLoop::with_name(Some(name.into())).map(Executor::Spawned),
38 }
39 }
40}
41
42#[derive(Debug)]
44pub enum Executor {
45 Shared(TaskExecutor),
47 Spawned(RpcEventLoop),
49}
50
51impl Executor {
52 pub fn executor(&self) -> TaskExecutor {
54 match self {
55 Executor::Shared(ref executor) => executor.clone(),
56 Executor::Spawned(ref eloop) => eloop.executor(),
57 }
58 }
59
60 pub fn close(self) {
62 if let Executor::Spawned(eloop) = self {
63 eloop.close()
64 }
65 }
66
67 pub fn wait(self) {
69 if let Executor::Spawned(eloop) = self {
70 let _ = eloop.wait();
71 }
72 }
73}
74
75#[derive(Debug)]
77pub struct RpcEventLoop {
78 executor: TaskExecutor,
79 close: Option<futures::channel::oneshot::Sender<()>>,
80 runtime: Option<runtime::Runtime>,
81}
82
83impl Drop for RpcEventLoop {
84 fn drop(&mut self) {
85 self.close.take().map(|v| v.send(()));
86 }
87}
88
89impl RpcEventLoop {
90 pub fn spawn() -> io::Result<Self> {
92 RpcEventLoop::with_name(None)
93 }
94
95 pub fn with_name(name: Option<String>) -> io::Result<Self> {
97 let (stop, stopped) = futures::channel::oneshot::channel();
98
99 let mut tb = runtime::Builder::new_multi_thread();
100 tb.worker_threads(1);
101 tb.enable_all();
102
103 if let Some(name) = name {
104 tb.thread_name(name);
105 }
106
107 let runtime = tb.build()?;
108 let executor = runtime.handle().to_owned();
109
110 runtime.spawn(async {
111 let _ = stopped.await;
112 });
113
114 Ok(RpcEventLoop {
115 executor,
116 close: Some(stop),
117 runtime: Some(runtime),
118 })
119 }
120
121 pub fn executor(&self) -> runtime::Handle {
123 self.runtime
124 .as_ref()
125 .expect("Runtime is only None if we're being dropped; qed")
126 .handle()
127 .clone()
128 }
129
130 pub fn wait(mut self) -> Result<(), ()> {
132 let runtime = self.runtime.take().ok_or(())?;
134 drop(runtime);
135 Ok(())
136 }
137
138 pub fn close(mut self) {
140 let _ = self
141 .close
142 .take()
143 .expect("Close is always set before self is consumed.")
144 .send(())
145 .map_err(|e| {
146 warn!("Event Loop is already finished. {:?}", e);
147 });
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check that `RpcEventLoop` can be shared across threads.
    #[test]
    fn make_sure_rpc_event_loop_is_send_and_sync() {
        // Only instantiates when `T` satisfies both auto traits.
        fn assert_send_sync<T>()
        where
            T: Send + Sync,
        {
        }

        assert_send_sync::<RpcEventLoop>();
    }
}