adaptive_barrier/
lib.rs

1//! A synchronization barrier that adapts to the number of subscribing threads.
2//!
3//! This has the same goal as the [`std::sync::Barrier`], but it handles runtime additions
4//! or removals of thread subscriptions ‒ the number of threads waiting for the barrier can change
5//! (even while some threads are already waiting).
6//!
//! It can be convenient if your algorithm changes the number of working threads during its
//! lifetime.
8//! You don't need a different barrier for different phases of the algorithm.
9//!
//! But most importantly, the [`Barrier`] is robust in the face of panics.
11//!
12//! # Problems with panics and the [`std::sync::Barrier`]
13//!
//! Suppose a barrier is set up for `n` threads and some of them are already parked on it, waiting
//! for the rest. If one of the remaining threads has a bug and panics, the parked threads never
//! get a chance to continue and the whole algorithm deadlocks. This is usually worse than
//! propagating the panic and cleaning up the whole algorithm, or even shutting down the whole
//! application, because then something can recover by restarting it. If the application
//! deadlocks in the computation phase but otherwise looks healthy, it will never recover.
20//!
21//! This makes applications less robust and makes tests which use barriers very annoying and
22//! fragile to write.
23//!
//! Our [`Barrier`] watches the number of subscribed threads by counting its own clones (unlike
//! the standard barrier, this one can, and needs to, be cloned for each thread). If a thread
//! disappears (or one is added), the expectations are adjusted.
27//!
28//! It also has a mode in which it'll get poisoned and propagate the panic to the rest of the
29//! group.
30#![doc(test(attr(deny(warnings))))]
31#![forbid(unsafe_code)]
32#![warn(missing_docs)]
33
34use std::fmt::{Debug, Formatter, Result as FmtResult};
35use std::mem;
36use std::panic::UnwindSafe;
37use std::sync::{Arc, Condvar, Mutex, MutexGuard};
38use std::thread;
39
/// Policy applied when a [`Barrier`] handle is dropped while its thread is panicking.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PanicMode {
    /// Treat the panic like an ordinary drop.
    ///
    /// The handle simply stops counting as a participant, so the remaining threads are no
    /// longer expected to wait for it.
    Decrement,

    /// Poison the barrier permanently.
    ///
    /// Every call to [`wait`][Barrier::wait] panics from then on, including calls that are
    /// already parked. A poisoned barrier can never be "unpoisoned".
    ///
    /// Handy when one failing thread renders the whole group useless (which is common in tests).
    Poison,
}
57
/// What a thread gets back from [`wait`][Barrier::wait].
///
/// It lets the released group elect a single thread as its leader.
#[derive(Debug)]
pub struct WaitResult {
    is_leader: bool,
}

impl WaitResult {
    /// Tells whether this thread was elected leader of its released group.
    ///
    /// Exactly one thread of a waiting group gets `true`. An algorithm can use this to choose one
    /// thread among equals for some singleton task (consolidating the results, for example).
    pub fn is_leader(&self) -> bool {
        let WaitResult { is_leader } = *self;
        is_leader
    }
}
76
/// Mutable bookkeeping of one barrier, kept behind the mutex in `Shared`.
struct Inner {
    /// How many handles (clones) of the barrier currently exist.
    active: usize,
    /// How many handles have called `wait` in the current generation.
    waiting: usize,
    /// Generation counter; bumped (wrapping) every time the waiting group is released.
    gen: usize,
    /// Raised on every release; the first released thread to see it takes it.
    leader: bool,
    /// Once set, every release check succeeds immediately.
    poisoned: bool,
}

impl Inner {
    /// Starts a new generation if everybody has arrived or the barrier is poisoned.
    ///
    /// On release, the waiting count is reset, the generation is bumped and the leader flag is
    /// raised. Returns whether a release happened.
    fn check_release(&mut self) -> bool {
        let everyone_arrived = self.waiting >= self.active;
        if !(everyone_arrived || self.poisoned) {
            return false;
        }
        self.waiting = 0;
        self.gen = self.gen.wrapping_add(1);
        self.leader = true;
        true
    }
}
97
98struct Shared {
99    inner: Mutex<Inner>,
100    condvar: Condvar,
101    panic_mode: PanicMode,
102}
103
104/// A Barrier to synchronize multiple threads.
105///
106/// Multiple threads can meet on a single barrier to synchronize a "meeting point" in a computation
107/// (eg. when they need to pass results to others), much like the [`Barrier`][std::sync::Barrier]
108/// from the standard library.
109///
110/// Unlike that, the expected number of threads waiting for the barrier is not preset in the `new`
111/// call, but autodetected and adapted to at runtime.
112///
113/// The way this is done is by cloning the original [`Barrier`] ‒ for a group to continue after
114/// wait, a [`wait`][Barrier::wait] needs to be called on each clone. This allows to add or remove
115/// (even implicitly by panicking) the clones as needed.
116///
117/// # Examples
118///
119/// ```rust
120/// # use std::thread;
121/// # use adaptive_barrier::{Barrier, PanicMode};
122///
123/// let barrier = Barrier::new(PanicMode::Poison);
124/// let mut threads = Vec::new();
125/// for _ in 0..4 {
126///     // Each thread gets its own clone of the barrier. They are tied together, not independent.
127///     let mut barrier = barrier.clone();
128///     let thread = thread::spawn(move || {
129///         // Wait to start everything at the same time
130///         barrier.wait();
131///
132///         // ... Do some work that needs to start synchronously ...
133///         // Now, if this part panics, it will *not* deadlock, it'll unlock the others just fine
134///         // and propagate the panic (see the parameter to new(..)
135///
136///         // Wait for all threads to finish
137///         if barrier.wait().is_leader() {
138///             // Pick one thread to consolidate the results here
139///
140///             // Note that as we don't call wait any more, if we panic here, it'll not get
141///             // propagated through the barrier any more.
142///         }
143///     });
144///     threads.push(thread);
145/// }
146///
147/// // Watch out for the last instance here in the main/controlling thread. You can either call
148/// // wait on it too, or make sure it is dropped. If you don't, others will keep waiting for it.
149/// drop(barrier);
150///
151/// for thread in threads {
152///     thread.join().expect("Propagating thread panic");
153/// }
154/// ```
155pub struct Barrier(Arc<Shared>);
156
157impl Barrier {
158    /// Creates a new (independent) barrier.
159    ///
160    /// To create more handles to the same barrier, clone it.
161    ///
162    /// The panic mode specifies what to do if a barrier observes a panic (is dropped while
163    /// panicking).
164    pub fn new(panic_mode: PanicMode) -> Self {
165        Barrier(Arc::new(Shared {
166            inner: Mutex::new(Inner {
167                active: 1, // this thread
168                waiting: 0,
169                gen: 0,
170                leader: false,
171                poisoned: false,
172            }),
173            condvar: Condvar::new(),
174            panic_mode,
175        }))
176    }
177
178    fn check_release(&self, lock: &mut MutexGuard<'_, Inner>) {
179        if lock.check_release() {
180            self.0.condvar.notify_all();
181        }
182    }
183
184    /// Wait for all the other threads to wait too.
185    ///
186    /// This'll block until all threads holding clones of the same barrier call `wait`.
187    ///
188    /// # Panics
189    ///
190    /// If the barrier was created with [`PanicMode::Poison`] and some other clone of the barrier
191    /// observed a panic, this'll also panic (even if it was already parked inside).
192    pub fn wait(&mut self) -> WaitResult {
193        let mut lock = self.0.inner.lock().unwrap();
194        lock.waiting += 1;
195        let gen = lock.gen;
196        self.check_release(&mut lock);
197        while gen == lock.gen {
198            lock = self.0.condvar.wait(lock).unwrap();
199        }
200        if lock.poisoned {
201            drop(lock); // Make sure we don't poison the mutex too
202            panic!("Barrier is poisoned");
203        }
204        WaitResult {
205            is_leader: mem::replace(&mut lock.leader, false),
206        }
207    }
208}
209
210impl Clone for Barrier {
211    fn clone(&self) -> Self {
212        let new = Arc::clone(&self.0);
213        new.inner.lock().unwrap().active += 1;
214        Barrier(new)
215    }
216}
217
218impl Drop for Barrier {
219    fn drop(&mut self) {
220        let mut lock = self.0.inner.lock().unwrap();
221        lock.active -= 1;
222
223        if self.0.panic_mode == PanicMode::Poison && thread::panicking() {
224            lock.poisoned = true;
225        }
226
227        self.check_release(&mut lock);
228    }
229}
230
231impl Debug for Barrier {
232    fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
233        fmt.pad("Barrier { .. }")
234    }
235}
236
237impl Default for Barrier {
238    fn default() -> Self {
239        Self::new(PanicMode::Decrement)
240    }
241}
242
// We deal with panics explicitly: a handle dropped during a panic either just unregisters
// itself or poisons the barrier (depending on `PanicMode`), and `wait` releases the mutex before
// panicking on a poisoned barrier. So it is fine to move a `Barrier` into `catch_unwind`.
impl UnwindSafe for Barrier {}
245
#[cfg(test)]
mod tests {
    use std::panic;
    use std::sync::atomic::Ordering::*;
    use std::sync::atomic::{AtomicBool, AtomicUsize};
    use std::thread::{self, sleep};
    use std::time::Duration;

    use super::*;

    /// When we have just one instance, it doesn't wait.
    #[test]
    fn single() {
        let mut bar = Barrier::new(PanicMode::Decrement);
        assert!(bar.wait().is_leader());
    }

    /// Check the barriers wait for each other.
    #[test]
    fn dispatch() {
        let mut bar = Barrier::new(PanicMode::Decrement);
        let waited = Arc::new(AtomicBool::new(false));
        let t = thread::spawn({
            let mut bar = bar.clone();
            let waited = Arc::clone(&waited);
            move || {
                bar.wait();
                waited.store(true, SeqCst);
                bar.wait();
            }
        });

        sleep(Duration::from_millis(50));
        assert!(!waited.load(SeqCst));
        bar.wait();
        bar.wait();
        assert!(waited.load(SeqCst));

        t.join().unwrap();
    }

    #[test]
    fn adjust_up() {
        let mut bar = Barrier::new(PanicMode::Decrement);
        let woken = Arc::new(AtomicUsize::new(0));
        let t1 = thread::spawn({
            let mut bar = bar.clone();
            let woken = Arc::clone(&woken);
            move || {
                bar.wait();
                woken.fetch_add(1, SeqCst);
                bar.wait();
            }
        });

        sleep(Duration::from_millis(50));
        assert_eq!(woken.load(SeqCst), 0);

        let t2 = thread::spawn({
            let mut bar = bar.clone();
            let woken = Arc::clone(&woken);
            move || {
                bar.wait();
                woken.fetch_add(1, SeqCst);
                bar.wait();
            }
        });

        sleep(Duration::from_millis(50));
        assert_eq!(woken.load(SeqCst), 0);

        bar.wait();
        bar.wait();
        assert_eq!(woken.load(SeqCst), 2);

        t1.join().unwrap();
        t2.join().unwrap();
    }

    #[test]
    fn adjust_down() {
        let mut bar = Barrier::new(PanicMode::Decrement);
        let woken = Arc::new(AtomicUsize::new(0));
        let t1 = thread::spawn({
            let mut bar = bar.clone();
            let woken = Arc::clone(&woken);
            move || {
                bar.wait();
                woken.fetch_add(1, SeqCst);
                bar.wait();
            }
        });

        let t2 = thread::spawn({
            let mut bar = bar.clone();
            let woken = Arc::clone(&woken);
            move || {
                // Only one wait, the second one will be done on only 2 copies
                bar.wait();
                woken.fetch_add(1, SeqCst);
            }
        });

        sleep(Duration::from_millis(50));
        assert_eq!(woken.load(SeqCst), 0);

        bar.wait();
        t2.join().unwrap();
        bar.wait();
        assert_eq!(woken.load(SeqCst), 2);

        t1.join().unwrap();
    }

    #[test]
    fn adjust_panic() {
        let mut bar = Barrier::new(PanicMode::Decrement);
        let woken = Arc::new(AtomicUsize::new(0));
        let t1 = thread::spawn({
            let mut bar = bar.clone();
            let woken = Arc::clone(&woken);
            move || {
                bar.wait();
                woken.fetch_add(1, SeqCst);
                bar.wait();
                woken.fetch_add(1, SeqCst);
            }
        });

        let t2 = thread::spawn({
            let mut bar = bar.clone();
            let woken = Arc::clone(&woken);
            move || {
                // Only one wait, the second one will be done on only 2 copies
                bar.wait();
                woken.fetch_add(1, SeqCst);
                panic!("We are going to panic, woohooo, the thing still adjusts");
            }
        });

        sleep(Duration::from_millis(50));
        assert_eq!(woken.load(SeqCst), 0);

        bar.wait();
        t2.join().unwrap_err();
        bar.wait();

        t1.join().unwrap();

        assert_eq!(woken.load(SeqCst), 3);
    }

    #[test]
    fn adjust_drop() {
        let bar = Barrier::new(PanicMode::Decrement);
        let woken = Arc::new(AtomicUsize::new(0));
        let t1 = thread::spawn({
            let mut bar = bar.clone();
            let woken = Arc::clone(&woken);
            move || {
                bar.wait();
                woken.fetch_add(1, SeqCst);
                bar.wait();
            }
        });

        sleep(Duration::from_millis(50));
        assert_eq!(woken.load(SeqCst), 0);

        let t2 = thread::spawn({
            let mut bar = bar.clone();
            let woken = Arc::clone(&woken);
            move || {
                bar.wait();
                woken.fetch_add(1, SeqCst);
                bar.wait();
            }
        });

        sleep(Duration::from_millis(50));
        assert_eq!(woken.load(SeqCst), 0);
        drop(bar);

        t1.join().unwrap();
        t2.join().unwrap();
        assert_eq!(woken.load(SeqCst), 2);
    }

    /// Poisoning of the barrier.
    #[test]
    #[cfg_attr(clippy, allow(clippy::redundant_clone))]
    fn poisoning() {
        let mut bar = Barrier::new(PanicMode::Poison);
        let woken = Arc::new(AtomicUsize::new(0));

        let t1 = thread::spawn({
            let mut bar = bar.clone();
            // Share the counter with the thread; a fresh `Arc` here would make the assertions
            // on `woken` below vacuous.
            let woken = Arc::clone(&woken);
            move || {
                bar.wait();
                woken.fetch_add(1, SeqCst);
                bar.wait();
            }
        });

        sleep(Duration::from_millis(50));
        assert_eq!(woken.load(SeqCst), 0);

        let t2 = thread::spawn({
            let bar = bar.clone();
            move || {
                // Make sure this one gets into the closure so we destroy it on the panic.
                // Not an issue in practice, where it would get pulled in by .wait(), but we don't
                // have one here in the test.
                let _bar = bar;
                panic!("Testing a panic");
            }
        });

        // The thread 2 panics
        t2.join().unwrap_err();
        // And the panic propagates to t1, even though we still hold our copy of barrier.
        t1.join().unwrap_err();
        // t1 panicked inside its first wait, so it never got past it.
        assert_eq!(woken.load(SeqCst), 0);

        // Our last instance would panic too.
        panic::catch_unwind(move || bar.wait()).unwrap_err();
    }
}