async_mutex/lib.rs
1//! An async mutex.
2//!
3//! The locking mechanism uses eventual fairness to ensure locking will be fair on average without
4//! sacrificing performance. This is done by forcing a fair lock whenever a lock operation is
5//! starved for longer than 0.5 milliseconds.
6//!
7//! # Examples
8//!
9//! ```
10//! # futures_lite::future::block_on(async {
11//! use async_mutex::Mutex;
12//!
13//! let m = Mutex::new(1);
14//!
15//! let mut guard = m.lock().await;
16//! *guard = 2;
17//!
18//! assert!(m.try_lock().is_none());
19//! drop(guard);
20//! assert_eq!(*m.try_lock().unwrap(), 2);
21//! # })
22//! ```
23
24#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
25
26use std::cell::UnsafeCell;
27use std::fmt;
28use std::ops::{Deref, DerefMut};
29use std::process;
30use std::sync::atomic::{AtomicUsize, Ordering};
31use std::sync::Arc;
32use std::time::{Duration, Instant};
33use std::usize;
34
35use event_listener::Event;
36
37/// An async mutex.
38pub struct Mutex<T: ?Sized> {
39 /// Current state of the mutex.
40 ///
41 /// The least significant bit is set to 1 if the mutex is locked.
42 /// The other bits hold the number of starved lock operations.
43 state: AtomicUsize,
44
45 /// Lock operations waiting for the mutex to be released.
46 lock_ops: Event,
47
48 /// The value inside the mutex.
49 data: UnsafeCell<T>,
50}
51
52unsafe impl<T: Send + ?Sized> Send for Mutex<T> {}
53unsafe impl<T: Send + ?Sized> Sync for Mutex<T> {}
54
55impl<T> Mutex<T> {
56 /// Creates a new async mutex.
57 ///
58 /// # Examples
59 ///
60 /// ```
61 /// use async_mutex::Mutex;
62 ///
63 /// let mutex = Mutex::new(0);
64 /// ```
65 pub const fn new(data: T) -> Mutex<T> {
66 Mutex {
67 state: AtomicUsize::new(0),
68 lock_ops: Event::new(),
69 data: UnsafeCell::new(data),
70 }
71 }
72
73 /// Consumes the mutex, returning the underlying data.
74 ///
75 /// # Examples
76 ///
77 /// ```
78 /// use async_mutex::Mutex;
79 ///
80 /// let mutex = Mutex::new(10);
81 /// assert_eq!(mutex.into_inner(), 10);
82 /// ```
83 pub fn into_inner(self) -> T {
84 self.data.into_inner()
85 }
86}
87
88impl<T: ?Sized> Mutex<T> {
89 /// Acquires the mutex.
90 ///
91 /// Returns a guard that releases the mutex when dropped.
92 ///
93 /// # Examples
94 ///
95 /// ```
96 /// # futures_lite::future::block_on(async {
97 /// use async_mutex::Mutex;
98 ///
99 /// let mutex = Mutex::new(10);
100 /// let guard = mutex.lock().await;
101 /// assert_eq!(*guard, 10);
102 /// # })
103 /// ```
104 #[inline]
105 pub async fn lock(&self) -> MutexGuard<'_, T> {
106 if let Some(guard) = self.try_lock() {
107 return guard;
108 }
109 self.acquire_slow().await;
110 MutexGuard(self)
111 }
112
113 /// Slow path for acquiring the mutex.
114 #[cold]
115 async fn acquire_slow(&self) {
116 // Get the current time.
117 let start = Instant::now();
118
119 loop {
120 // Start listening for events.
121 let listener = self.lock_ops.listen();
122
123 // Try locking if nobody is being starved.
124 match self.state.compare_and_swap(0, 1, Ordering::Acquire) {
125 // Lock acquired!
126 0 => return,
127
128 // Lock is held and nobody is starved.
129 1 => {}
130
131 // Somebody is starved.
132 _ => break,
133 }
134
135 // Wait for a notification.
136 listener.await;
137
138 // Try locking if nobody is being starved.
139 match self.state.compare_and_swap(0, 1, Ordering::Acquire) {
140 // Lock acquired!
141 0 => return,
142
143 // Lock is held and nobody is starved.
144 1 => {}
145
146 // Somebody is starved.
147 _ => {
148 // Notify the first listener in line because we probably received a
149 // notification that was meant for a starved task.
150 self.lock_ops.notify(1);
151 break;
152 }
153 }
154
155 // If waiting for too long, fall back to a fairer locking strategy that will prevent
156 // newer lock operations from starving us forever.
157 if start.elapsed() > Duration::from_micros(500) {
158 break;
159 }
160 }
161
162 // Increment the number of starved lock operations.
163 if self.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 {
164 // In case of potential overflow, abort.
165 process::abort();
166 }
167
168 // Decrement the counter when exiting this function.
169 let _call = CallOnDrop(|| {
170 self.state.fetch_sub(2, Ordering::Release);
171 });
172
173 loop {
174 // Start listening for events.
175 let listener = self.lock_ops.listen();
176
177 // Try locking if nobody else is being starved.
178 match self.state.compare_and_swap(2, 2 | 1, Ordering::Acquire) {
179 // Lock acquired!
180 2 => return,
181
182 // Lock is held by someone.
183 s if s % 2 == 1 => {}
184
185 // Lock is available.
186 _ => {
187 // Be fair: notify the first listener and then go wait in line.
188 self.lock_ops.notify(1);
189 }
190 }
191
192 // Wait for a notification.
193 listener.await;
194
195 // Try acquiring the lock without waiting for others.
196 if self.state.fetch_or(1, Ordering::Acquire) % 2 == 0 {
197 return;
198 }
199 }
200 }
201
202 /// Attempts to acquire the mutex.
203 ///
204 /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a
205 /// guard is returned that releases the mutex when dropped.
206 ///
207 /// # Examples
208 ///
209 /// ```
210 /// use async_mutex::Mutex;
211 ///
212 /// let mutex = Mutex::new(10);
213 /// if let Some(guard) = mutex.try_lock() {
214 /// assert_eq!(*guard, 10);
215 /// }
216 /// # ;
217 /// ```
218 #[inline]
219 pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
220 if self.state.compare_and_swap(0, 1, Ordering::Acquire) == 0 {
221 Some(MutexGuard(self))
222 } else {
223 None
224 }
225 }
226
227 /// Returns a mutable reference to the underlying data.
228 ///
229 /// Since this call borrows the mutex mutably, no actual locking takes place -- the mutable
230 /// borrow statically guarantees the mutex is not already acquired.
231 ///
232 /// # Examples
233 ///
234 /// ```
235 /// # futures_lite::future::block_on(async {
236 /// use async_mutex::Mutex;
237 ///
238 /// let mut mutex = Mutex::new(0);
239 /// *mutex.get_mut() = 10;
240 /// assert_eq!(*mutex.lock().await, 10);
241 /// # })
242 /// ```
243 pub fn get_mut(&mut self) -> &mut T {
244 unsafe { &mut *self.data.get() }
245 }
246}
247
248impl<T: ?Sized> Mutex<T> {
249 /// Acquires the mutex and clones a reference to it.
250 ///
251 /// Returns an owned guard that releases the mutex when dropped.
252 ///
253 /// # Examples
254 ///
255 /// ```
256 /// # futures_lite::future::block_on(async {
257 /// use async_mutex::Mutex;
258 /// use std::sync::Arc;
259 ///
260 /// let mutex = Arc::new(Mutex::new(10));
261 /// let guard = mutex.lock_arc().await;
262 /// assert_eq!(*guard, 10);
263 /// # })
264 /// ```
265 #[inline]
266 pub async fn lock_arc(self: &Arc<Self>) -> MutexGuardArc<T> {
267 if let Some(guard) = self.try_lock_arc() {
268 return guard;
269 }
270 self.acquire_slow().await;
271 MutexGuardArc(self.clone())
272 }
273
274 /// Attempts to acquire the mutex and clone a reference to it.
275 ///
276 /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, an
277 /// owned guard is returned that releases the mutex when dropped.
278 ///
279 /// # Examples
280 ///
281 /// ```
282 /// use async_mutex::Mutex;
283 /// use std::sync::Arc;
284 ///
285 /// let mutex = Arc::new(Mutex::new(10));
286 /// if let Some(guard) = mutex.try_lock() {
287 /// assert_eq!(*guard, 10);
288 /// }
289 /// # ;
290 /// ```
291 #[inline]
292 pub fn try_lock_arc(self: &Arc<Self>) -> Option<MutexGuardArc<T>> {
293 if self.state.compare_and_swap(0, 1, Ordering::Acquire) == 0 {
294 Some(MutexGuardArc(self.clone()))
295 } else {
296 None
297 }
298 }
299}
300
301impl<T: fmt::Debug + ?Sized> fmt::Debug for Mutex<T> {
302 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303 struct Locked;
304 impl fmt::Debug for Locked {
305 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
306 f.write_str("<locked>")
307 }
308 }
309
310 match self.try_lock() {
311 None => f.debug_struct("Mutex").field("data", &Locked).finish(),
312 Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(),
313 }
314 }
315}
316
317impl<T> From<T> for Mutex<T> {
318 fn from(val: T) -> Mutex<T> {
319 Mutex::new(val)
320 }
321}
322
323impl<T: Default + ?Sized> Default for Mutex<T> {
324 fn default() -> Mutex<T> {
325 Mutex::new(Default::default())
326 }
327}
328
329/// A guard that releases the mutex when dropped.
330pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex<T>);
331
332unsafe impl<T: Send + ?Sized> Send for MutexGuard<'_, T> {}
333unsafe impl<T: Sync + ?Sized> Sync for MutexGuard<'_, T> {}
334
335impl<'a, T: ?Sized> MutexGuard<'a, T> {
336 /// Returns a reference to the mutex a guard came from.
337 ///
338 /// # Examples
339 ///
340 /// ```
341 /// # futures_lite::future::block_on(async {
342 /// use async_mutex::{Mutex, MutexGuard};
343 ///
344 /// let mutex = Mutex::new(10i32);
345 /// let guard = mutex.lock().await;
346 /// dbg!(MutexGuard::source(&guard));
347 /// # })
348 /// ```
349 pub fn source(guard: &MutexGuard<'a, T>) -> &'a Mutex<T> {
350 guard.0
351 }
352}
353
354impl<T: ?Sized> Drop for MutexGuard<'_, T> {
355 fn drop(&mut self) {
356 // Remove the last bit and notify a waiting lock operation.
357 self.0.state.fetch_sub(1, Ordering::Release);
358 self.0.lock_ops.notify(1);
359 }
360}
361
362impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuard<'_, T> {
363 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
364 fmt::Debug::fmt(&**self, f)
365 }
366}
367
368impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuard<'_, T> {
369 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370 (**self).fmt(f)
371 }
372}
373
374impl<T: ?Sized> Deref for MutexGuard<'_, T> {
375 type Target = T;
376
377 fn deref(&self) -> &T {
378 unsafe { &*self.0.data.get() }
379 }
380}
381
382impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
383 fn deref_mut(&mut self) -> &mut T {
384 unsafe { &mut *self.0.data.get() }
385 }
386}
387
388/// An owned guard that releases the mutex when dropped.
389pub struct MutexGuardArc<T: ?Sized>(Arc<Mutex<T>>);
390
391unsafe impl<T: Send + ?Sized> Send for MutexGuardArc<T> {}
392unsafe impl<T: Sync + ?Sized> Sync for MutexGuardArc<T> {}
393
394impl<T: ?Sized> MutexGuardArc<T> {
395 /// Returns a reference to the mutex a guard came from.
396 ///
397 /// # Examples
398 ///
399 /// ```
400 /// # futures_lite::future::block_on(async {
401 /// use async_mutex::{Mutex, MutexGuardArc};
402 /// use std::sync::Arc;
403 ///
404 /// let mutex = Arc::new(Mutex::new(10i32));
405 /// let guard = mutex.lock_arc().await;
406 /// dbg!(MutexGuardArc::source(&guard));
407 /// # })
408 /// ```
409 pub fn source(guard: &MutexGuardArc<T>) -> &Arc<Mutex<T>> {
410 &guard.0
411 }
412}
413
414impl<T: ?Sized> Drop for MutexGuardArc<T> {
415 fn drop(&mut self) {
416 // Remove the last bit and notify a waiting lock operation.
417 self.0.state.fetch_sub(1, Ordering::Release);
418 self.0.lock_ops.notify(1);
419 }
420}
421
422impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuardArc<T> {
423 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
424 fmt::Debug::fmt(&**self, f)
425 }
426}
427
428impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuardArc<T> {
429 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
430 (**self).fmt(f)
431 }
432}
433
434impl<T: ?Sized> Deref for MutexGuardArc<T> {
435 type Target = T;
436
437 fn deref(&self) -> &T {
438 unsafe { &*self.0.data.get() }
439 }
440}
441
442impl<T: ?Sized> DerefMut for MutexGuardArc<T> {
443 fn deref_mut(&mut self) -> &mut T {
444 unsafe { &mut *self.0.data.get() }
445 }
446}
447
448/// Calls a function when dropped.
449struct CallOnDrop<F: Fn()>(F);
450
451impl<F: Fn()> Drop for CallOnDrop<F> {
452 fn drop(&mut self) {
453 (self.0)();
454 }
455}