tokio_reactor/
background.rs1use {AtomicTask, Handle, Reactor};
2
3use futures::{task, Async, Future, Poll};
4
5use std::io;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering::SeqCst;
8use std::sync::Arc;
9use std::thread;
10
11#[derive(Debug)]
17pub struct Background {
18 inner: Option<Inner>,
20}
21
22#[derive(Debug)]
24pub struct Shutdown {
25 inner: Inner,
26}
27
28#[derive(Debug)]
30struct Inner {
31 handle: Handle,
33
34 shared: Arc<Shared>,
36}
37
38#[derive(Debug)]
39struct Shared {
40 shutdown: AtomicUsize,
42
43 shutdown_task: AtomicTask,
45}
46
47const SHUTDOWN_IDLE: usize = 1;
49
50const SHUTDOWN_NOW: usize = 2;
52
53const SHUTDOWN: usize = 3;
55
56impl Background {
59 pub(crate) fn new(reactor: Reactor) -> io::Result<Background> {
61 let handle = reactor.handle().clone();
63
64 let shared = Arc::new(Shared {
67 shutdown: AtomicUsize::new(0),
68 shutdown_task: AtomicTask::new(),
69 });
70
71 let shared2 = shared.clone();
73
74 thread::Builder::new().spawn(move || run(reactor, shared2))?;
76
77 Ok(Background {
78 inner: Some(Inner { handle, shared }),
79 })
80 }
81
82 pub fn handle(&self) -> &Handle {
84 &self.inner.as_ref().unwrap().handle
85 }
86
87 pub fn shutdown_on_idle(mut self) -> Shutdown {
91 let inner = self.inner.take().unwrap();
92 inner.shutdown_on_idle();
93
94 Shutdown { inner }
95 }
96
97 pub fn shutdown_now(mut self) -> Shutdown {
101 let inner = self.inner.take().unwrap();
102 inner.shutdown_now();
103
104 Shutdown { inner }
105 }
106
107 pub fn forget(mut self) {
109 drop(self.inner.take());
110 }
111}
112
113impl Drop for Background {
114 fn drop(&mut self) {
115 let inner = match self.inner.take() {
116 Some(i) => i,
117 None => return,
118 };
119
120 inner.shutdown_now();
121
122 let shutdown = Shutdown { inner };
123 let _ = shutdown.wait();
124 }
125}
126
127impl Future for Shutdown {
130 type Item = ();
131 type Error = ();
132
133 fn poll(&mut self) -> Poll<(), ()> {
134 let task = task::current();
135 self.inner.shared.shutdown_task.register_task(task);
136
137 if !self.inner.is_shutdown() {
138 return Ok(Async::NotReady);
139 }
140
141 Ok(().into())
142 }
143}
144
145impl Inner {
148 fn is_shutdown(&self) -> bool {
150 self.shared.shutdown.load(SeqCst) == SHUTDOWN
151 }
152
153 fn shutdown_on_idle(&self) {
156 self.shared
157 .shutdown
158 .compare_and_swap(0, SHUTDOWN_IDLE, SeqCst);
159 self.handle.wakeup();
160 }
161
162 fn shutdown_now(&self) {
164 let mut curr = self.shared.shutdown.load(SeqCst);
165
166 loop {
167 if curr >= SHUTDOWN_NOW {
168 return;
169 }
170
171 let act = self
172 .shared
173 .shutdown
174 .compare_and_swap(curr, SHUTDOWN_NOW, SeqCst);
175
176 if act == curr {
177 self.handle.wakeup();
178 return;
179 }
180
181 curr = act;
182 }
183 }
184}
185
186fn run(mut reactor: Reactor, shared: Arc<Shared>) {
189 debug!("starting background reactor");
190 loop {
191 let shutdown = shared.shutdown.load(SeqCst);
192
193 if shutdown == SHUTDOWN_NOW {
194 debug!("shutting background reactor down NOW");
195 break;
196 }
197
198 if shutdown == SHUTDOWN_IDLE && reactor.is_idle() {
199 debug!("shutting background reactor on idle");
200 break;
201 }
202
203 reactor.turn(None).unwrap();
204 }
205
206 drop(reactor);
207
208 shared.shutdown.store(SHUTDOWN, SeqCst);
210
211 shared.shutdown_task.notify();
213
214 debug!("background reactor has shutdown");
215}