fuel_core_services/
seqlock.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
//! A simple implementation of a sequential lock.
//! More details: <https://docs.kernel.org/locking/seqlock.html>

use std::{
    cell::UnsafeCell,
    panic::UnwindSafe,
    sync::atomic::{
        fence,
        AtomicU64,
        Ordering,
    },
};

/// A simple implementation of a sequential lock.
/// More details: <https://docs.kernel.org/locking/seqlock.html>
///
/// One writer and many readers share `data` without readers ever blocking
/// the writer: readers optimistically copy the value and retry if the
/// sequence number changed during the copy. `T` must be `Copy` because a
/// reader may take a bitwise copy concurrently with a write; the possibly
/// torn copy is discarded on retry.
/// some usage of unsafe, T must be Copy
#[derive(Debug)]
pub struct SeqLock<T: Copy> {
    // Even => no write in progress; odd => a write is in flight.
    // Incremented once before and once after every write (see `write`).
    sequence: AtomicU64,
    // The protected value. Accessed without its own synchronization;
    // consistency is guaranteed only by the `sequence` protocol above.
    data: UnsafeCell<T>,
}

// SAFETY: `UnsafeCell` would otherwise make `SeqLock` `!Sync`. Sharing is
// sound because readers validate `sequence` before and after copying `data`
// and discard any copy taken while a write was in progress, and `T: Copy`
// means the discarded copy needs no cleanup. `T: Send` is required since
// the value is effectively transferred between threads.
unsafe impl<T: Send + Copy> Sync for SeqLock<T> {}

/// The writer handle for the `SeqLock`.
/// Only one writer exists for a `SeqLock`.
/// There is no Clone bound since we want to enforce only one writer.
/// (The single-writer invariant is what makes the plain `fetch_add`
/// increments in `write` sufficient — no writer-vs-writer locking exists.)
#[derive(Debug)]
pub struct SeqLockWriter<T: Copy> {
    // Shared with all `SeqLockReader`s created by `SeqLock::new`.
    lock: std::sync::Arc<SeqLock<T>>,
}

impl<T: Copy> SeqLockWriter<T> {
    /// Modifies the data within the lock.
    ///
    /// Protocol: bump `sequence` to an odd value (readers treat odd as
    /// "write in progress"), mutate the data, then bump it back to even.
    /// If `f` panics, the second increment still runs so the sequence
    /// returns to an even value and readers do not spin forever; the
    /// panic is then resumed. NOTE(review): on panic the data may have
    /// been partially mutated by `f`, and readers will observe that state
    /// as valid once the sequence is even again.
    pub fn write<F>(&self, f: F)
    where
        F: FnOnce(&mut T) + UnwindSafe,
    {
        let lock = &self.lock;

        // Indicate that a write operation is starting (sequence becomes odd).
        lock.sequence.fetch_add(1, Ordering::AcqRel);
        // reordering safety: keep the data mutation below from being
        // hoisted above the "write started" increment.
        fence(Ordering::Acquire);

        // attempt to perform the write, and catch any panics
        // we won't have partial write problems since data <= 64 bytes
        // safety: panics are caught and resumed
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| unsafe {
            let data = &mut *lock.data.get();
            f(data);
        }));

        // reordering safety: make the data mutation visible before the
        // "write finished" increment below.
        fence(Ordering::Release);
        // Indicate that the write operation has finished (sequence even again).
        lock.sequence.fetch_add(1, Ordering::Release);

        // resume unwinding if there was an error
        if let Err(e) = result {
            std::panic::resume_unwind(e);
        }
    }
}

/// The reader handle for the `SeqLock`.
/// Multiple readers can be created for a `SeqLock` (hence `Clone`);
/// readers never block the writer, they only retry their own reads.
#[derive(Clone, Debug)]
pub struct SeqLockReader<T: Copy> {
    // Shared with the single `SeqLockWriter` created by `SeqLock::new`.
    lock: std::sync::Arc<SeqLock<T>>,
}

impl<T: Copy> SeqLockReader<T> {
    /// Reads the data within the lock.
    ///
    /// Optimistic read loop: snapshot the sequence, copy the data, then
    /// re-read the sequence. The copy is only returned if the sequence was
    /// even (no write in progress) and unchanged across the copy; otherwise
    /// the read raced with a write and is retried. May spin while a write
    /// is in flight, yielding the thread between attempts.
    pub fn read(&self) -> T {
        let lock = &self.lock;

        loop {
            // check starting guard
            let start = lock.sequence.load(Ordering::Acquire);

            // if odd, write in progress
            if start % 2 != 0 {
                // Give the writer a chance to finish before retrying.
                std::thread::yield_now();
                continue;
            }

            // reordering safety: keep the data copy below from being
            // hoisted above the starting sequence load.
            fence(Ordering::Acquire);

            // safety: when the data <=64 bytes, it fits in a single cache line
            // and cannot be subject to torn reads
            let data = unsafe { *lock.data.get() };

            // reordering safety: the data copy must complete before the
            // validating sequence load below.
            fence(Ordering::Acquire);

            // check starting/ending guard
            let end = lock.sequence.load(Ordering::Acquire);

            // if value changed, retry
            // (the `start % 2 == 0` re-check is redundant — `start` was
            // already verified even above — but harmless)
            if start == end && start % 2 == 0 {
                return data;
            }
        }
    }
}

impl<T: Copy> SeqLock<T> {
    /// Creates a new `SeqLock` and returns a writer and a reader handle.
    /// Optimized for occasional writes and frequent reads.
    ///
    /// The sequence counter starts at 0 (even), i.e. "no write in progress".
    /// Exactly one writer handle is produced; readers may be cloned freely.
    ///  !!WARNING!!
    /// ONLY USE IF ALL THE BELOW CRITERIA ARE MET
    ///  1. Internal data <= 64 bytes
    ///  2. VERY frequent reads
    /// # Safety
    /// The data must be `Copy`
    #[allow(clippy::new_ret_no_self)]
    pub unsafe fn new(data: T) -> (SeqLockWriter<T>, SeqLockReader<T>) {
        // Both handles share ownership of the same lock via `Arc`.
        let inner = std::sync::Arc::new(Self {
            sequence: AtomicU64::new(0),
            data: UnsafeCell::new(data),
        });
        let writer = SeqLockWriter {
            lock: std::sync::Arc::clone(&inner),
        };
        let reader = SeqLockReader { lock: inner };
        (writer, reader)
    }
}

#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_seqlock__provides_correct_values_in_order() {
        // Start at 0 so the writer's values (0..iterations) are monotone
        // with respect to the initial value; starting at 42 would let a
        // reader legitimately observe 42 followed by a smaller value.
        let (writer, reader) = unsafe { SeqLock::new(0) };
        let iterations = 100;

        let writer = thread::spawn(move || {
            for i in 0..iterations {
                writer.write(|data| *data = i);
            }
        });

        let reader = {
            // Also exercises `Clone` on the reader handle.
            let lock = reader.clone();
            thread::spawn(move || {
                // BUG FIX: `seen` was immutable and never updated, so the
                // assertion compared every value against the initial 0 and
                // was vacuous. Track the last observed value so we actually
                // verify reads never go backwards.
                let mut seen = 0;

                for _ in 0..iterations {
                    let value = lock.read();
                    assert!(value >= seen);
                    seen = value;
                }
            })
        };

        writer.join().unwrap();
        reader.join().unwrap();
    }

    #[test]
    fn test_seqlock__single_threaded() {
        let (writer, reader) = unsafe { SeqLock::new(42) };

        writer.write(|data| {
            *data = 100;
        });

        let value = reader.read();
        assert_eq!(value, 100);
    }
}