nu_protocol/engine/sequence.rs

use crate::ShellError;
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

/// Implements an atomically incrementing sequential series of numbers
#[derive(Debug, Default)]
pub struct Sequence(AtomicUsize);

impl Sequence {
    /// Return the next available id from a sequence, returning an error on overflow
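    // `#[track_caller]` makes `std::panic::Location::caller()` below report the call site of
    // `next`, so the overflow error's help text points at the caller rather than at this file.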
    #[track_caller]
    pub fn next(&self) -> Result<usize, ShellError> {
        // It's safe to use Relaxed ordering here, as no other memory operations depend on this
        // value having been set for their safety.
        //
        // The only reason we avoid `fetch_add` is so that we can check for overflow: wrapping the
        // identifier around would lead to a serious bug, however unlikely that is.
        self.0
            .fetch_update(Relaxed, Relaxed, |current| current.checked_add(1))
            .map_err(|_| ShellError::NushellFailedHelp {
                msg: "an accumulator for identifiers overflowed".into(),
                help: format!("see {}", std::panic::Location::caller()),
            })
    }
}
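
// A minimal usage sketch, assuming a caller in this crate holds a shared `Sequence`; the
// `IdSource` type and `fresh_id` method below are hypothetical, shown only to illustrate how
// `next` hands out ids and propagates the overflow error:
//
//     struct IdSource {
//         seq: Sequence,
//     }
//
//     impl IdSource {
//         fn fresh_id(&self) -> Result<usize, ShellError> {
//             // Each call yields a distinct id: 0, 1, 2, ...
//             self.seq.next()
//         }
//     }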

#[test]
fn output_is_sequential() {
    let sequence = Sequence::default();

    for (expected, generated) in (0..1000).zip(std::iter::repeat_with(|| sequence.next())) {
        assert_eq!(expected, generated.expect("error in sequence"));
    }
}

#[test]
fn output_is_unique_even_under_contention() {
    let sequence = Sequence::default();

    std::thread::scope(|scope| {
        // Spawn four threads, all advancing the sequence simultaneously
        let threads = (0..4)
            .map(|_| {
                scope.spawn(|| {
                    (0..100_000)
                        .map(|_| sequence.next())
                        .collect::<Result<Vec<_>, _>>()
                })
            })
            .collect::<Vec<_>>();

        // Collect all of the results into a single flat vec
        let mut results = threads
            .into_iter()
            .flat_map(|thread| thread.join().expect("panicked").expect("error"))
            .collect::<Vec<usize>>();

        // Check uniqueness
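        // If any id had been handed out more than once, `dedup` would remove the duplicate and
        // the two lengths below would differ.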
        results.sort();
        let initial_length = results.len();
        results.dedup();
        let deduplicated_length = results.len();
        assert_eq!(initial_length, deduplicated_length);
    })
}