1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
use crate::job::{ArcJob, StackJob};
use crate::latch::LatchRef;
use crate::registry::{Registry, WorkerThread};
use crate::scope::ScopeLatch;
use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;

mod test;

/// Executes `op` within every thread in the current threadpool. If this is
/// called from a non-Rayon thread, it will execute in the global threadpool.
/// Any attempts to use `join`, `scope`, or parallel iterators will then operate
/// within that threadpool. When the call has completed on each thread, returns
/// a vector containing all of their return values.
///
/// For more information, see the [`ThreadPool::broadcast()`][m] method.
///
/// [m]: struct.ThreadPool.html#method.broadcast
pub fn broadcast<OP, R>(op: OP) -> Vec<R>
where
    OP: Fn(BroadcastContext<'_>) -> R + Sync,
    R: Send,
{
    // We assert that current registry has not terminated.
    unsafe { broadcast_in(op, &Registry::current()) }
}

/// Spawns an asynchronous task on every thread in this thread-pool. This task
/// will run in the implicit, global scope, which means that it may outlast the
/// current stack frame -- therefore, it cannot capture any references onto the
/// stack (you will likely need a `move` closure).
///
/// For more information, see the [`ThreadPool::spawn_broadcast()`][m] method.
///
/// [m]: struct.ThreadPool.html#method.spawn_broadcast
pub fn spawn_broadcast<OP>(op: OP)
where
    OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
{
    // We assert that current registry has not terminated.
    unsafe { spawn_broadcast_in(op, &Registry::current()) }
}

/// Provides context to a closure called by `broadcast`.
pub struct BroadcastContext<'a> {
    worker: &'a WorkerThread,

    /// Make sure to prevent auto-traits like `Send` and `Sync`.
    _marker: PhantomData<&'a mut dyn Fn()>,
}

impl<'a> BroadcastContext<'a> {
    pub(super) fn with<R>(f: impl FnOnce(BroadcastContext<'_>) -> R) -> R {
        let worker_thread = WorkerThread::current();
        assert!(!worker_thread.is_null());
        f(BroadcastContext {
            worker: unsafe { &*worker_thread },
            _marker: PhantomData,
        })
    }

    /// Our index amongst the broadcast threads (ranges from `0..self.num_threads()`).
    #[inline]
    pub fn index(&self) -> usize {
        self.worker.index()
    }

    /// The number of threads receiving the broadcast in the thread pool.
    ///
    /// # Future compatibility note
    ///
    /// Future versions of Rayon might vary the number of threads over time, but
    /// this method will always return the number of threads which are actually
    /// receiving your particular `broadcast` call.
    #[inline]
    pub fn num_threads(&self) -> usize {
        self.worker.registry().num_threads()
    }
}

impl<'a> fmt::Debug for BroadcastContext<'a> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("BroadcastContext")
            .field("index", &self.index())
            .field("num_threads", &self.num_threads())
            .field("pool_id", &self.worker.registry().id())
            .finish()
    }
}

/// Execute `op` on every thread in the pool. It will be executed on each
/// thread when they have nothing else to do locally, before they try to
/// steal work from other threads. This function will not return until all
/// threads have completed the `op`.
///
/// Unsafe because `registry` must not yet have terminated.
pub(super) unsafe fn broadcast_in<OP, R>(op: OP, registry: &Arc<Registry>) -> Vec<R>
where
    OP: Fn(BroadcastContext<'_>) -> R + Sync,
    R: Send,
{
    let f = move |injected: bool| {
        debug_assert!(injected);
        BroadcastContext::with(&op)
    };

    let n_threads = registry.num_threads();
    let current_thread = WorkerThread::current().as_ref();
    let tlv = crate::tlv::get();
    let latch = ScopeLatch::with_count(n_threads, current_thread);
    let jobs: Vec<_> = (0..n_threads)
        .map(|_| StackJob::new(tlv, &f, LatchRef::new(&latch)))
        .collect();
    let job_refs = jobs.iter().map(|job| job.as_job_ref());

    registry.inject_broadcast(job_refs);

    // Wait for all jobs to complete, then collect the results, maybe propagating a panic.
    latch.wait(current_thread);
    jobs.into_iter().map(|job| job.into_result()).collect()
}

/// Execute `op` on every thread in the pool. It will be executed on each
/// thread when they have nothing else to do locally, before they try to
/// steal work from other threads. This function returns immediately after
/// injecting the jobs.
///
/// Unsafe because `registry` must not yet have terminated.
pub(super) unsafe fn spawn_broadcast_in<OP>(op: OP, registry: &Arc<Registry>)
where
    OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
{
    let job = ArcJob::new({
        let registry = Arc::clone(registry);
        move || {
            registry.catch_unwind(|| BroadcastContext::with(&op));
            registry.terminate(); // (*) permit registry to terminate now
        }
    });

    let n_threads = registry.num_threads();
    let job_refs = (0..n_threads).map(|_| {
        // Ensure that registry cannot terminate until this job has executed
        // on each thread. This ref is decremented at the (*) above.
        registry.increment_terminate_count();

        ArcJob::as_static_job_ref(&job)
    });

    registry.inject_broadcast(job_refs);
}