tokio_threadpool/
shutdown.rs1use task::Task;
2use worker;
3
4use crossbeam_deque::Injector;
5use futures::task::AtomicTask;
6use futures::{Async, Future, Poll};
7
8use std::sync::{Arc, Mutex};
9
10#[derive(Debug)]
22pub struct Shutdown {
23 inner: Arc<Mutex<Inner>>,
24}
25
26#[derive(Debug)]
30struct Inner {
31 task: AtomicTask,
33 completed: bool,
35}
36
37impl Shutdown {
38 pub(crate) fn new(trigger: &ShutdownTrigger) -> Shutdown {
39 Shutdown {
40 inner: trigger.inner.clone(),
41 }
42 }
43}
44
45impl Future for Shutdown {
46 type Item = ();
47 type Error = ();
48
49 fn poll(&mut self) -> Poll<(), ()> {
50 let inner = self.inner.lock().unwrap();
51
52 if !inner.completed {
53 inner.task.register();
54 Ok(Async::NotReady)
55 } else {
56 Ok(().into())
57 }
58 }
59}
60
61#[derive(Debug)]
63pub(crate) struct ShutdownTrigger {
64 inner: Arc<Mutex<Inner>>,
65 workers: Arc<[worker::Entry]>,
66 queue: Arc<Injector<Arc<Task>>>,
67}
68
69unsafe impl Send for ShutdownTrigger {}
70unsafe impl Sync for ShutdownTrigger {}
71
72impl ShutdownTrigger {
73 pub(crate) fn new(
74 workers: Arc<[worker::Entry]>,
75 queue: Arc<Injector<Arc<Task>>>,
76 ) -> ShutdownTrigger {
77 ShutdownTrigger {
78 inner: Arc::new(Mutex::new(Inner {
79 task: AtomicTask::new(),
80 completed: false,
81 })),
82 workers,
83 queue,
84 }
85 }
86}
87
88impl Drop for ShutdownTrigger {
89 fn drop(&mut self) {
90 while !self.queue.steal().is_empty() {}
92
93 for worker in self.workers.iter() {
95 worker.shutdown();
96 }
97
98 let mut inner = self.inner.lock().unwrap();
100 inner.completed = true;
101 inner.task.notify();
102 }
103}