tokio_threadpool/
shutdown.rs

1use task::Task;
2use worker;
3
4use crossbeam_deque::Injector;
5use futures::task::AtomicTask;
6use futures::{Async, Future, Poll};
7
8use std::sync::{Arc, Mutex};
9
10/// Future that resolves when the thread pool is shutdown.
11///
12/// A `ThreadPool` is shutdown once all the worker have drained their queues and
13/// shutdown their threads.
14///
15/// `Shutdown` is returned by [`shutdown`], [`shutdown_on_idle`], and
16/// [`shutdown_now`].
17///
18/// [`shutdown`]: struct.ThreadPool.html#method.shutdown
19/// [`shutdown_on_idle`]: struct.ThreadPool.html#method.shutdown_on_idle
20/// [`shutdown_now`]: struct.ThreadPool.html#method.shutdown_now
21#[derive(Debug)]
22pub struct Shutdown {
23    inner: Arc<Mutex<Inner>>,
24}
25
26/// Shared state between `Shutdown` and `ShutdownTrigger`.
27///
28/// This is used for notifying the `Shutdown` future when `ShutdownTrigger` gets dropped.
29#[derive(Debug)]
30struct Inner {
31    /// The task to notify when the threadpool completes the shutdown process.
32    task: AtomicTask,
33    /// `true` if the threadpool has been shut down.
34    completed: bool,
35}
36
37impl Shutdown {
38    pub(crate) fn new(trigger: &ShutdownTrigger) -> Shutdown {
39        Shutdown {
40            inner: trigger.inner.clone(),
41        }
42    }
43}
44
45impl Future for Shutdown {
46    type Item = ();
47    type Error = ();
48
49    fn poll(&mut self) -> Poll<(), ()> {
50        let inner = self.inner.lock().unwrap();
51
52        if !inner.completed {
53            inner.task.register();
54            Ok(Async::NotReady)
55        } else {
56            Ok(().into())
57        }
58    }
59}
60
61/// When dropped, cleans up threadpool's resources and completes the shutdown process.
62#[derive(Debug)]
63pub(crate) struct ShutdownTrigger {
64    inner: Arc<Mutex<Inner>>,
65    workers: Arc<[worker::Entry]>,
66    queue: Arc<Injector<Arc<Task>>>,
67}
68
69unsafe impl Send for ShutdownTrigger {}
70unsafe impl Sync for ShutdownTrigger {}
71
72impl ShutdownTrigger {
73    pub(crate) fn new(
74        workers: Arc<[worker::Entry]>,
75        queue: Arc<Injector<Arc<Task>>>,
76    ) -> ShutdownTrigger {
77        ShutdownTrigger {
78            inner: Arc::new(Mutex::new(Inner {
79                task: AtomicTask::new(),
80                completed: false,
81            })),
82            workers,
83            queue,
84        }
85    }
86}
87
88impl Drop for ShutdownTrigger {
89    fn drop(&mut self) {
90        // Drain the global task queue.
91        while !self.queue.steal().is_empty() {}
92
93        // Drop the remaining incomplete tasks and parkers assosicated with workers.
94        for worker in self.workers.iter() {
95            worker.shutdown();
96        }
97
98        // Notify the task interested in shutdown.
99        let mut inner = self.inner.lock().unwrap();
100        inner.completed = true;
101        inner.task.notify();
102    }
103}