1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
use std::cell::Cell;
use std::ptr;

use crossbeam_deque::Worker;

use super::pool;
use super::task;
use super::Task;
use crate::utils::abort_on_panic;

/// Returns a handle to the current task.
///
/// # Panics
///
/// This function will panic if not called within the context of a task created by [`block_on`],
/// [`spawn`], or [`Builder::spawn`].
///
/// [`block_on`]: fn.block_on.html
/// [`spawn`]: fn.spawn.html
/// [`Builder::spawn`]: struct.Builder.html#method.spawn
///
/// # Examples
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// println!("The name of this task is {:?}", task::current().name());
/// #
/// # }) }
/// ```
pub fn current() -> Task {
    get_task(|task| task.clone()).expect("`task::current()` called outside the context of a task")
}

thread_local! {
    // Tag of the task currently running on this thread; null while no task is running.
    // Written by `set_tag` and read by `get_task`.
    static TAG: Cell<*const task::Tag> = Cell::new(ptr::null());
}

/// Runs `f` with `tag` installed as the current thread's task tag and returns its result.
///
/// The tag that was installed before the call (null if there was none) is put back when `f`
/// returns or unwinds. Restoring the previous value rather than unconditionally clearing it
/// keeps nested calls on the same thread from wiping out the enclosing call's tag.
///
/// `get_task` dereferences the installed pointer, so `tag` must stay valid for as long as
/// `f` runs.
pub(crate) fn set_tag<F, R>(tag: *const task::Tag, f: F) -> R
where
    F: FnOnce() -> R,
{
    // Drop guard: puts the saved pointer back even if `f` panics and unwinds.
    struct RestoreTag<'a> {
        cell: &'a Cell<*const task::Tag>,
        previous: *const task::Tag,
    }

    impl Drop for RestoreTag<'_> {
        fn drop(&mut self) {
            self.cell.set(self.previous);
        }
    }

    TAG.with(|t| {
        // `replace` installs the new tag and hands back the one it displaced.
        let _guard = RestoreTag {
            cell: t,
            previous: t.replace(tag),
        };

        f()
    })
}

/// Calls `f` with the task belonging to the tag installed on this thread.
///
/// Returns `Some` with the result of `f`, or `None` when no tag is installed (the pointer is
/// null) or when the thread-local can no longer be accessed.
pub(crate) fn get_task<F, R>(f: F) -> Option<R>
where
    F: FnOnce(&Task) -> R,
{
    TAG.try_with(|cell| {
        // SAFETY: the pointer is either null (handled by `as_ref`) or was installed by
        // `set_tag`; this relies on `set_tag` callers passing a pointer that stays valid
        // while their closure runs.
        let tag = unsafe { cell.get().as_ref() };
        tag.map(|tag| f(task::Tag::task(tag)))
    })
    // An inaccessible thread-local is treated the same as "no current task".
    .unwrap_or(None)
}

thread_local! {
    // Whether this thread is running `main_loop`. Starts out `false` and is set to `true`
    // at the top of `main_loop`.
    static IS_WORKER: Cell<bool> = Cell::new(false);
    // This thread's local run queue. `None` until `main_loop` stores its worker here;
    // `get_queue` temporarily takes the value out while lending it to a closure.
    static QUEUE: Cell<Option<Worker<task::Runnable>>> = Cell::new(None);
}

pub(crate) fn is_worker() -> bool {
    IS_WORKER.with(|is_worker| is_worker.get())
}

/// Runs `f` with a reference to this thread's local worker queue and returns its result.
///
/// The queue is moved out of its thread-local slot while `f` runs and moved back afterwards,
/// so the slot is empty for the duration of `f`.
///
/// # Panics
///
/// Panics if the slot is empty. That happens on a thread that never stored a queue (only
/// `main_loop` does) or when `f` itself calls `get_queue` while the queue is lent out.
fn get_queue<F: FnOnce(&Worker<task::Runnable>) -> T, T>(f: F) -> T {
    QUEUE.with(|slot| {
        let worker = slot
            .take()
            .expect("`get_queue` called re-entrantly or on a thread without a local queue");
        let ret = f(&worker);
        slot.set(Some(worker));
        ret
    })
}

/// Queues `task` for execution and notifies one sleeper on the pool.
///
/// A call made from a worker thread pushes onto that thread's local queue; any other thread
/// pushes onto the pool's injector.
pub(crate) fn schedule(task: task::Runnable) {
    if !is_worker() {
        // Not a worker: there is no local queue, hand the task to the pool's injector.
        pool::get().injector.push(task);
    } else {
        get_queue(|local| local.push(task));
    }

    // Notify in either case so newly queued work is not left waiting.
    pool::get().sleepers.notify_one();
}

/// Entry point of a worker thread: marks the thread as a worker, stores `worker` as its
/// local queue, then runs tasks in an endless loop.
///
/// Each iteration asks the pool for a task. A found task is run with its tag installed via
/// `set_tag` and wrapped in `abort_on_panic`; when there is none, the thread waits on the
/// pool's sleepers.
pub(crate) fn main_loop(worker: Worker<task::Runnable>) {
    IS_WORKER.with(|flag| flag.set(true));
    QUEUE.with(|slot| slot.set(Some(worker)));

    loop {
        let next = get_queue(|local| pool::get().find_task(local));

        if let Some(task) = next {
            // Read the tag pointer before the task is handed to the closure that runs it.
            let tag: *const task::Tag = task.tag();
            set_tag(tag, || abort_on_panic(|| task.run()))
        } else {
            pool::get().sleepers.wait()
        }
    }
}