shuttle/sync/once.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
use crate::runtime::execution::ExecutionState;
use crate::runtime::storage::StorageKey;
use crate::runtime::task::clock::VectorClock;
use crate::sync::Mutex;
use std::cell::RefCell;
use std::rc::Rc;
use tracing::trace;
/// A synchronization primitive which can be used to run a one-time global initialization. Useful
/// for one-time initialization for FFI or related functionality. This type can only be constructed
/// with [`Once::new()`].
#[derive(Debug)]
pub struct Once {
// We use the address of the `Once` as an identifier, so it can't be zero-sized even though all
// its state is stored in ExecutionState storage
_dummy: usize,
}
/// A `Once` cell can either be `Running`, in which case a `Mutex` mediates racing threads trying to
/// invoke `call_once`, or `Complete` once an initializer has completed, in which case the `Mutex`
/// is no longer necessary.
enum OnceInitState {
Running(Rc<Mutex<bool>>),
Complete(VectorClock),
}
impl std::fmt::Debug for OnceInitState {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::Running(_) => write!(f, "Running"),
Self::Complete(_) => write!(f, "Complete"),
}
}
}
impl Once {
/// Creates a new `Once` value.
#[must_use]
#[allow(clippy::new_without_default)]
pub const fn new() -> Self {
Self { _dummy: 0 }
}
/// Performs an initialization routine once and only once. The given closure will be executed
/// if this is the first time `call_once` has been called, and otherwise the routine will *not*
/// be invoked.
///
/// This method will block the calling thread if another initialization routine is currently
/// running.
///
/// When this function returns, it is guaranteed that some initialization has run and completed
/// (it may not be the closure specified).
pub fn call_once<F>(&self, f: F)
where
F: FnOnce(),
{
self.call_once_inner(|_state| f(), false);
}
/// Performs the same function as [`Once::call_once()`] except ignores poisoning.
///
/// If the cell has previously been poisoned, this function will still attempt to call the given
/// closure. If the closure does not panic, the cell will no longer be poisoned.
pub fn call_once_force<F>(&self, f: F)
where
F: FnOnce(&OnceState),
{
self.call_once_inner(f, true);
}
/// Returns `true` if some [`Once::call_once()`] call has completed successfully.
pub fn is_completed(&self) -> bool {
ExecutionState::with(|state| {
let init = match self.get_state(state) {
Some(init) => init,
None => return false,
};
let init_state = init.borrow();
match &*init_state {
OnceInitState::Complete(clock) => {
let clock = clock.clone();
drop(init_state);
state.update_clock(&clock);
true
}
_ => false,
}
})
}
fn call_once_inner<F>(&self, f: F, ignore_poisoning: bool)
where
F: FnOnce(&OnceState),
{
let lock = ExecutionState::with(|state| {
// Initialize the state of the `Once` cell if we're the first thread to try
if self.get_state(state).is_none() {
self.init_state(state, OnceInitState::Running(Rc::new(Mutex::new(false))));
}
let init = self.get_state(state).expect("must be initialized by this point");
let init_state = init.borrow();
trace!(state=?init_state, "call_once on cell {:p}", self);
match &*init_state {
OnceInitState::Complete(clock) => {
// If already complete, just update the clock from the thread that inited
let clock = clock.clone();
drop(init_state);
state.update_clock(&clock);
None
}
OnceInitState::Running(lock) => Some(Rc::clone(lock)),
}
});
// If there's a lock, then we need to try racing on it to decide who gets to run their
// initialization closure.
if let Some(lock) = lock {
let (mut flag, is_poisoned) = match lock.lock() {
Ok(flag) => (flag, false),
Err(_) if !ignore_poisoning => panic!("Once instance has previously been poisoned"),
Err(err) => (err.into_inner(), true),
};
if *flag {
return;
}
trace!("won the call_once race for cell {:p}", self);
f(&OnceState(is_poisoned));
*flag = true;
// We were the thread that won the race, so remember our current clock to establish
// causality with future threads that try (and fail) to run `call_once`. The threads
// that were racing with us will get causality through acquiring the `Mutex`.
ExecutionState::with(|state| {
let clock = state.increment_clock().clone();
*self
.get_state(state)
.expect("must be initialized by this point")
.borrow_mut() = OnceInitState::Complete(clock);
});
}
}
fn get_state<'a>(&self, from: &'a ExecutionState) -> Option<&'a RefCell<OnceInitState>> {
from.get_storage::<_, RefCell<OnceInitState>>(self)
}
fn init_state(&self, into: &mut ExecutionState, new_state: OnceInitState) {
into.init_storage::<_, RefCell<OnceInitState>>(self, RefCell::new(new_state));
}
}
/// State yielded to [`Once::call_once_force()`]'s closure parameter. The state can be used to query
/// the poison status of the [`Once`].
#[derive(Debug)]
#[non_exhaustive]
pub struct OnceState(bool);
impl OnceState {
/// Returns `true` if the associated [`Once`] was poisoned prior to the invocation of the
/// closure passed to [`Once::call_once_force()`].
pub fn is_poisoned(&self) -> bool {
self.0
}
}
impl From<&Once> for StorageKey {
fn from(once: &Once) -> Self {
StorageKey(once as *const _ as usize, 0x2)
}
}