adaptive_barrier/lib.rs
1//! A synchronization barrier that adapts to the number of subscribing threads.
2//!
3//! This has the same goal as the [`std::sync::Barrier`], but it handles runtime additions
4//! or removals of thread subscriptions ‒ the number of threads waiting for the barrier can change
5//! (even while some threads are already waiting).
6//!
//! It can be convenient if your algorithm changes the number of working threads during its
//! lifetime.
8//! You don't need a different barrier for different phases of the algorithm.
9//!
//! But most importantly, the [`Barrier`] is robust in the face of panics.
11//!
12//! # Problems with panics and the [`std::sync::Barrier`]
13//!
14//! If we have a barrier that was set up for `n` threads, some of the threads park on it and wait
15//! for the rest to finish, but one of the other threads has a bug and panics, the already parked
16//! threads will never get a chance to continue and the whole algorithm deadlocks. This is usually
17//! worse than propagating the panic and cleaning up the whole algorithm or even shutting down the
18//! whole application, because then something can recover by restarting it. If the application
19//! deadlocks in the computation phase, but otherwise looks healthy, it will never recover.
20//!
21//! This makes applications less robust and makes tests which use barriers very annoying and
22//! fragile to write.
23//!
//! Our [`Barrier`] watches the number of subscribed threads by counting the number of its own
//! clones (unlike the standard barrier, this one can and needs to be cloned for each thread).
//! If a thread disappears (or is added), the expectations are adjusted.
27//!
28//! It also has a mode in which it'll get poisoned and propagate the panic to the rest of the
29//! group.
30#![doc(test(attr(deny(warnings))))]
31#![forbid(unsafe_code)]
32#![warn(missing_docs)]
33
34use std::fmt::{Debug, Formatter, Result as FmtResult};
35use std::mem;
36use std::panic::UnwindSafe;
37use std::sync::{Arc, Condvar, Mutex, MutexGuard};
38use std::thread;
39
/// Selects how a [`Barrier`] reacts when one of its handles is dropped during a panic.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PanicMode {
    /// Treat the drop like any other.
    ///
    /// The number of expected threads is decremented, exactly as on a normal destruction.
    Decrement,

    /// Poison the barrier.
    ///
    /// Every call to [`wait`][Barrier::wait] panics from then on, including the calls that are
    /// already parked inside. A poisoned barrier can never be "unpoisoned".
    ///
    /// Useful when a failure in one thread makes the whole group unusable (very often the case
    /// in tests).
    Poison,
}
57
/// The outcome of a finished [`wait`][Barrier::wait] call.
///
/// Its purpose is to let a group of threads single out one of them as the leader once they have
/// all passed the barrier.
#[derive(Debug)]
pub struct WaitResult {
    is_leader: bool,
}

impl WaitResult {
    /// Tells whether this thread was picked as the leader of its waiting group.
    ///
    /// Exactly one thread of the group gets `true`. An algorithm can use that to choose, among
    /// otherwise equal threads, the one that does some singleton job (consolidating the results,
    /// for example).
    pub fn is_leader(&self) -> bool {
        let Self { is_leader } = *self;
        is_leader
    }
}
76
/// The mutable state shared by all handles of one barrier.
struct Inner {
    /// Number of live handles, i.e. how many waiters are needed for a release.
    active: usize,
    /// Number of handles that entered `wait` in the current generation.
    waiting: usize,
    /// Generation counter; it advances on every release so waiters can tell they were let go.
    gen: usize,
    /// Raised on release; the first thread leaving the barrier takes it and becomes the leader.
    leader: bool,
    /// Once set, every release check succeeds.
    poisoned: bool,
}

impl Inner {
    /// Starts a new generation if the group is complete or the barrier is poisoned.
    ///
    /// On release the waiting count is reset, the generation counter advances (wrapping around
    /// on overflow) and the leader flag is raised. Returns whether a release happened; waking
    /// the parked threads is left to the caller.
    fn check_release(&mut self) -> bool {
        let still_blocked = self.waiting < self.active && !self.poisoned;
        if still_blocked {
            return false;
        }

        self.waiting = 0;
        self.gen = self.gen.wrapping_add(1);
        self.leader = true;
        true
    }
}
97
/// Everything the handles of a single barrier have in common.
struct Shared {
    /// The counters and flags, guarded by a mutex.
    inner: Mutex<Inner>,
    /// Waiters block on this until a release notifies them.
    condvar: Condvar,
    /// What to do when a handle is dropped during a panic; set once at construction.
    panic_mode: PanicMode,
}
103
104/// A Barrier to synchronize multiple threads.
105///
106/// Multiple threads can meet on a single barrier to synchronize a "meeting point" in a computation
107/// (eg. when they need to pass results to others), much like the [`Barrier`][std::sync::Barrier]
108/// from the standard library.
109///
110/// Unlike that, the expected number of threads waiting for the barrier is not preset in the `new`
111/// call, but autodetected and adapted to at runtime.
112///
113/// The way this is done is by cloning the original [`Barrier`] ‒ for a group to continue after
114/// wait, a [`wait`][Barrier::wait] needs to be called on each clone. This allows to add or remove
115/// (even implicitly by panicking) the clones as needed.
116///
117/// # Examples
118///
119/// ```rust
120/// # use std::thread;
121/// # use adaptive_barrier::{Barrier, PanicMode};
122///
123/// let barrier = Barrier::new(PanicMode::Poison);
124/// let mut threads = Vec::new();
125/// for _ in 0..4 {
126/// // Each thread gets its own clone of the barrier. They are tied together, not independent.
127/// let mut barrier = barrier.clone();
128/// let thread = thread::spawn(move || {
129/// // Wait to start everything at the same time
130/// barrier.wait();
131///
132/// // ... Do some work that needs to start synchronously ...
133/// // Now, if this part panics, it will *not* deadlock, it'll unlock the others just fine
134/// // and propagate the panic (see the parameter to new(..)
135///
136/// // Wait for all threads to finish
137/// if barrier.wait().is_leader() {
138/// // Pick one thread to consolidate the results here
139///
140/// // Note that as we don't call wait any more, if we panic here, it'll not get
141/// // propagated through the barrier any more.
142/// }
143/// });
144/// threads.push(thread);
145/// }
146///
147/// // Watch out for the last instance here in the main/controlling thread. You can either call
148/// // wait on it too, or make sure it is dropped. If you don't, others will keep waiting for it.
149/// drop(barrier);
150///
151/// for thread in threads {
152/// thread.join().expect("Propagating thread panic");
153/// }
154/// ```
155pub struct Barrier(Arc<Shared>);
156
157impl Barrier {
158 /// Creates a new (independent) barrier.
159 ///
160 /// To create more handles to the same barrier, clone it.
161 ///
162 /// The panic mode specifies what to do if a barrier observes a panic (is dropped while
163 /// panicking).
164 pub fn new(panic_mode: PanicMode) -> Self {
165 Barrier(Arc::new(Shared {
166 inner: Mutex::new(Inner {
167 active: 1, // this thread
168 waiting: 0,
169 gen: 0,
170 leader: false,
171 poisoned: false,
172 }),
173 condvar: Condvar::new(),
174 panic_mode,
175 }))
176 }
177
178 fn check_release(&self, lock: &mut MutexGuard<'_, Inner>) {
179 if lock.check_release() {
180 self.0.condvar.notify_all();
181 }
182 }
183
184 /// Wait for all the other threads to wait too.
185 ///
186 /// This'll block until all threads holding clones of the same barrier call `wait`.
187 ///
188 /// # Panics
189 ///
190 /// If the barrier was created with [`PanicMode::Poison`] and some other clone of the barrier
191 /// observed a panic, this'll also panic (even if it was already parked inside).
192 pub fn wait(&mut self) -> WaitResult {
193 let mut lock = self.0.inner.lock().unwrap();
194 lock.waiting += 1;
195 let gen = lock.gen;
196 self.check_release(&mut lock);
197 while gen == lock.gen {
198 lock = self.0.condvar.wait(lock).unwrap();
199 }
200 if lock.poisoned {
201 drop(lock); // Make sure we don't poison the mutex too
202 panic!("Barrier is poisoned");
203 }
204 WaitResult {
205 is_leader: mem::replace(&mut lock.leader, false),
206 }
207 }
208}
209
210impl Clone for Barrier {
211 fn clone(&self) -> Self {
212 let new = Arc::clone(&self.0);
213 new.inner.lock().unwrap().active += 1;
214 Barrier(new)
215 }
216}
217
218impl Drop for Barrier {
219 fn drop(&mut self) {
220 let mut lock = self.0.inner.lock().unwrap();
221 lock.active -= 1;
222
223 if self.0.panic_mode == PanicMode::Poison && thread::panicking() {
224 lock.poisoned = true;
225 }
226
227 self.check_release(&mut lock);
228 }
229}
230
231impl Debug for Barrier {
232 fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
233 fmt.pad("Barrier { .. }")
234 }
235}
236
237impl Default for Barrier {
238 fn default() -> Self {
239 Self::new(PanicMode::Decrement)
240 }
241}
242
// We deal with panics explicitly: a handle dropped while its thread is panicking either just
// decrements the number of expected threads or poisons the barrier, depending on the panic mode.
impl UnwindSafe for Barrier {}
245
#[cfg(test)]
mod tests {
    use std::panic;
    use std::sync::atomic::Ordering::*;
    use std::sync::atomic::{AtomicBool, AtomicUsize};
    use std::thread::{self, sleep};
    use std::time::Duration;

    use super::*;

    /// When we have just one instance, it doesn't wait.
    #[test]
    fn single() {
        let mut bar = Barrier::new(PanicMode::Decrement);
        assert!(bar.wait().is_leader());
    }

    /// Check the barriers wait for each other.
    #[test]
    fn dispatch() {
        let mut bar = Barrier::new(PanicMode::Decrement);
        let waited = Arc::new(AtomicBool::new(false));
        let t = thread::spawn({
            let mut bar = bar.clone();
            let waited = Arc::clone(&waited);
            move || {
                bar.wait();
                waited.store(true, SeqCst);
                bar.wait();
            }
        });

        sleep(Duration::from_millis(50));
        assert!(!waited.load(SeqCst));
        bar.wait();
        bar.wait();
        assert!(waited.load(SeqCst));

        t.join().unwrap();
    }

    /// A clone added while another thread is already parked is waited for too.
    #[test]
    fn adjust_up() {
        let mut bar = Barrier::new(PanicMode::Decrement);
        let woken = Arc::new(AtomicUsize::new(0));
        let t1 = thread::spawn({
            let mut bar = bar.clone();
            let woken = Arc::clone(&woken);
            move || {
                bar.wait();
                woken.fetch_add(1, SeqCst);
                bar.wait();
            }
        });

        sleep(Duration::from_millis(50));
        assert_eq!(woken.load(SeqCst), 0);

        let t2 = thread::spawn({
            let mut bar = bar.clone();
            let woken = Arc::clone(&woken);
            move || {
                bar.wait();
                woken.fetch_add(1, SeqCst);
                bar.wait();
            }
        });

        sleep(Duration::from_millis(50));
        assert_eq!(woken.load(SeqCst), 0);

        bar.wait();
        bar.wait();
        assert_eq!(woken.load(SeqCst), 2);

        t1.join().unwrap();
        t2.join().unwrap();
    }

    /// A clone that goes away by its thread finishing is no longer waited for.
    #[test]
    fn adjust_down() {
        let mut bar = Barrier::new(PanicMode::Decrement);
        let woken = Arc::new(AtomicUsize::new(0));
        let t1 = thread::spawn({
            let mut bar = bar.clone();
            let woken = Arc::clone(&woken);
            move || {
                bar.wait();
                woken.fetch_add(1, SeqCst);
                bar.wait();
            }
        });

        let t2 = thread::spawn({
            let mut bar = bar.clone();
            let woken = Arc::clone(&woken);
            move || {
                // Only one wait, the second one will be done on only 2 copies
                bar.wait();
                woken.fetch_add(1, SeqCst);
            }
        });

        sleep(Duration::from_millis(50));
        assert_eq!(woken.load(SeqCst), 0);

        bar.wait();
        t2.join().unwrap();
        bar.wait();
        assert_eq!(woken.load(SeqCst), 2);

        t1.join().unwrap();
    }

    /// In the `Decrement` mode, a panicking thread just leaves the group.
    #[test]
    fn adjust_panic() {
        let mut bar = Barrier::new(PanicMode::Decrement);
        let woken = Arc::new(AtomicUsize::new(0));
        let t1 = thread::spawn({
            let mut bar = bar.clone();
            let woken = Arc::clone(&woken);
            move || {
                bar.wait();
                woken.fetch_add(1, SeqCst);
                bar.wait();
                woken.fetch_add(1, SeqCst);
            }
        });

        let t2 = thread::spawn({
            let mut bar = bar.clone();
            let woken = Arc::clone(&woken);
            move || {
                // Only one wait, the second one will be done on only 2 copies
                bar.wait();
                woken.fetch_add(1, SeqCst);
                panic!("We are going to panic, woohooo, the thing still adjusts");
            }
        });

        sleep(Duration::from_millis(50));
        assert_eq!(woken.load(SeqCst), 0);

        bar.wait();
        t2.join().unwrap_err();
        bar.wait();

        t1.join().unwrap();

        assert_eq!(woken.load(SeqCst), 3);
    }

    /// Dropping a handle without ever waiting on it releases the others.
    #[test]
    fn adjust_drop() {
        let bar = Barrier::new(PanicMode::Decrement);
        let woken = Arc::new(AtomicUsize::new(0));
        let t1 = thread::spawn({
            let mut bar = bar.clone();
            let woken = Arc::clone(&woken);
            move || {
                bar.wait();
                woken.fetch_add(1, SeqCst);
                bar.wait();
            }
        });

        sleep(Duration::from_millis(50));
        assert_eq!(woken.load(SeqCst), 0);

        let t2 = thread::spawn({
            let mut bar = bar.clone();
            let woken = Arc::clone(&woken);
            move || {
                bar.wait();
                woken.fetch_add(1, SeqCst);
                bar.wait();
            }
        });

        sleep(Duration::from_millis(50));
        assert_eq!(woken.load(SeqCst), 0);
        drop(bar);

        t1.join().unwrap();
        t2.join().unwrap();
        assert_eq!(woken.load(SeqCst), 2);
    }

    /// Poisoning of the barrier.
    #[test]
    #[cfg_attr(clippy, allow(clippy::redundant_clone))]
    fn poisoning() {
        let mut bar = Barrier::new(PanicMode::Poison);
        let woken = Arc::new(AtomicUsize::new(0));

        let t1 = thread::spawn({
            let mut bar = bar.clone();
            // The counter must be shared with the thread; with a separate counter the
            // assertions on `woken` below could never fail.
            let woken = Arc::clone(&woken);
            move || {
                bar.wait();
                woken.fetch_add(1, SeqCst);
                bar.wait();
            }
        });

        sleep(Duration::from_millis(50));
        assert_eq!(woken.load(SeqCst), 0);

        let t2 = thread::spawn({
            let bar = bar.clone();
            move || {
                // Make sure this one gets into the closure so we destroy it on the panic.
                // Not issue in practice where it would get pulled in by .wait(), but we don't have
                // one here in test.
                let _bar = bar;
                panic!("Testing a panic");
            }
        });

        // The thread 2 panics
        t2.join().unwrap_err();
        // And the panic propagates to t1, even though we still hold our copy of barrier.
        t1.join().unwrap_err();
        // t1 panicked inside its first wait, so it never got to bump the counter.
        assert_eq!(woken.load(SeqCst), 0);

        // Our last instance would panic too.
        panic::catch_unwind(move || bar.wait()).unwrap_err();
    }
}
473}