metrics_util/storage/bucket.rs

use crossbeam_epoch::{pin as epoch_pin, Atomic, Guard, Owned, Shared};
use crossbeam_utils::Backoff;
use std::{
    cell::UnsafeCell,
    cmp::min,
    mem::{self, MaybeUninit},
    slice,
    sync::atomic::{AtomicUsize, Ordering},
};

#[cfg(target_pointer_width = "16")]
const BLOCK_SIZE: usize = 16;
#[cfg(target_pointer_width = "32")]
const BLOCK_SIZE: usize = 32;
#[cfg(target_pointer_width = "64")]
const BLOCK_SIZE: usize = 64;

const DEFERRED_BLOCK_BATCH_SIZE: usize = 32;

/// Discrete chunk of values with atomic read/write access.
struct Block<T> {
    // Write index.
    write: AtomicUsize,

    // Read bitmap.
    //
    // Internally, we track the write index which indicates what slot should be written by the next
    // writer.  This works fine as writers race via an atomic fetch-add to "acquire" a slot to write to.  The
    // trouble comes when attempting to read written values, as writers may still have writes
    // in-flight, thus leading to potential uninitialized reads, UB, and the world imploding.
    //
    // We use a simple scheme where writers acknowledge their writes by setting a bit in `read`
    // that corresponds to the index that they've written.  For example, a write at index 5 being
    // complete can be verified by checking if `1 << 5` in `read` is set.  This allows writers to
    // concurrently update `read` despite non-sequential indexes.
    //
    // Additionally, an optimization is then available where finding the longest sequential run of
    // initialized slots can be trivially calculated by getting the number of trailing ones in
    // `read`.  This allows reading the "length" of initialized values in constant time, without
    // blocking.
    //
    // This optimization does mean, however, that the simplest implementation is limited to block
    // sizes that match the number of bits available in the target platform pointer size.  A
    // potential future optimization could use const generics to size an array of read bitmap
    // atomics such that the total sum of the bits could be efficiently utilized, although this
    // would involve more complex logic to read all of the atomics.
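    //
    // As a worked example of the scheme (illustrative values): with BLOCK_SIZE == 64, three
    // completed writes at indexes 0, 1, and 2 leave `read` == 0b111, and
    // `(0b111usize).trailing_ones() == 3`, which is exactly the prefix of slots that `data()` may
    // safely expose.  If a write at index 5 lands before indexes 3 and 4 are acknowledged, bit 5
    // is set but the readable prefix stays at 3 until the earlier writes complete.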
    read: AtomicUsize,

    // The individual slots.
    slots: [MaybeUninit<UnsafeCell<T>>; BLOCK_SIZE],

    // The "next" block to iterate, aka the block that came before this one.
    next: Atomic<Block<T>>,
}

impl<T> Block<T> {
    /// Creates a new [`Block`].
    pub fn new() -> Self {
        // SAFETY:
        // At a high level, all types inherent to `Block<T>` can be safely zero initialized.
        //
        // `write`/`read` are meant to start at zero (`AtomicUsize`)
        // `slots` is an array of `MaybeUninit`, which is zero init safe
        // `next` is meant to start as "null", where the pointer (`AtomicUsize`) is zero
        unsafe { MaybeUninit::zeroed().assume_init() }
    }

    // Gets the length of the next block, if it exists.
    pub(crate) fn next_len(&self, guard: &Guard) -> usize {
        let tail = self.next.load(Ordering::Acquire, guard);
        if tail.is_null() {
            return 0;
        }

        let tail_block = unsafe { tail.deref() };
        tail_block.len()
    }

    /// Gets the current length of this block.
    pub fn len(&self) -> usize {
        self.read.load(Ordering::Acquire).trailing_ones() as usize
    }

    // Whether or not this block is currently quiesced, i.e. has no in-flight writes.
    pub fn is_quiesced(&self) -> bool {
        let len = self.len();
        if len == BLOCK_SIZE {
            return true;
        }

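        // A quick illustration with assumed values: if two writes have been acknowledged
        // (`read` == 0b11, so `len()` == 2) but `write` has already been bumped to 3, a third
        // write is still in flight and we are not yet quiesced; once its bit lands in `read`,
        // `len()` catches up to `write` and the block is safe to read in full.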
        // We have to clamp self.write since multiple threads might race on filling the last block,
        // so the value could actually exceed BLOCK_SIZE.
        min(self.write.load(Ordering::Acquire), BLOCK_SIZE) == len
    }

    /// Gets a slice of the data written to this block.
    pub fn data(&self) -> &[T] {
        // SAFETY:
        // We can always get a pointer to the first slot, but the reference we give back will only
        // be as long as the number of slots written, indicated by `len`.  The value of `len` is
        // only updated once a slot has been fully written, guaranteeing the slot is initialized.
        let len = self.len();
        unsafe {
            let head = self.slots.get_unchecked(0).as_ptr();
            slice::from_raw_parts(head as *const T, len)
        }
    }

    /// Pushes a value into this block.
    pub fn push(&self, value: T) -> Result<(), T> {
        // Try to increment the index.  If we've reached the end of the block, let the bucket know
        // so it can attach another block.
        let index = self.write.fetch_add(1, Ordering::AcqRel);
        if index >= BLOCK_SIZE {
            return Err(value);
        }

        // SAFETY:
        // - We never index outside of our block size.
        // - Each slot is `MaybeUninit`, which itself can be safely zero initialized.
        // - We're writing an initialized value into the slot before anyone is able to ever read
        //   it, ensuring no uninitialized access.
        unsafe {
            // Update the slot.
            self.slots.get_unchecked(index).assume_init_ref().get().write(value);
        }

        // Scoot our read index forward.
        self.read.fetch_or(1 << index, Ordering::AcqRel);

        Ok(())
    }
}

unsafe impl<T: Send> Send for Block<T> {}
unsafe impl<T: Sync> Sync for Block<T> {}

impl<T> Drop for Block<T> {
    fn drop(&mut self) {
        while !self.is_quiesced() {}

        // SAFETY:
        // The value of `len` is only updated once a slot has been fully written, guaranteeing the
        // slot is initialized.  Thus, we're only touching initialized slots here.
        unsafe {
            let len = self.len();
            for i in 0..len {
                self.slots.get_unchecked(i).assume_init_ref().get().drop_in_place();
            }
        }
    }
}

impl<T> std::fmt::Debug for Block<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let guard = &epoch_pin();
        let has_next = !self.next.load(Ordering::Acquire, guard).is_null();
        f.debug_struct("Block")
            .field("type", &std::any::type_name::<T>())
            .field("block_size", &BLOCK_SIZE)
            .field("write", &self.write.load(Ordering::Acquire))
            .field("read", &self.read.load(Ordering::Acquire))
            .field("len", &self.len())
            .field("has_next", &has_next)
            .finish()
    }
}

/// A lock-free bucket with snapshot capabilities.
///
/// This bucket is implemented as a singly-linked list of blocks, where each block is a small
/// buffer that can hold a handful of elements.  There is no limit to how many elements can be in
/// the bucket at a time.  Blocks are dynamically allocated as elements are pushed into the bucket.
///
/// Unlike a queue, buckets cannot be drained element by element: callers must iterate the whole
/// structure.  Reading the bucket happens in a quasi-reverse fashion, to allow writers to make
/// forward progress without affecting the iteration of the previously written values.
///
/// For example, in a scenario where an internal block can hold 4 elements, and the caller has
/// written 10 elements to the bucket, you would expect to see the values in this order when iterating:
///
/// ```text
/// [8 9] [4 5 6 7] [0 1 2 3]
/// ```
///
/// Block sizes are dependent on the target architecture, where each block can hold N items, and N
/// is the number of bits in the target architecture's pointer width.
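///
/// A minimal usage sketch (illustrative only, so it is not compiled as a doctest; the exact
/// public import path for [`AtomicBucket`] may differ from this module path):
///
/// ```ignore
/// let bucket = AtomicBucket::new();
/// bucket.push(1u64);
/// bucket.push(2u64);
///
/// // Read everything written so far, one block at a time.
/// bucket.data_with(|chunk| println!("{:?}", chunk));
///
/// // Or collect it into a single vector (requires `T: Clone`).
/// assert_eq!(bucket.data().len(), 2);
/// ```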
#[derive(Debug)]
pub struct AtomicBucket<T> {
    tail: Atomic<Block<T>>,
}

impl<T> AtomicBucket<T> {
    /// Creates a new, empty bucket.
    pub fn new() -> Self {
        AtomicBucket { tail: Atomic::null() }
    }

    /// Checks whether or not this bucket is empty.
    pub fn is_empty(&self) -> bool {
        let guard = &epoch_pin();
        let tail = self.tail.load(Ordering::Acquire, guard);
        if tail.is_null() {
            return true;
        }

        // We have to check the next block of our tail in case the current tail is simply a fresh
        // block that has not been written to yet.
        let tail_block = unsafe { tail.deref() };
        tail_block.len() == 0 && tail_block.next_len(guard) == 0
    }

    /// Pushes an element into the bucket.
    pub fn push(&self, value: T) {
        let mut original = value;
        let guard = &epoch_pin();
        loop {
            // Load the tail block, or install a new one.
            let mut tail = self.tail.load(Ordering::Acquire, guard);
            if tail.is_null() {
                // No blocks at all yet.  We need to create one.
                match self.tail.compare_exchange(
                    Shared::null(),
                    Owned::new(Block::new()),
                    Ordering::AcqRel,
                    Ordering::Acquire,
                    guard,
                ) {
                    // We won the race to install the new block.
                    Ok(ptr) => tail = ptr,
                    // Somebody else beat us, so just update our pointer.
                    Err(e) => tail = e.current,
                }
            }

            // We have a block now, so we need to try writing to it.
            let tail_block = unsafe { tail.deref() };
            match tail_block.push(original) {
                // If the push was OK, then the block wasn't full.  It might _now_ be full, but we'll
                // let future callers deal with installing a new block if necessary.
                Ok(_) => return,
                // The block was full, so we've been given the value back and we need to install a new block.
                Err(value) => {
                    match self.tail.compare_exchange(
                        tail,
                        Owned::new(Block::new()),
                        Ordering::AcqRel,
                        Ordering::Acquire,
                        guard,
                    ) {
                        // We managed to install the block, so we need to link this new block to
                        // the previous block.
                        Ok(ptr) => {
                            let new_tail = unsafe { ptr.deref() };
                            new_tail.next.store(tail, Ordering::Release);

                            // Now push into our new block.
                            match new_tail.push(value) {
                                // We wrote the value successfully, so we're good here!
                                Ok(_) => return,
                                // The block was full, so just loop and start over.
                                Err(value) => {
                                    original = value;
                                    continue;
                                }
                            }
                        }
                        // Somebody else installed the block before us, so let's just start over.
                        Err(_) => original = value,
                    }
                }
            }
        }
    }

    /// Collects all of the elements written to the bucket.
    ///
    /// This operation can be slow as it involves allocating enough space to hold all of the
    /// elements within the bucket.  Consider [`data_with`](AtomicBucket::data_with) to incrementally iterate
    /// the internal blocks within the bucket.
    ///
    /// Elements are in partial reverse order: blocks are iterated in reverse order, but the
    /// elements within them will appear in their original order.
    pub fn data(&self) -> Vec<T>
    where
        T: Clone,
    {
        let mut values = Vec::new();
        self.data_with(|block| values.extend_from_slice(block));
        values
    }

    /// Iterates all of the elements written to the bucket, invoking `f` for each block.
    ///
    /// Elements are in partial reverse order: blocks are iterated in reverse order, but the
    /// elements within them will appear in their original order.
    pub fn data_with<F>(&self, mut f: F)
    where
        F: FnMut(&[T]),
    {
        let guard = &epoch_pin();
        let backoff = Backoff::new();

        // While we have a valid block -- either `tail` or the next block as we keep reading -- we
        // load the data from each block and process it by calling `f`.
        let mut block_ptr = self.tail.load(Ordering::Acquire, guard);
        while !block_ptr.is_null() {
            let block = unsafe { block_ptr.deref() };

            // We wait for the block to be quiesced to ensure we get any in-flight writes, and
            // snoozing specifically yields the reading thread to ensure things are given a
            // chance to complete.
            while !block.is_quiesced() {
                backoff.snooze();
            }

            // Read the data out of the block.
            let data = block.data();
            f(data);

            // Load the next block.
            block_ptr = block.next.load(Ordering::Acquire, guard);
        }
    }

    /// Clears the bucket.
    ///
    /// Deallocation of the internal blocks happens only when all readers have finished, and so
    /// will not necessarily occur during, or immediately after, this call.
    ///
    /// # Note
    /// This method will not affect reads that are already in progress.
    pub fn clear(&self) {
        self.clear_with(|_: &[T]| {})
    }

    /// Clears the bucket, invoking `f` for every block that will be cleared.
    ///
    /// Deallocation of the internal blocks happens only when all readers have finished, and so
    /// will not necessarily occur during, or immediately after, this call.
    ///
    /// This method is useful for accumulating values and then observing them, in a way that lets
    /// the caller avoid visiting the same values again the next time.
    ///
    /// In other words, it allows observing values just before they are cleared, with a clear
    /// demarcation between what has and has not been seen. A similar pattern in the wild is a
    /// continuously filled vector that is periodically swapped out for a new, empty one, letting
    /// the caller read the old values while new values keep being written.
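    ///
    /// A sketch of the accumulate-then-drain pattern described above (illustrative only, so it is
    /// not compiled as a doctest):
    ///
    /// ```ignore
    /// let bucket = AtomicBucket::new();
    /// bucket.push(1u64);
    /// bucket.push(2u64);
    ///
    /// // Observe everything accumulated so far, emptying the bucket in the same step.
    /// let mut sum = 0;
    /// bucket.clear_with(|chunk| sum += chunk.iter().sum::<u64>());
    /// assert_eq!(sum, 3);
    /// assert!(bucket.is_empty());
    /// ```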
    ///
    /// # Note
    /// This method will not affect reads that are already in progress.
    pub fn clear_with<F>(&self, mut f: F)
    where
        F: FnMut(&[T]),
    {
        // We simply swap out the tail pointer, which effectively clears the bucket.  Callers might
        // still be in the process of writing to the tail node, or reading the data, but new callers
        // will see it as empty until another write occurs.
        let guard = &epoch_pin();
        let mut block_ptr = self.tail.load(Ordering::Acquire, guard);
        if !block_ptr.is_null()
            && self
                .tail
                .compare_exchange(
                    block_ptr,
                    Shared::null(),
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                    guard,
                )
                .is_ok()
        {
            let backoff = Backoff::new();
            let mut freeable_blocks = Vec::new();

            // While we have a valid block -- either `tail` or the next block as we keep reading -- we
            // load the data from each block and process it by calling `f`.
            while !block_ptr.is_null() {
                let block = unsafe { block_ptr.deref() };

                // We wait for the block to be quiesced to ensure we get any in-flight writes, and
                // snoozing specifically yields the reading thread to ensure things are given a
                // chance to complete.
                while !block.is_quiesced() {
                    backoff.snooze();
                }

                // Read the data out of the block.
                let data = block.data();
                f(data);

                // Load the next block and take the shared reference to the current.
                let old_block_ptr =
                    mem::replace(&mut block_ptr, block.next.load(Ordering::Acquire, guard));

                freeable_blocks.push(old_block_ptr);
                if freeable_blocks.len() >= DEFERRED_BLOCK_BATCH_SIZE {
                    let blocks = mem::take(&mut freeable_blocks);
                    unsafe {
                        guard.defer_unchecked(move || {
                            for block in blocks {
                                drop(block.into_owned());
                            }
                        });
                    }
                }
            }

            // Free any remaining old blocks.
            if !freeable_blocks.is_empty() {
                unsafe {
                    guard.defer_unchecked(move || {
                        for block in freeable_blocks {
                            drop(block.into_owned());
                        }
                    });
                }
            }

            // This asks the global collector to attempt to drive execution of deferred operations a
            // little sooner than it may have done so otherwise.
            guard.flush();
        }
    }
}

impl<T> Default for AtomicBucket<T> {
    fn default() -> Self {
        Self { tail: Atomic::null() }
    }
}

#[cfg(test)]
mod tests {
    use super::{AtomicBucket, Block, BLOCK_SIZE};
    use crossbeam_utils::thread::scope;

    #[test]
    fn test_create_new_block() {
        let block: Block<u64> = Block::new();
        assert_eq!(block.len(), 0);

        let data = block.data();
        assert_eq!(data.len(), 0);
    }

    #[test]
    fn test_block_write_then_read() {
        let block = Block::new();
        assert_eq!(block.len(), 0);

        let data = block.data();
        assert_eq!(data.len(), 0);

        let result = block.push(42);
        assert!(result.is_ok());
        assert_eq!(block.len(), 1);

        let data = block.data();
        assert_eq!(data.len(), 1);
        assert_eq!(data[0], 42);
    }

    #[test]
    fn test_block_write_until_full_then_read() {
        let block = Block::new();
        assert_eq!(block.len(), 0);

        let data = block.data();
        assert_eq!(data.len(), 0);

        let mut i = 0;
        let mut total = 0;
        while i < BLOCK_SIZE as u64 {
            assert!(block.push(i).is_ok());

            total += i;
            i += 1;
        }

        let data = block.data();
        assert_eq!(data.len(), BLOCK_SIZE);

        let sum: u64 = data.iter().sum();
        assert_eq!(sum, total);

        let result = block.push(42);
        assert!(result.is_err());
    }

    #[test]
    fn test_block_write_until_full_then_read_mt() {
        let block = Block::new();
        assert_eq!(block.len(), 0);

        let data = block.data();
        assert_eq!(data.len(), 0);

        let res = scope(|s| {
            let t1 = s.spawn(|_| {
                let mut i = 0;
                let mut total = 0;
                while i < BLOCK_SIZE as u64 / 2 {
                    assert!(block.push(i).is_ok());

                    total += i;
                    i += 1;
                }
                total
            });

            let t2 = s.spawn(|_| {
                let mut i = 0;
                let mut total = 0;
                while i < BLOCK_SIZE as u64 / 2 {
                    assert!(block.push(i).is_ok());

                    total += i;
                    i += 1;
                }
                total
            });

            let t1_total = t1.join().unwrap();
            let t2_total = t2.join().unwrap();

            t1_total + t2_total
        });

        let total = res.unwrap();

        let data = block.data();
        assert_eq!(data.len(), BLOCK_SIZE);

        let sum: u64 = data.iter().sum();
        assert_eq!(sum, total);

        let result = block.push(42);
        assert!(result.is_err());
    }

    #[test]
    fn test_bucket_write_then_read() {
        let bucket = AtomicBucket::new();
        bucket.push(42);

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 1);
        assert_eq!(snapshot[0], 42);
    }

    #[test]
    fn test_bucket_multiple_blocks_write_then_read() {
        let bucket = AtomicBucket::new();

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 0);

        let target = (BLOCK_SIZE * 3 + BLOCK_SIZE / 2) as u64;
        let mut i = 0;
        let mut total = 0;
        while i < target {
            bucket.push(i);

            total += i;
            i += 1;
        }

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), target as usize);

        let sum: u64 = snapshot.iter().sum();
        assert_eq!(sum, total);
    }
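
    // Illustrative test: exercises the block-by-block order documented on `AtomicBucket`, where
    // the partially filled tail block is visited first and values within each block keep their
    // insertion order.
    #[test]
    fn test_bucket_data_with_block_order() {
        let bucket = AtomicBucket::new();

        // Two full blocks plus half of a third, pushed from a single thread.
        let target = (BLOCK_SIZE * 2 + BLOCK_SIZE / 2) as u64;
        for i in 0..target {
            bucket.push(i);
        }

        let mut chunks = Vec::new();
        bucket.data_with(|chunk| chunks.push(chunk.to_vec()));

        assert_eq!(chunks.len(), 3);
        assert_eq!(chunks[0].len(), BLOCK_SIZE / 2);
        assert_eq!(chunks[0][0], (BLOCK_SIZE * 2) as u64);
        assert_eq!(chunks[1].len(), BLOCK_SIZE);
        assert_eq!(chunks[1][0], BLOCK_SIZE as u64);
        assert_eq!(chunks[2].len(), BLOCK_SIZE);
        assert_eq!(chunks[2][0], 0);
    }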

    #[test]
    fn test_bucket_write_then_read_mt() {
        let bucket = AtomicBucket::new();

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 0);

        let res = scope(|s| {
            let t1 = s.spawn(|_| {
                let mut i = 0;
                let mut total = 0;
                while i < BLOCK_SIZE as u64 * 100_000 {
                    bucket.push(i);

                    total += i;
                    i += 1;
                }
                total
            });

            let t2 = s.spawn(|_| {
                let mut i = 0;
                let mut total = 0;
                while i < BLOCK_SIZE as u64 * 100_000 {
                    bucket.push(i);

                    total += i;
                    i += 1;
                }
                total
            });

            let t1_total = t1.join().unwrap();
            let t2_total = t2.join().unwrap();

            t1_total + t2_total
        });

        let total = res.unwrap();

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), BLOCK_SIZE * 200_000);

        let sum = snapshot.iter().sum::<u64>();
        assert_eq!(sum, total);
    }

    #[test]
    fn test_clear_and_clear_with() {
        let bucket = AtomicBucket::new();

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 0);

        let mut i = 0;
        let mut total_pushed = 0;
        while i < BLOCK_SIZE * 4 {
            bucket.push(i);

            total_pushed += i;
            i += 1;
        }

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), i);

        let mut total_accumulated = 0;
        bucket.clear_with(|xs| total_accumulated += xs.iter().sum::<usize>());
        assert_eq!(total_pushed, total_accumulated);

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 0);
    }

    #[test]
    fn test_bucket_len_and_next_len() {
        let bucket = AtomicBucket::new();
        assert!(bucket.is_empty());

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 0);

        // Just making sure that `is_empty` holds as we go from
        // the first block, to the second block, to exercise the
        // `Block::next_len` codepath.
        let mut i = 0;
        while i < BLOCK_SIZE * 2 {
            bucket.push(i);
            assert!(!bucket.is_empty());
            i += 1;
        }
    }
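
    // Illustrative sketch of bucket reuse: after `clear`, the same bucket accepts new writes and
    // reports only the values pushed since the clear.
    #[test]
    fn test_bucket_reuse_after_clear() {
        let bucket = AtomicBucket::new();
        bucket.push(1u64);
        bucket.push(2u64);

        bucket.clear();
        assert!(bucket.is_empty());

        bucket.push(7u64);
        let snapshot = bucket.data();
        assert_eq!(snapshot, vec![7u64]);
    }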
}