1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
//! A concurrency primitive for high concurrency reads over a single-writer data structure.
//!
//! The primitive keeps two copies of the backing data structure, one that is accessed by readers,
//! and one that is access by the (single) writer. This enables all reads to proceed in parallel
//! with minimal coordination, and shifts the coordination overhead to the writer. In the absence
//! of writes, reads scale linearly with the number of cores.
//!
//! When the writer wishes to expose new changes to the datastructure (see
//! [`WriteHandle::publish`]), it "flips" the two copies so that subsequent reads go to the old
//! "write side", and future writers go to the old "read side". This process does cause two cache
//! line invalidations for the readers, but does not stop them from making progress (i.e., reads
//! are wait-free).
//!
//! In order to keep both copies up to date, left-right keeps an operational log ("oplog") of all
//! the modifications to the data structure, which it uses to bring the old read data up to date
//! with the latest writes on a flip. Since there are two copies of the data, each oplog entry is
//! applied twice: once to the write copy and again to the (stale) read copy.
//!
//! # Trade-offs
//!
//! Few concurrency wins come for free, and this one is no exception. The drawbacks of this
//! primitive are:
//!
//! - **Increased memory use**: since we keep two copies of the backing data structure, we are
//! effectively doubling the memory use of the underlying data. With some clever de-duplication,
//! this cost can be ameliorated to some degree, but it's something to be aware of. Furthermore,
//! if writers only call `publish` infrequently despite adding many writes to the operational log,
//! the operational log itself may grow quite large, which adds additional overhead.
//! - **Deterministic operations**: as the entries in the operational log are applied twice, once
//! to each copy of the data, it is essential that the operations are deterministic. If they are
//! not, the two copies will no longer mirror one another, and will continue to diverge over time.
//! - **Single writer**: left-right only supports a single writer. To have multiple writers, you
//! need to ensure exclusive access to the [`WriteHandle`] through something like a
//! [`Mutex`](std::sync::Mutex).
//! - **Slow writes**: Writes through left-right are slower than they would be directly against
//! the backing datastructure. This is both because they have to go through the operational log,
//! and because they must each be applied twice.
//!
//! # How does it work?
//!
//! Take a look at [this YouTube video](https://www.youtube.com/watch?v=eLNAMEoKAAc) which goes
//! through the basic concurrency algorithm, as well as the initial development of this library.
//! Alternatively, there's a shorter (but also less complete) description in [this
//! talk](https://www.youtube.com/watch?v=s19G6n0UjsM&t=1994).
//!
//! At a glance, left-right is implemented using two regular `T`s, an operational log, epoch
//! counting, and some pointer magic. There is a single pointer through which all readers go. It
//! points to a `T` that the readers access in order to read data. Every time a read has accessed
//! the pointer, they increment a local epoch counter, and they update it again when they have
//! finished the read. When a write occurs, the writer updates the other `T` (for which there are
//! no readers), and also stores a copy of the change in a log. When [`WriteHandle::publish`] is
//! called, the writer, atomically swaps the reader pointer to point to the other `T`. It then
//! waits for the epochs of all current readers to change, and then replays the operational log to
//! bring the stale copy up to date.
//!
//! The design resembles this [left-right concurrency
//! scheme](https://hal.archives-ouvertes.fr/hal-01207881/document) from 2015, though I am not
//! aware of any follow-up to that work.
//!
//! # How do I use it?
//!
//! If you just want a data structure for fast reads, you likely want to use a crate that _uses_
//! this crate, like [`evmap`](https://docs.rs/evmap/). If you want to develop such a crate
//! yourself, here's what you do:
//!
//! ```rust
//! use left_right::{Absorb, ReadHandle, WriteHandle};
//!
//! // First, define an operational log type.
//! // For most real-world use-cases, this will be an `enum`, but we'll keep it simple:
//! struct CounterAddOp(i32);
//!
//! // Then, implement the unsafe `Absorb` trait for your data structure type,
//! // and provide the oplog type as the generic argument.
//! // You can read this as "`i32` can absorb changes of type `CounterAddOp`".
//! impl Absorb<CounterAddOp> for i32 {
//! // See the documentation of `Absorb::absorb_first`.
//! //
//! // Essentially, this is where you define what applying
//! // the oplog type to the datastructure does.
//! fn absorb_first(&mut self, operation: &mut CounterAddOp, _: &Self) {
//! *self += operation.0;
//! }
//!
//! // See the documentation of `Absorb::absorb_second`.
//! //
//! // This may or may not be the same as `absorb_first`,
//! // depending on whether or not you de-duplicate values
//! // across the two copies of your data structure.
//! fn absorb_second(&mut self, operation: CounterAddOp, _: &Self) {
//! *self += operation.0;
//! }
//!
//! // See the documentation of `Absorb::drop_first`.
//! fn drop_first(self: Box<Self>) {}
//!
//! fn sync_with(&mut self, first: &Self) {
//! *self = *first
//! }
//! }
//!
//! // Now, you can construct a new left-right over an instance of your data structure.
//! // This will give you a `WriteHandle` that accepts writes in the form of oplog entries,
//! // and a (cloneable) `ReadHandle` that gives you `&` access to the data structure.
//! let (write, read) = left_right::new::<i32, CounterAddOp>();
//!
//! // You will likely want to embed these handles in your own types so that you can
//! // provide more ergonomic methods for performing operations on your type.
//! struct Counter(WriteHandle<i32, CounterAddOp>);
//! impl Counter {
//! // The methods on you write handle type will likely all just add to the operational log.
//! pub fn add(&mut self, i: i32) {
//! self.0.append(CounterAddOp(i));
//! }
//!
//! // You should also provide a method for exposing the results of any pending operations.
//! //
//! // Until this is called, any writes made since the last call to `publish` will not be
//! // visible to readers. See `WriteHandle::publish` for more details. Make sure to call
//! // this out in _your_ documentation as well, so that your users will be aware of this
//! // "weird" behavior.
//! pub fn publish(&mut self) {
//! self.0.publish();
//! }
//! }
//!
//! // Similarly, for reads:
//! #[derive(Clone)]
//! struct CountReader(ReadHandle<i32>);
//! impl CountReader {
//! pub fn get(&self) -> i32 {
//! // The `ReadHandle` itself does not allow you to access the underlying data.
//! // Instead, you must first "enter" the data structure. This is similar to
//! // taking a `Mutex`, except that no lock is actually taken. When you enter,
//! // you are given back a guard, which gives you shared access (through the
//! // `Deref` trait) to the "read copy" of the data structure.
//! //
//! // Note that `enter` may yield `None`, which implies that the `WriteHandle`
//! // was dropped, and took the backing data down with it.
//! //
//! // Note also that for as long as the guard lives, a writer that tries to
//! // call `WriteHandle::publish` will be blocked from making progress.
//! self.0.enter().map(|guard| *guard).unwrap_or(0)
//! }
//! }
//!
//! // These wrapper types are likely what you'll give out to your consumers.
//! let (mut w, r) = (Counter(write), CountReader(read));
//!
//! // They can then use the type fairly ergonomically:
//! assert_eq!(r.get(), 0);
//! w.add(1);
//! // no call to publish, so read side remains the same:
//! assert_eq!(r.get(), 0);
//! w.publish();
//! assert_eq!(r.get(), 1);
//! drop(w);
//! // writer dropped data, so reads yield fallback value:
//! assert_eq!(r.get(), 0);
//! ```
//!
//! One additional noteworthy detail: much like with `Mutex`, `RwLock`, and `RefCell` from the
//! standard library, the values you dereference out of a `ReadGuard` are tied to the lifetime of
//! that `ReadGuard`. This can make it awkward to write ergonomic methods on the read handle that
//! return references into the underlying data, and may tempt you to clone the data out or take a
//! closure instead. Instead, consider using [`ReadGuard::map`] and [`ReadGuard::try_map`], which
//! (like `RefCell`'s [`Ref::map`](std::cell::Ref::map)) allow you to provide a guarded reference
//! deeper into your data structure.
#![warn(
missing_docs,
rust_2018_idioms,
missing_debug_implementations,
broken_intra_doc_links
)]
#![allow(clippy::type_complexity)]
mod sync;
use crate::sync::{Arc, AtomicUsize, Mutex};
type Epochs = Arc<Mutex<slab::Slab<Arc<AtomicUsize>>>>;
mod write;
pub use crate::write::Taken;
pub use crate::write::WriteHandle;
mod read;
pub use crate::read::{ReadGuard, ReadHandle, ReadHandleFactory};
pub mod aliasing;
/// Types that can incorporate operations of type `O`.
///
/// This trait allows `left-right` to keep the two copies of the underlying data structure (see the
/// [crate-level documentation](crate)) the same over time. Each write operation to the data
/// structure is logged as an operation of type `O` in an _operational log_ (oplog), and is applied
/// once to each copy of the data.
///
/// Implementations should ensure that the absorbption of each `O` is deterministic. That is, if
/// two instances of the implementing type are initially equal, and then absorb the same `O`,
/// they should remain equal afterwards. If this is not the case, the two copies will drift apart
/// over time, and hold different values.
///
/// The trait provides separate methods for the first and second absorption of each `O`. For many
/// implementations, these will be the same (which is why `absorb_second` defaults to calling
/// `absorb_first`), but not all. In particular, some implementations may need to modify the `O` to
/// ensure deterministic results when it is applied to the second copy. Or, they may need to
/// ensure that removed values in the data structure are only dropped when they are removed from
/// _both_ copies, in case they alias the backing data to save memory.
///
/// For the same reason, `Absorb` allows implementors to define `drop_first`, which is used to drop
/// the first of the two copies. In this case, de-duplicating implementations may need to forget
/// values rather than drop them so that they are not dropped twice when the second copy is
/// dropped.
pub trait Absorb<O> {
/// Apply `O` to the first of the two copies.
///
/// `other` is a reference to the other copy of the data, which has seen all operations up
/// until the previous call to [`WriteHandle::publish`]. That is, `other` is one "publish
/// cycle" behind.
fn absorb_first(&mut self, operation: &mut O, other: &Self);
/// Apply `O` to the second of the two copies.
///
/// `other` is a reference to the other copy of the data, which has seen all operations up to
/// the call to [`WriteHandle::publish`] that initially exposed this `O`. That is, `other` is
/// one "publish cycle" ahead.
///
/// Note that this method should modify the underlying data in _exactly_ the same way as
/// `O` modified `other`, otherwise the two copies will drift apart. Be particularly mindful of
/// non-deterministic implementations of traits that are often assumed to be deterministic
/// (like `Eq` and `Hash`), and of "hidden states" that subtly affect results like the
/// `RandomState` of a `HashMap` which can change iteration order.
///
/// Defaults to calling `absorb_first`.
fn absorb_second(&mut self, mut operation: O, other: &Self) {
Self::absorb_first(self, &mut operation, other)
}
/// Drop the first of the two copies.
///
/// Defaults to calling `Self::drop`.
#[allow(clippy::boxed_local)]
fn drop_first(self: Box<Self>) {}
/// Drop the second of the two copies.
///
/// Defaults to calling `Self::drop`.
#[allow(clippy::boxed_local)]
fn drop_second(self: Box<Self>) {}
/// Sync the data from `first` into `self`.
///
/// To improve initialization performance, before the first call to `publish` changes aren't
/// added to the internal oplog, but applied to the first copy directly using `absorb_second`.
/// The first `publish` then calls `sync_with` instead of `absorb_second`.
///
/// `sync_with` should ensure that `self`'s state exactly matches that of `first` after it
/// returns. Be particularly mindful of non-deterministic implementations of traits that are
/// often assumed to be deterministic (like `Eq` and `Hash`), and of "hidden states" that
/// subtly affect results like the `RandomState` of a `HashMap` which can change iteration
/// order.
fn sync_with(&mut self, first: &Self);
}
/// Construct a new write and read handle pair from an empty data structure.
///
/// The type must implement `Clone` so we can construct the second copy from the first.
pub fn new_from_empty<T, O>(t: T) -> (WriteHandle<T, O>, ReadHandle<T>)
where
T: Absorb<O> + Clone,
{
let epochs = Default::default();
let r = ReadHandle::new(t.clone(), Arc::clone(&epochs));
let w = WriteHandle::new(t, epochs, r.clone());
(w, r)
}
/// Construct a new write and read handle pair from the data structure default.
///
/// The type must implement `Default` so we can construct two empty instances. You must ensure that
/// the trait's `Default` implementation is deterministic and idempotent - that is to say, two
/// instances created by it must behave _exactly_ the same. An example of where this is problematic
/// is `HashMap` - due to `RandomState`, two instances returned by `Default` may have a different
/// iteration order.
///
/// If your type's `Default` implementation does not guarantee this, you can use `new_from_empty`,
/// which relies on `Clone` instead of `Default`.
pub fn new<T, O>() -> (WriteHandle<T, O>, ReadHandle<T>)
where
T: Absorb<O> + Default,
{
let epochs = Default::default();
let r = ReadHandle::new(T::default(), Arc::clone(&epochs));
let w = WriteHandle::new(T::default(), epochs, r.clone());
(w, r)
}