async_mutex/
lib.rs

1//! An async mutex.
2//!
3//! The locking mechanism uses eventual fairness to ensure locking will be fair on average without
4//! sacrificing performance. This is done by forcing a fair lock whenever a lock operation is
5//! starved for longer than 0.5 milliseconds.
6//!
7//! # Examples
8//!
9//! ```
10//! # futures_lite::future::block_on(async {
11//! use async_mutex::Mutex;
12//!
13//! let m = Mutex::new(1);
14//!
15//! let mut guard = m.lock().await;
16//! *guard = 2;
17//!
18//! assert!(m.try_lock().is_none());
19//! drop(guard);
20//! assert_eq!(*m.try_lock().unwrap(), 2);
21//! # })
22//! ```
23
24#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
25
26use std::cell::UnsafeCell;
27use std::fmt;
28use std::ops::{Deref, DerefMut};
29use std::process;
30use std::sync::atomic::{AtomicUsize, Ordering};
31use std::sync::Arc;
32use std::time::{Duration, Instant};
33use std::usize;
34
35use event_listener::Event;
36
37/// An async mutex.
38pub struct Mutex<T: ?Sized> {
39    /// Current state of the mutex.
40    ///
41    /// The least significant bit is set to 1 if the mutex is locked.
42    /// The other bits hold the number of starved lock operations.
43    state: AtomicUsize,
44
45    /// Lock operations waiting for the mutex to be released.
46    lock_ops: Event,
47
48    /// The value inside the mutex.
49    data: UnsafeCell<T>,
50}
51
52unsafe impl<T: Send + ?Sized> Send for Mutex<T> {}
53unsafe impl<T: Send + ?Sized> Sync for Mutex<T> {}
54
55impl<T> Mutex<T> {
56    /// Creates a new async mutex.
57    ///
58    /// # Examples
59    ///
60    /// ```
61    /// use async_mutex::Mutex;
62    ///
63    /// let mutex = Mutex::new(0);
64    /// ```
65    pub const fn new(data: T) -> Mutex<T> {
66        Mutex {
67            state: AtomicUsize::new(0),
68            lock_ops: Event::new(),
69            data: UnsafeCell::new(data),
70        }
71    }
72
73    /// Consumes the mutex, returning the underlying data.
74    ///
75    /// # Examples
76    ///
77    /// ```
78    /// use async_mutex::Mutex;
79    ///
80    /// let mutex = Mutex::new(10);
81    /// assert_eq!(mutex.into_inner(), 10);
82    /// ```
83    pub fn into_inner(self) -> T {
84        self.data.into_inner()
85    }
86}
87
88impl<T: ?Sized> Mutex<T> {
89    /// Acquires the mutex.
90    ///
91    /// Returns a guard that releases the mutex when dropped.
92    ///
93    /// # Examples
94    ///
95    /// ```
96    /// # futures_lite::future::block_on(async {
97    /// use async_mutex::Mutex;
98    ///
99    /// let mutex = Mutex::new(10);
100    /// let guard = mutex.lock().await;
101    /// assert_eq!(*guard, 10);
102    /// # })
103    /// ```
104    #[inline]
105    pub async fn lock(&self) -> MutexGuard<'_, T> {
106        if let Some(guard) = self.try_lock() {
107            return guard;
108        }
109        self.acquire_slow().await;
110        MutexGuard(self)
111    }
112
113    /// Slow path for acquiring the mutex.
114    #[cold]
115    async fn acquire_slow(&self) {
116        // Get the current time.
117        let start = Instant::now();
118
119        loop {
120            // Start listening for events.
121            let listener = self.lock_ops.listen();
122
123            // Try locking if nobody is being starved.
124            match self.state.compare_and_swap(0, 1, Ordering::Acquire) {
125                // Lock acquired!
126                0 => return,
127
128                // Lock is held and nobody is starved.
129                1 => {}
130
131                // Somebody is starved.
132                _ => break,
133            }
134
135            // Wait for a notification.
136            listener.await;
137
138            // Try locking if nobody is being starved.
139            match self.state.compare_and_swap(0, 1, Ordering::Acquire) {
140                // Lock acquired!
141                0 => return,
142
143                // Lock is held and nobody is starved.
144                1 => {}
145
146                // Somebody is starved.
147                _ => {
148                    // Notify the first listener in line because we probably received a
149                    // notification that was meant for a starved task.
150                    self.lock_ops.notify(1);
151                    break;
152                }
153            }
154
155            // If waiting for too long, fall back to a fairer locking strategy that will prevent
156            // newer lock operations from starving us forever.
157            if start.elapsed() > Duration::from_micros(500) {
158                break;
159            }
160        }
161
162        // Increment the number of starved lock operations.
163        if self.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 {
164            // In case of potential overflow, abort.
165            process::abort();
166        }
167
168        // Decrement the counter when exiting this function.
169        let _call = CallOnDrop(|| {
170            self.state.fetch_sub(2, Ordering::Release);
171        });
172
173        loop {
174            // Start listening for events.
175            let listener = self.lock_ops.listen();
176
177            // Try locking if nobody else is being starved.
178            match self.state.compare_and_swap(2, 2 | 1, Ordering::Acquire) {
179                // Lock acquired!
180                2 => return,
181
182                // Lock is held by someone.
183                s if s % 2 == 1 => {}
184
185                // Lock is available.
186                _ => {
187                    // Be fair: notify the first listener and then go wait in line.
188                    self.lock_ops.notify(1);
189                }
190            }
191
192            // Wait for a notification.
193            listener.await;
194
195            // Try acquiring the lock without waiting for others.
196            if self.state.fetch_or(1, Ordering::Acquire) % 2 == 0 {
197                return;
198            }
199        }
200    }
201
202    /// Attempts to acquire the mutex.
203    ///
204    /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a
205    /// guard is returned that releases the mutex when dropped.
206    ///
207    /// # Examples
208    ///
209    /// ```
210    /// use async_mutex::Mutex;
211    ///
212    /// let mutex = Mutex::new(10);
213    /// if let Some(guard) = mutex.try_lock() {
214    ///     assert_eq!(*guard, 10);
215    /// }
216    /// # ;
217    /// ```
218    #[inline]
219    pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
220        if self.state.compare_and_swap(0, 1, Ordering::Acquire) == 0 {
221            Some(MutexGuard(self))
222        } else {
223            None
224        }
225    }
226
227    /// Returns a mutable reference to the underlying data.
228    ///
229    /// Since this call borrows the mutex mutably, no actual locking takes place -- the mutable
230    /// borrow statically guarantees the mutex is not already acquired.
231    ///
232    /// # Examples
233    ///
234    /// ```
235    /// # futures_lite::future::block_on(async {
236    /// use async_mutex::Mutex;
237    ///
238    /// let mut mutex = Mutex::new(0);
239    /// *mutex.get_mut() = 10;
240    /// assert_eq!(*mutex.lock().await, 10);
241    /// # })
242    /// ```
243    pub fn get_mut(&mut self) -> &mut T {
244        unsafe { &mut *self.data.get() }
245    }
246}
247
248impl<T: ?Sized> Mutex<T> {
249    /// Acquires the mutex and clones a reference to it.
250    ///
251    /// Returns an owned guard that releases the mutex when dropped.
252    ///
253    /// # Examples
254    ///
255    /// ```
256    /// # futures_lite::future::block_on(async {
257    /// use async_mutex::Mutex;
258    /// use std::sync::Arc;
259    ///
260    /// let mutex = Arc::new(Mutex::new(10));
261    /// let guard = mutex.lock_arc().await;
262    /// assert_eq!(*guard, 10);
263    /// # })
264    /// ```
265    #[inline]
266    pub async fn lock_arc(self: &Arc<Self>) -> MutexGuardArc<T> {
267        if let Some(guard) = self.try_lock_arc() {
268            return guard;
269        }
270        self.acquire_slow().await;
271        MutexGuardArc(self.clone())
272    }
273
274    /// Attempts to acquire the mutex and clone a reference to it.
275    ///
276    /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, an
277    /// owned guard is returned that releases the mutex when dropped.
278    ///
279    /// # Examples
280    ///
281    /// ```
282    /// use async_mutex::Mutex;
283    /// use std::sync::Arc;
284    ///
285    /// let mutex = Arc::new(Mutex::new(10));
286    /// if let Some(guard) = mutex.try_lock() {
287    ///     assert_eq!(*guard, 10);
288    /// }
289    /// # ;
290    /// ```
291    #[inline]
292    pub fn try_lock_arc(self: &Arc<Self>) -> Option<MutexGuardArc<T>> {
293        if self.state.compare_and_swap(0, 1, Ordering::Acquire) == 0 {
294            Some(MutexGuardArc(self.clone()))
295        } else {
296            None
297        }
298    }
299}
300
301impl<T: fmt::Debug + ?Sized> fmt::Debug for Mutex<T> {
302    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303        struct Locked;
304        impl fmt::Debug for Locked {
305            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
306                f.write_str("<locked>")
307            }
308        }
309
310        match self.try_lock() {
311            None => f.debug_struct("Mutex").field("data", &Locked).finish(),
312            Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(),
313        }
314    }
315}
316
317impl<T> From<T> for Mutex<T> {
318    fn from(val: T) -> Mutex<T> {
319        Mutex::new(val)
320    }
321}
322
323impl<T: Default + ?Sized> Default for Mutex<T> {
324    fn default() -> Mutex<T> {
325        Mutex::new(Default::default())
326    }
327}
328
329/// A guard that releases the mutex when dropped.
330pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex<T>);
331
332unsafe impl<T: Send + ?Sized> Send for MutexGuard<'_, T> {}
333unsafe impl<T: Sync + ?Sized> Sync for MutexGuard<'_, T> {}
334
335impl<'a, T: ?Sized> MutexGuard<'a, T> {
336    /// Returns a reference to the mutex a guard came from.
337    ///
338    /// # Examples
339    ///
340    /// ```
341    /// # futures_lite::future::block_on(async {
342    /// use async_mutex::{Mutex, MutexGuard};
343    ///
344    /// let mutex = Mutex::new(10i32);
345    /// let guard = mutex.lock().await;
346    /// dbg!(MutexGuard::source(&guard));
347    /// # })
348    /// ```
349    pub fn source(guard: &MutexGuard<'a, T>) -> &'a Mutex<T> {
350        guard.0
351    }
352}
353
354impl<T: ?Sized> Drop for MutexGuard<'_, T> {
355    fn drop(&mut self) {
356        // Remove the last bit and notify a waiting lock operation.
357        self.0.state.fetch_sub(1, Ordering::Release);
358        self.0.lock_ops.notify(1);
359    }
360}
361
362impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuard<'_, T> {
363    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
364        fmt::Debug::fmt(&**self, f)
365    }
366}
367
368impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuard<'_, T> {
369    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370        (**self).fmt(f)
371    }
372}
373
374impl<T: ?Sized> Deref for MutexGuard<'_, T> {
375    type Target = T;
376
377    fn deref(&self) -> &T {
378        unsafe { &*self.0.data.get() }
379    }
380}
381
382impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
383    fn deref_mut(&mut self) -> &mut T {
384        unsafe { &mut *self.0.data.get() }
385    }
386}
387
388/// An owned guard that releases the mutex when dropped.
389pub struct MutexGuardArc<T: ?Sized>(Arc<Mutex<T>>);
390
391unsafe impl<T: Send + ?Sized> Send for MutexGuardArc<T> {}
392unsafe impl<T: Sync + ?Sized> Sync for MutexGuardArc<T> {}
393
394impl<T: ?Sized> MutexGuardArc<T> {
395    /// Returns a reference to the mutex a guard came from.
396    ///
397    /// # Examples
398    ///
399    /// ```
400    /// # futures_lite::future::block_on(async {
401    /// use async_mutex::{Mutex, MutexGuardArc};
402    /// use std::sync::Arc;
403    ///
404    /// let mutex = Arc::new(Mutex::new(10i32));
405    /// let guard = mutex.lock_arc().await;
406    /// dbg!(MutexGuardArc::source(&guard));
407    /// # })
408    /// ```
409    pub fn source(guard: &MutexGuardArc<T>) -> &Arc<Mutex<T>> {
410        &guard.0
411    }
412}
413
414impl<T: ?Sized> Drop for MutexGuardArc<T> {
415    fn drop(&mut self) {
416        // Remove the last bit and notify a waiting lock operation.
417        self.0.state.fetch_sub(1, Ordering::Release);
418        self.0.lock_ops.notify(1);
419    }
420}
421
422impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuardArc<T> {
423    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
424        fmt::Debug::fmt(&**self, f)
425    }
426}
427
428impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuardArc<T> {
429    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
430        (**self).fmt(f)
431    }
432}
433
434impl<T: ?Sized> Deref for MutexGuardArc<T> {
435    type Target = T;
436
437    fn deref(&self) -> &T {
438        unsafe { &*self.0.data.get() }
439    }
440}
441
442impl<T: ?Sized> DerefMut for MutexGuardArc<T> {
443    fn deref_mut(&mut self) -> &mut T {
444        unsafe { &mut *self.0.data.get() }
445    }
446}
447
448/// Calls a function when dropped.
449struct CallOnDrop<F: Fn()>(F);
450
451impl<F: Fn()> Drop for CallOnDrop<F> {
452    fn drop(&mut self) {
453        (self.0)();
454    }
455}