1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use std::cell::Cell;
use std::ptr;
use crossbeam_deque::Worker;
use super::pool;
use super::task;
use super::Task;
use crate::utils::abort_on_panic;
/// Returns a handle to the task that is currently running on this thread.
///
/// The handle is a clone of the `Task` reachable through the thread's installed tag.
///
/// # Panics
///
/// Panics if no task tag is installed on the calling thread, i.e. when called outside the
/// context of a task.
pub fn current() -> Task {
    let running = get_task(|handle| handle.clone());
    running.expect("`task::current()` called outside the context of a task")
}
thread_local! {
    /// Raw pointer to the tag installed by `set_tag` on this thread.
    ///
    /// Null whenever no tag is installed; `get_task` treats null as "no current task".
    static TAG: Cell<*const task::Tag> = Cell::new(ptr::null());
}
pub(crate) fn set_tag<F, R>(tag: *const task::Tag, f: F) -> R
where
F: FnOnce() -> R,
{
struct ResetTag<'a>(&'a Cell<*const task::Tag>);
impl Drop for ResetTag<'_> {
fn drop(&mut self) {
self.0.set(ptr::null());
}
}
TAG.with(|t| {
t.set(tag);
let _guard = ResetTag(t);
f()
})
}
/// Calls `f` with the `Task` behind the tag installed on this thread, if there is one.
///
/// Returns `Some` with the result of `f` when a non-null tag is installed. Returns `None`
/// when the tag pointer is null, or when the thread-local slot cannot be accessed
/// (`try_with` reported an error); in both cases `f` is not called.
pub(crate) fn get_task<F, R>(f: F) -> Option<R>
where
    F: FnOnce(&Task) -> R,
{
    TAG.try_with(|slot| {
        // SAFETY: the slot holds either null (handled by `as_ref`) or a pointer installed by
        // `set_tag`, whose callers are relied upon to keep it valid while it is installed.
        let installed = unsafe { slot.get().as_ref() };
        installed.map(|tag| f(task::Tag::task(tag)))
    })
    // An inaccessible thread-local is reported the same way as "no task".
    .unwrap_or(None)
}
thread_local! {
/// `true` on threads that have entered `main_loop`; `false` (the initial value) elsewhere.
static IS_WORKER: Cell<bool> = Cell::new(false);
/// This thread's local `Worker` queue, stored by `main_loop`; `None` until then.
///
/// `get_queue` moves the worker out of this cell while its closure runs and stores it
/// back afterwards.
static QUEUE: Cell<Option<Worker<task::Runnable>>> = Cell::new(None);
}
pub(crate) fn is_worker() -> bool {
IS_WORKER.with(|is_worker| is_worker.get())
}
/// Lends this thread's local worker queue to `f` and returns whatever `f` produces.
///
/// The worker is moved out of the thread-local cell for the duration of the call and moved
/// back in afterwards, because a `Cell` cannot hand out a reference to its contents.
///
/// # Panics
///
/// Panics if the cell is empty, which is the case on a thread where no queue has been
/// stored yet, or while another `get_queue` call on the same thread is still running.
fn get_queue<F: FnOnce(&Worker<task::Runnable>) -> T, T>(f: F) -> T {
    QUEUE.with(|slot| {
        let worker = slot.replace(None).unwrap();
        let outcome = f(&worker);
        // Hand the worker back so the next call finds it.
        slot.set(Some(worker));
        outcome
    })
}
/// Queues `task` for execution and then signals the pool's sleepers.
///
/// A thread that is not a worker pushes the task onto the pool's shared injector; a worker
/// thread pushes it onto its own local queue. In both cases `sleepers.notify_one()` is called
/// afterwards (presumably to wake one idle worker).
pub(crate) fn schedule(task: task::Runnable) {
    if !is_worker() {
        // No local queue on this thread: go through the shared injector.
        pool::get().injector.push(task);
    } else {
        get_queue(|local| local.push(task));
    }

    pool::get().sleepers.notify_one();
}
/// Turns the calling thread into a worker and runs tasks on it forever.
///
/// The thread is flagged as a worker and `worker` is stored as its local queue. After that
/// the loop repeatedly asks the pool for a task via `find_task`: a found task is run with its
/// tag installed (inside `abort_on_panic`), and when none is found the thread blocks in
/// `sleepers.wait()`. The loop has no exit, so this function does not return.
pub(crate) fn main_loop(worker: Worker<task::Runnable>) {
    // One-time setup of the thread-local worker state.
    IS_WORKER.with(|flag| flag.set(true));
    QUEUE.with(|slot| slot.set(Some(worker)));

    loop {
        let next = get_queue(|local| pool::get().find_task(local));

        if let Some(runnable) = next {
            // The tag stays installed only while this task runs.
            set_tag(runnable.tag(), || abort_on_panic(|| runnable.run()));
        } else {
            pool::get().sleepers.wait();
        }
    }
}