metrics_util/storage/bucket.rs
use crossbeam_epoch::{pin as epoch_pin, Atomic, Guard, Owned, Shared};
use crossbeam_utils::Backoff;
use std::{
    cell::UnsafeCell,
    cmp::min,
    mem::{self, MaybeUninit},
    slice,
    sync::atomic::{AtomicUsize, Ordering},
};

#[cfg(target_pointer_width = "16")]
const BLOCK_SIZE: usize = 16;
#[cfg(target_pointer_width = "32")]
const BLOCK_SIZE: usize = 32;
#[cfg(target_pointer_width = "64")]
const BLOCK_SIZE: usize = 64;

const DEFERRED_BLOCK_BATCH_SIZE: usize = 32;

/// Discrete chunk of values with atomic read/write access.
struct Block<T> {
    // Write index.
    write: AtomicUsize,

    // Read bitmap.
    //
    // Internally, we track the write index which indicates what slot should be written by the next
    // writer. This works fine as writers race via CAS to "acquire" a slot to write to. The
    // trouble comes when attempting to read written values, as writers may still have writes
    // in-flight, thus leading to potential uninitialized reads, UB, and the world imploding.
    //
    // We use a simple scheme where writers acknowledge their writes by setting a bit in `read`
    // that corresponds to the index that they've written. For example, a write at index 5 being
    // complete can be verified by checking if `1 << 5` in `read` is set. This allows writers to
    // concurrently update `read` despite non-sequential indexes.
    //
    // Additionally, an optimization is then available where finding the longest sequential run of
    // initialized slots can be trivially calculated by getting the number of trailing ones in
    // `read`. This allows reading the "length" of initialized values in constant time, without
    // blocking.
    //
    // This optimization does mean, however, that the simplest implementation is limited to block
    // sizes that match the number of bits available in the target platform pointer size. A
    // potential future optimization could use const generics to size an array of read bitmap
    // atomics such that the total sum of the bits could be efficiently utilized, although this
    // would involve more complex logic to read all of the atomics.
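    //
    // As a concrete (hypothetical) illustration of the scheme described above: if writers have
    // finished the writes for slots 0, 1, 2, and 5, then `read` holds the bits `0b10_0111`.
    // `read.trailing_ones()` is 3, so `len()` reports 3: slots 0..3 can be handed out to readers,
    // while the value in slot 5 stays invisible until the in-flight writes for slots 3 and 4
    // complete and extend the run of trailing ones.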
    read: AtomicUsize,

    // The individual slots.
    slots: [MaybeUninit<UnsafeCell<T>>; BLOCK_SIZE],

    // The "next" block to iterate, aka the block that came before this one.
    next: Atomic<Block<T>>,
}

impl<T> Block<T> {
    /// Creates a new [`Block`].
    pub fn new() -> Self {
        // SAFETY:
        // At a high level, all types inherent to `Block<T>` can be safely zero initialized.
        //
        // `write`/`read` are meant to start at zero (`AtomicUsize`)
        // `slots` is an array of `MaybeUninit`, which is zero init safe
        // `next` is meant to start as "null", where the pointer (`AtomicUsize`) is zero
        unsafe { MaybeUninit::zeroed().assume_init() }
    }

    // Gets the length of the next block, if it exists.
    pub(crate) fn next_len(&self, guard: &Guard) -> usize {
        let tail = self.next.load(Ordering::Acquire, guard);
        if tail.is_null() {
            return 0;
        }

        let tail_block = unsafe { tail.deref() };
        tail_block.len()
    }

    /// Gets the current length of this block.
    pub fn len(&self) -> usize {
        self.read.load(Ordering::Acquire).trailing_ones() as usize
    }

    // Whether or not this block is currently quiesced, i.e. has no in-flight writes.
    pub fn is_quiesced(&self) -> bool {
        let len = self.len();
        if len == BLOCK_SIZE {
            return true;
        }

        // We have to clamp self.write since multiple threads might race on filling the last block,
        // so the value could actually exceed BLOCK_SIZE.
        min(self.write.load(Ordering::Acquire), BLOCK_SIZE) == len
    }

    /// Gets a slice of the data written to this block.
    pub fn data(&self) -> &[T] {
        // SAFETY:
        // We can always get a pointer to the first slot, but the reference we give back will only
        // be as long as the number of slots written, indicated by `len`. The value of `len` is
        // only updated once a slot has been fully written, guaranteeing the slot is initialized.
        let len = self.len();
        unsafe {
            let head = self.slots.get_unchecked(0).as_ptr();
            slice::from_raw_parts(head as *const T, len)
        }
    }

    /// Pushes a value into this block.
    pub fn push(&self, value: T) -> Result<(), T> {
        // Try to increment the index. If we've reached the end of the block, let the bucket know
        // so it can attach another block.
        let index = self.write.fetch_add(1, Ordering::AcqRel);
        if index >= BLOCK_SIZE {
            return Err(value);
        }

        // SAFETY:
        // - We never index outside of our block size.
        // - Each slot is `MaybeUninit`, which itself can be safely zero initialized.
        // - We're writing an initialized value into the slot before anyone is able to ever read
        //   it, ensuring no uninitialized access.
        unsafe {
            // Update the slot.
            self.slots.get_unchecked(index).assume_init_ref().get().write(value);
        }

        // Scoot our read index forward.
        self.read.fetch_or(1 << index, Ordering::AcqRel);

        Ok(())
    }
}

unsafe impl<T: Send> Send for Block<T> {}
unsafe impl<T: Sync> Sync for Block<T> {}

impl<T> Drop for Block<T> {
    fn drop(&mut self) {
        while !self.is_quiesced() {}

        // SAFETY:
        // The value of `len` is only updated once a slot has been fully written, guaranteeing the
        // slot is initialized. Thus, we're only touching initialized slots here.
        unsafe {
            let len = self.len();
            for i in 0..len {
                self.slots.get_unchecked(i).assume_init_ref().get().drop_in_place();
            }
        }
    }
}

impl<T> std::fmt::Debug for Block<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let guard = &epoch_pin();
        let has_next = !self.next.load(Ordering::Acquire, guard).is_null();
        f.debug_struct("Block")
            .field("type", &std::any::type_name::<T>())
            .field("block_size", &BLOCK_SIZE)
            .field("write", &self.write.load(Ordering::Acquire))
            .field("read", &self.read.load(Ordering::Acquire))
            .field("len", &self.len())
            .field("has_next", &has_next)
            .finish()
    }
}

/// A lock-free bucket with snapshot capabilities.
///
/// This bucket is implemented as a singly-linked list of blocks, where each block is a small
/// buffer that can hold a handful of elements. There is no limit to how many elements can be in
/// the bucket at a time. Blocks are dynamically allocated as elements are pushed into the bucket.
///
/// Unlike a queue, buckets cannot be drained element by element: callers must iterate the whole
/// structure. Reading the bucket happens in a quasi-reverse fashion, to allow writers to make
/// forward progress without affecting the iteration of the previously written values.
///
/// For example, in a scenario where an internal block can hold 4 elements, and the caller has
/// written 10 elements to the bucket, you would expect to see the values in this order when
/// iterating:
///
/// ```text
/// [6 7 8 9] [2 3 4 5] [0 1]
/// ```
///
/// Block sizes are dependent on the target architecture, where each block can hold N items, and N
/// is the number of bits in the target architecture's pointer width.
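///
/// # Examples
///
/// A minimal usage sketch. The import path is an assumption here (it presumes the crate
/// re-exports this type as `metrics_util::storage::AtomicBucket`):
///
/// ```
/// use metrics_util::storage::AtomicBucket;
///
/// let bucket = AtomicBucket::new();
/// for value in 0..10u64 {
///     bucket.push(value);
/// }
///
/// // With fewer elements than a single block can hold, `data` returns them in insertion order.
/// let values = bucket.data();
/// assert_eq!(values.len(), 10);
/// assert_eq!(values.iter().sum::<u64>(), 45);
/// ```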
#[derive(Debug)]
pub struct AtomicBucket<T> {
    tail: Atomic<Block<T>>,
}

impl<T> AtomicBucket<T> {
    /// Creates a new, empty bucket.
    pub fn new() -> Self {
        AtomicBucket { tail: Atomic::null() }
    }

    /// Checks whether or not this bucket is empty.
    pub fn is_empty(&self) -> bool {
        let guard = &epoch_pin();
        let tail = self.tail.load(Ordering::Acquire, guard);
        if tail.is_null() {
            return true;
        }

        // We have to check the next block of our tail in case the current tail is simply a fresh
        // block that has not been written to yet.
        let tail_block = unsafe { tail.deref() };
        tail_block.len() == 0 && tail_block.next_len(guard) == 0
    }

    /// Pushes an element into the bucket.
    pub fn push(&self, value: T) {
        let mut original = value;
        let guard = &epoch_pin();
        loop {
            // Load the tail block, or install a new one.
            let mut tail = self.tail.load(Ordering::Acquire, guard);
            if tail.is_null() {
                // No blocks at all yet. We need to create one.
                match self.tail.compare_exchange(
                    Shared::null(),
                    Owned::new(Block::new()),
                    Ordering::AcqRel,
                    Ordering::Acquire,
                    guard,
                ) {
                    // We won the race to install the new block.
                    Ok(ptr) => tail = ptr,
                    // Somebody else beat us, so just update our pointer.
                    Err(e) => tail = e.current,
                }
            }

            // We have a block now, so we need to try writing to it.
            let tail_block = unsafe { tail.deref() };
            match tail_block.push(original) {
                // If the push was OK, then the block wasn't full. It might _now_ be full, but we'll
                // let future callers deal with installing a new block if necessary.
                Ok(_) => return,
                // The block was full, so we've been given the value back and we need to install a
                // new block.
                Err(value) => {
                    match self.tail.compare_exchange(
                        tail,
                        Owned::new(Block::new()),
                        Ordering::AcqRel,
                        Ordering::Acquire,
                        guard,
                    ) {
                        // We managed to install the block, so we need to link this new block to
                        // the previous block.
                        Ok(ptr) => {
                            let new_tail = unsafe { ptr.deref() };
                            new_tail.next.store(tail, Ordering::Release);

                            // Now push into our new block.
                            match new_tail.push(value) {
                                // We wrote the value successfully, so we're good here!
                                Ok(_) => return,
                                // The block was full, so just loop and start over.
                                Err(value) => {
                                    original = value;
                                    continue;
                                }
                            }
                        }
                        // Somebody else installed the block before us, so let's just start over.
                        Err(_) => original = value,
                    }
                }
            }
        }
    }

    /// Collects all of the elements written to the bucket.
    ///
    /// This operation can be slow as it involves allocating enough space to hold all of the
    /// elements within the bucket. Consider [`data_with`](AtomicBucket::data_with) to
    /// incrementally iterate the internal blocks within the bucket.
    ///
    /// Elements are in partial reverse order: blocks are iterated in reverse order, but the
    /// elements within them will appear in their original order.
    pub fn data(&self) -> Vec<T>
    where
        T: Clone,
    {
        let mut values = Vec::new();
        self.data_with(|block| values.extend_from_slice(block));
        values
    }

    /// Iterates all of the elements written to the bucket, invoking `f` for each block.
    ///
    /// Elements are in partial reverse order: blocks are iterated in reverse order, but the
    /// elements within them will appear in their original order.
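    ///
    /// # Examples
    ///
    /// A minimal sketch of per-block iteration. The import path is an assumption here (it
    /// presumes the crate re-exports this type as `metrics_util::storage::AtomicBucket`):
    ///
    /// ```
    /// use metrics_util::storage::AtomicBucket;
    ///
    /// let bucket = AtomicBucket::new();
    /// for value in 0..5u64 {
    ///     bucket.push(value);
    /// }
    ///
    /// // The closure is invoked once per internal block with a slice of that block's elements.
    /// let mut total = 0;
    /// bucket.data_with(|block| total += block.iter().sum::<u64>());
    /// assert_eq!(total, 10);
    /// ```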
    pub fn data_with<F>(&self, mut f: F)
    where
        F: FnMut(&[T]),
    {
        let guard = &epoch_pin();
        let backoff = Backoff::new();

        // While we have a valid block -- either `tail` or the next block as we keep reading -- we
        // load the data from each block and process it by calling `f`.
        let mut block_ptr = self.tail.load(Ordering::Acquire, guard);
        while !block_ptr.is_null() {
            let block = unsafe { block_ptr.deref() };

            // We wait for the block to be quiesced to ensure we get any in-flight writes, and
            // snoozing specifically yields the reading thread to ensure things are given a
            // chance to complete.
            while !block.is_quiesced() {
                backoff.snooze();
            }

            // Read the data out of the block.
            let data = block.data();
            f(data);

            // Load the next block.
            block_ptr = block.next.load(Ordering::Acquire, guard);
        }
    }

    /// Clears the bucket.
    ///
    /// Deallocation of the internal blocks happens only when all readers have finished, and so
    /// will not necessarily occur during this call or immediately after it returns.
    ///
    /// # Note
    /// This method will not affect reads that are already in progress.
    pub fn clear(&self) {
        self.clear_with(|_: &[T]| {})
    }

    /// Clears the bucket, invoking `f` for every block that will be cleared.
    ///
    /// Deallocation of the internal blocks happens only when all readers have finished, and so
    /// will not necessarily occur during this call or immediately after it returns.
    ///
    /// This method is useful for accumulating values and then observing them, in a way that
    /// allows the caller to avoid visiting the same values again the next time.
    ///
    /// It allows a pattern of observing values before they're cleared, with a clear demarcation.
    /// A similar pattern seen in the wild is to have some data structure, like a vector, which is
    /// continuously filled and then eventually swapped out for a new, empty vector, allowing the
    /// caller to read all of the old values while new ones are being written, over and over
    /// again.
    ///
    /// # Note
    /// This method will not affect reads that are already in progress.
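    ///
    /// # Examples
    ///
    /// A minimal sketch of the observe-then-clear pattern described above. The import path is an
    /// assumption here (it presumes the crate re-exports this type as
    /// `metrics_util::storage::AtomicBucket`):
    ///
    /// ```
    /// use metrics_util::storage::AtomicBucket;
    ///
    /// let bucket = AtomicBucket::new();
    /// for value in 0..4u64 {
    ///     bucket.push(value);
    /// }
    ///
    /// // Observe every block exactly once while logically emptying the bucket.
    /// let mut seen = Vec::new();
    /// bucket.clear_with(|block| seen.extend_from_slice(block));
    /// assert_eq!(seen.len(), 4);
    ///
    /// // Subsequent reads start from a clean slate.
    /// assert!(bucket.data().is_empty());
    /// ```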
    pub fn clear_with<F>(&self, mut f: F)
    where
        F: FnMut(&[T]),
    {
        // We simply swap out the tail pointer, which effectively clears the bucket. Callers might
        // still be in the process of writing to the tail node, or reading the data, but new
        // callers will see it as empty until another write proceeds.
        let guard = &epoch_pin();
        let mut block_ptr = self.tail.load(Ordering::Acquire, guard);
        if !block_ptr.is_null()
            && self
                .tail
                .compare_exchange(
                    block_ptr,
                    Shared::null(),
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                    guard,
                )
                .is_ok()
        {
            let backoff = Backoff::new();
            let mut freeable_blocks = Vec::new();

            // While we have a valid block -- either `tail` or the next block as we keep reading
            // -- we load the data from each block and process it by calling `f`.
            while !block_ptr.is_null() {
                let block = unsafe { block_ptr.deref() };

                // We wait for the block to be quiesced to ensure we get any in-flight writes, and
                // snoozing specifically yields the reading thread to ensure things are given a
                // chance to complete.
                while !block.is_quiesced() {
                    backoff.snooze();
                }

                // Read the data out of the block.
                let data = block.data();
                f(data);

                // Load the next block and take the shared reference to the current.
                let old_block_ptr =
                    mem::replace(&mut block_ptr, block.next.load(Ordering::Acquire, guard));

                freeable_blocks.push(old_block_ptr);
                if freeable_blocks.len() >= DEFERRED_BLOCK_BATCH_SIZE {
                    let blocks = mem::take(&mut freeable_blocks);
                    unsafe {
                        guard.defer_unchecked(move || {
                            for block in blocks {
                                drop(block.into_owned());
                            }
                        });
                    }
                }
            }

            // Free any remaining old blocks.
            if !freeable_blocks.is_empty() {
                unsafe {
                    guard.defer_unchecked(move || {
                        for block in freeable_blocks {
                            drop(block.into_owned());
                        }
                    });
                }
            }

            // This asks the global collector to attempt to drive execution of deferred operations
            // a little sooner than it otherwise would.
            guard.flush();
        }
    }
}

impl<T> Default for AtomicBucket<T> {
    fn default() -> Self {
        Self { tail: Atomic::null() }
    }
}

#[cfg(test)]
mod tests {
    use super::{AtomicBucket, Block, BLOCK_SIZE};
    use crossbeam_utils::thread::scope;

    #[test]
    fn test_create_new_block() {
        let block: Block<u64> = Block::new();
        assert_eq!(block.len(), 0);

        let data = block.data();
        assert_eq!(data.len(), 0);
    }

    #[test]
    fn test_block_write_then_read() {
        let block = Block::new();
        assert_eq!(block.len(), 0);

        let data = block.data();
        assert_eq!(data.len(), 0);

        let result = block.push(42);
        assert!(result.is_ok());
        assert_eq!(block.len(), 1);

        let data = block.data();
        assert_eq!(data.len(), 1);
        assert_eq!(data[0], 42);
    }

    #[test]
    fn test_block_write_until_full_then_read() {
        let block = Block::new();
        assert_eq!(block.len(), 0);

        let data = block.data();
        assert_eq!(data.len(), 0);

        let mut i = 0;
        let mut total = 0;
        while i < BLOCK_SIZE as u64 {
            assert!(block.push(i).is_ok());

            total += i;
            i += 1;
        }

        let data = block.data();
        assert_eq!(data.len(), BLOCK_SIZE);

        let sum: u64 = data.iter().sum();
        assert_eq!(sum, total);

        let result = block.push(42);
        assert!(result.is_err());
    }

    #[test]
    fn test_block_write_until_full_then_read_mt() {
        let block = Block::new();
        assert_eq!(block.len(), 0);

        let data = block.data();
        assert_eq!(data.len(), 0);

        let res = scope(|s| {
            let t1 = s.spawn(|_| {
                let mut i = 0;
                let mut total = 0;
                while i < BLOCK_SIZE as u64 / 2 {
                    assert!(block.push(i).is_ok());

                    total += i;
                    i += 1;
                }
                total
            });

            let t2 = s.spawn(|_| {
                let mut i = 0;
                let mut total = 0;
                while i < BLOCK_SIZE as u64 / 2 {
                    assert!(block.push(i).is_ok());

                    total += i;
                    i += 1;
                }
                total
            });

            let t1_total = t1.join().unwrap();
            let t2_total = t2.join().unwrap();

            t1_total + t2_total
        });

        let total = res.unwrap();

        let data = block.data();
        assert_eq!(data.len(), BLOCK_SIZE);

        let sum: u64 = data.iter().sum();
        assert_eq!(sum, total);

        let result = block.push(42);
        assert!(result.is_err());
    }

    #[test]
    fn test_bucket_write_then_read() {
        let bucket = AtomicBucket::new();
        bucket.push(42);

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 1);
        assert_eq!(snapshot[0], 42);
    }

    #[test]
    fn test_bucket_multiple_blocks_write_then_read() {
        let bucket = AtomicBucket::new();

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 0);

        let target = (BLOCK_SIZE * 3 + BLOCK_SIZE / 2) as u64;
        let mut i = 0;
        let mut total = 0;
        while i < target {
            bucket.push(i);

            total += i;
            i += 1;
        }

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), target as usize);

        let sum: u64 = snapshot.iter().sum();
        assert_eq!(sum, total);
    }

    #[test]
    fn test_bucket_write_then_read_mt() {
        let bucket = AtomicBucket::new();

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 0);

        let res = scope(|s| {
            let t1 = s.spawn(|_| {
                let mut i = 0;
                let mut total = 0;
                while i < BLOCK_SIZE as u64 * 100_000 {
                    bucket.push(i);

                    total += i;
                    i += 1;
                }
                total
            });

            let t2 = s.spawn(|_| {
                let mut i = 0;
                let mut total = 0;
                while i < BLOCK_SIZE as u64 * 100_000 {
                    bucket.push(i);

                    total += i;
                    i += 1;
                }
                total
            });

            let t1_total = t1.join().unwrap();
            let t2_total = t2.join().unwrap();

            t1_total + t2_total
        });

        let total = res.unwrap();

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), BLOCK_SIZE * 200_000);

        let sum = snapshot.iter().sum::<u64>();
        assert_eq!(sum, total);
    }

    #[test]
    fn test_clear_and_clear_with() {
        let bucket = AtomicBucket::new();

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 0);

        let mut i = 0;
        let mut total_pushed = 0;
        while i < BLOCK_SIZE * 4 {
            bucket.push(i);

            total_pushed += i;
            i += 1;
        }

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), i);

        let mut total_accumulated = 0;
        bucket.clear_with(|xs| total_accumulated += xs.iter().sum::<usize>());
        assert_eq!(total_pushed, total_accumulated);

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 0);
    }

    #[test]
    fn test_bucket_len_and_next_len() {
        let bucket = AtomicBucket::new();
        assert!(bucket.is_empty());

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 0);

        // Just making sure that `is_empty` holds as we go from
        // the first block, to the second block, to exercise the
        // `Block::next_len` codepath.
        let mut i = 0;
        while i < BLOCK_SIZE * 2 {
            bucket.push(i);
            assert!(!bucket.is_empty());
            i += 1;
        }
    }
}
669}