tokio_reactor/
background.rs

1use {AtomicTask, Handle, Reactor};
2
3use futures::{task, Async, Future, Poll};
4
5use std::io;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering::SeqCst;
8use std::sync::Arc;
9use std::thread;
10
/// Handle to the reactor running on a background thread.
///
/// Instances are created by calling [`Reactor::background`].
///
/// Dropping a `Background` that has not been consumed requests an immediate
/// shutdown and then waits for the reactor thread to finish shutting down.
/// Call `forget` to leave the reactor running instead, or `shutdown_on_idle`
/// / `shutdown_now` to receive a `Shutdown` future rather than waiting in the
/// destructor.
///
/// [`Reactor::background`]: struct.Reactor.html#method.background
#[derive(Debug)]
pub struct Background {
    /// `Some` while this handle still controls the reactor thread.
    ///
    /// Becomes `None` once the handle has been forgotten or converted into a
    /// `Shutdown` future; the destructor then does nothing. After `forget`,
    /// nothing signals the reactor thread any more, so it keeps running
    /// until the process terminates.
    inner: Option<Inner>,
}
21
/// Future that resolves when the reactor thread has shut down.
///
/// Returned by `Background::shutdown_on_idle` and `Background::shutdown_now`.
/// It never resolves to an error.
#[derive(Debug)]
pub struct Shutdown {
    /// Control state taken from the `Background` handle; polled to find out
    /// whether the reactor thread has reached the shutdown state.
    inner: Inner,
}
27
/// Actual Background handle.
///
/// Bundles everything needed to request a shutdown of the reactor thread and
/// to observe whether that shutdown has completed.
#[derive(Debug)]
struct Inner {
    /// Handle to the reactor. `wakeup` is called on it after a shutdown
    /// request has been recorded in `shared`.
    handle: Handle,

    /// Shared state between the background handle and the reactor thread.
    shared: Arc<Shared>,
}
37
/// State shared between the controlling handle (`Inner`) and the reactor
/// thread.
#[derive(Debug)]
struct Shared {
    /// Shutdown state of the reactor thread.
    ///
    /// Starts at `0` (no shutdown requested) and otherwise holds one of
    /// `SHUTDOWN_IDLE`, `SHUTDOWN_NOW` or `SHUTDOWN`.
    shutdown: AtomicUsize,

    /// Task to notify when the reactor thread enters the shutdown state.
    ///
    /// Registered by `Shutdown::poll` and notified by the reactor thread
    /// right after it stores `SHUTDOWN`.
    shutdown_task: AtomicTask,
}
46
// Values stored in `Shared::shutdown`. The initial value `0` means that no
// shutdown has been requested. `Inner::shutdown_now` compares states
// numerically, so the relative order of these values is significant.

/// Notifies the reactor thread to shut down once the reactor becomes idle.
const SHUTDOWN_IDLE: usize = 1;

/// Notifies the reactor thread to shut down immediately.
const SHUTDOWN_NOW: usize = 2;

/// The reactor thread has shut down.
const SHUTDOWN: usize = 3;
55
56// ===== impl Background =====
57
58impl Background {
59    /// Launch a reactor in the background and return a handle to the thread.
60    pub(crate) fn new(reactor: Reactor) -> io::Result<Background> {
61        // Grab a handle to the reactor
62        let handle = reactor.handle().clone();
63
64        // Create the state shared between the background handle and the reactor
65        // thread.
66        let shared = Arc::new(Shared {
67            shutdown: AtomicUsize::new(0),
68            shutdown_task: AtomicTask::new(),
69        });
70
71        // For the reactor thread
72        let shared2 = shared.clone();
73
74        // Start the reactor thread
75        thread::Builder::new().spawn(move || run(reactor, shared2))?;
76
77        Ok(Background {
78            inner: Some(Inner { handle, shared }),
79        })
80    }
81
82    /// Returns a reference to the reactor handle.
83    pub fn handle(&self) -> &Handle {
84        &self.inner.as_ref().unwrap().handle
85    }
86
87    /// Shutdown the reactor on idle.
88    ///
89    /// Returns a future that completes once the reactor thread has shutdown.
90    pub fn shutdown_on_idle(mut self) -> Shutdown {
91        let inner = self.inner.take().unwrap();
92        inner.shutdown_on_idle();
93
94        Shutdown { inner }
95    }
96
97    /// Shutdown the reactor immediately
98    ///
99    /// Returns a future that completes once the reactor thread has shutdown.
100    pub fn shutdown_now(mut self) -> Shutdown {
101        let inner = self.inner.take().unwrap();
102        inner.shutdown_now();
103
104        Shutdown { inner }
105    }
106
107    /// Run the reactor on its thread until the process terminates.
108    pub fn forget(mut self) {
109        drop(self.inner.take());
110    }
111}
112
113impl Drop for Background {
114    fn drop(&mut self) {
115        let inner = match self.inner.take() {
116            Some(i) => i,
117            None => return,
118        };
119
120        inner.shutdown_now();
121
122        let shutdown = Shutdown { inner };
123        let _ = shutdown.wait();
124    }
125}
126
127// ===== impl Shutdown =====
128
129impl Future for Shutdown {
130    type Item = ();
131    type Error = ();
132
133    fn poll(&mut self) -> Poll<(), ()> {
134        let task = task::current();
135        self.inner.shared.shutdown_task.register_task(task);
136
137        if !self.inner.is_shutdown() {
138            return Ok(Async::NotReady);
139        }
140
141        Ok(().into())
142    }
143}
144
145// ===== impl Inner =====
146
147impl Inner {
148    /// Returns true if the reactor thread is shutdown.
149    fn is_shutdown(&self) -> bool {
150        self.shared.shutdown.load(SeqCst) == SHUTDOWN
151    }
152
153    /// Notify the reactor thread to shutdown once the reactor transitions to an
154    /// idle state.
155    fn shutdown_on_idle(&self) {
156        self.shared
157            .shutdown
158            .compare_and_swap(0, SHUTDOWN_IDLE, SeqCst);
159        self.handle.wakeup();
160    }
161
162    /// Notify the reactor thread to shutdown immediately.
163    fn shutdown_now(&self) {
164        let mut curr = self.shared.shutdown.load(SeqCst);
165
166        loop {
167            if curr >= SHUTDOWN_NOW {
168                return;
169            }
170
171            let act = self
172                .shared
173                .shutdown
174                .compare_and_swap(curr, SHUTDOWN_NOW, SeqCst);
175
176            if act == curr {
177                self.handle.wakeup();
178                return;
179            }
180
181            curr = act;
182        }
183    }
184}
185
186// ===== impl Reactor thread =====
187
188fn run(mut reactor: Reactor, shared: Arc<Shared>) {
189    debug!("starting background reactor");
190    loop {
191        let shutdown = shared.shutdown.load(SeqCst);
192
193        if shutdown == SHUTDOWN_NOW {
194            debug!("shutting background reactor down NOW");
195            break;
196        }
197
198        if shutdown == SHUTDOWN_IDLE && reactor.is_idle() {
199            debug!("shutting background reactor on idle");
200            break;
201        }
202
203        reactor.turn(None).unwrap();
204    }
205
206    drop(reactor);
207
208    // Transition the state to shutdown
209    shared.shutdown.store(SHUTDOWN, SeqCst);
210
211    // Notify any waiters
212    shared.shutdown_task.notify();
213
214    debug!("background reactor has shutdown");
215}