broker_tokio/sync/mutex.rs
1//! An asynchronous `Mutex`-like type.
2//!
//! This module provides [`Mutex`], a type that acts similarly to `std::sync::Mutex`, with one
//! major difference: `lock` is an `async` method, so waiting for the lock does not block the
//! thread. The [`MutexGuard`] returned by `lock` borrows the `Mutex`; you can acquire a lock,
//! pass that guard into a future, and release it at some later point in time, as long as the
//! `Mutex` outlives the guard.
7//!
8//! This allows you to do something along the lines of:
9//!
10//! ```rust,no_run
11//! use tokio::sync::Mutex;
12//! use std::sync::Arc;
13//!
14//! #[tokio::main]
15//! async fn main() {
16//! let data1 = Arc::new(Mutex::new(0));
17//! let data2 = Arc::clone(&data1);
18//!
19//! tokio::spawn(async move {
20//! let mut lock = data2.lock().await;
21//! *lock += 1;
22//! });
23//!
24//! let mut lock = data1.lock().await;
25//! *lock += 1;
26//! }
27//! ```
28//!
29//! Another example
30//! ```rust,no_run
31//! #![warn(rust_2018_idioms)]
32//!
33//! use tokio::sync::Mutex;
34//! use std::sync::Arc;
35//!
36//!
37//! #[tokio::main]
38//! async fn main() {
39//! let count = Arc::new(Mutex::new(0));
40//!
41//! for _ in 0..5 {
42//! let my_count = Arc::clone(&count);
43//! tokio::spawn(async move {
44//! for _ in 0..10 {
45//! let mut lock = my_count.lock().await;
46//! *lock += 1;
47//! println!("{}", lock);
48//! }
49//! });
50//! }
51//!
52//! loop {
53//! if *count.lock().await >= 50 {
54//! break;
55//! }
56//! }
57//! println!("Count hit 50.");
58//! }
59//! ```
//!
//! There are a few things of note here to pay attention to in this example.
//! 1. The mutex is wrapped in an [`std::sync::Arc`] to allow it to be shared across threads.
//! 2. Each spawned task obtains a lock and releases it on every iteration.
//! 3. Mutation of the data the Mutex is protecting is done by de-referencing the obtained lock,
//!    as seen in `*lock += 1;` inside the spawned tasks (the `*count.lock().await >= 50` check
//!    reads the value through the guard in the same way).
65//!
//! Tokio's Mutex works in a simple FIFO (first in, first out) style: as requests for the lock are
//! made, Tokio queues them up and grants the lock when it is that requester's turn. In that way
//! the Mutex is "fair" and predictable in how it distributes the locks to inner data. This is why
//! the output of this program is an in-order count to 50. Locks are released and reacquired
//! after every iteration, so each task goes to the back of the line after it increments the
//! value once. Also, since there is only a single valid lock at any given time, there is no
//! possibility of a race condition when mutating the inner value.
73//!
74//! Note that in contrast to `std::sync::Mutex`, this implementation does not
75//! poison the mutex when a thread holding the `MutexGuard` panics. In such a
76//! case, the mutex will be unlocked. If the panic is caught, this might leave
77//! the data protected by the mutex in an inconsistent state.
78//!
79//! [`Mutex`]: struct.Mutex.html
80//! [`MutexGuard`]: struct.MutexGuard.html
81
82use crate::future::poll_fn;
83use crate::sync::semaphore_ll as semaphore;
84
85use std::cell::UnsafeCell;
86use std::error::Error;
87use std::fmt;
88use std::ops::{Deref, DerefMut};
89
/// An asynchronous mutual exclusion primitive useful for protecting shared data.
///
/// Each mutex has a type parameter (`T`) which represents the data that it is protecting. The data
/// can only be accessed through the RAII guards returned from `lock` (or `try_lock`), which
/// guarantees that the data is only ever accessed when the mutex is locked.
#[derive(Debug)]
pub struct Mutex<T> {
    // The protected value. `UnsafeCell` is what allows a guard to hand out `&mut T` while only
    // holding a shared `&Mutex<T>`.
    c: UnsafeCell<T>,
    // Semaphore created with a single permit (see `Mutex::new`); each live guard holds a permit
    // acquired from it.
    s: semaphore::Semaphore,
}
100
101/// A handle to a held `Mutex`.
102///
103/// As long as you have this guard, you have exclusive access to the underlying `T`. The guard
104/// internally keeps a reference-couned pointer to the original `Mutex`, so even if the lock goes
105/// away, the guard remains valid.
106///
107/// The lock is automatically released whenever the guard is dropped, at which point `lock`
108/// will succeed yet again.
109pub struct MutexGuard<'a, T> {
110 lock: &'a Mutex<T>,
111 permit: semaphore::Permit,
112}
113
114// As long as T: Send, it's fine to send and share Mutex<T> between threads.
115// If T was not Send, sending and sharing a Mutex<T> would be bad, since you can access T through
116// Mutex<T>.
117unsafe impl<T> Send for Mutex<T> where T: Send {}
118unsafe impl<T> Sync for Mutex<T> where T: Send {}
119unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {}
120
/// Error returned from the [`Mutex::try_lock`] function.
///
/// A `try_lock` operation can only fail if the mutex is already locked.
///
/// The private unit field prevents the error from being constructed outside this module.
///
/// [`Mutex::try_lock`]: Mutex::try_lock
#[derive(Debug)]
pub struct TryLockError(());

impl fmt::Display for TryLockError {
    /// Writes the fixed message `operation would block`.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The message is a plain literal, so write it directly instead of routing it through
        // a `"{}"` format string.
        fmt.write_str("operation would block")
    }
}

impl Error for TryLockError {}
136
// Compile-time check: a `MutexGuard` over a `Send` payload must itself be `Send`.
#[test]
#[cfg(not(loom))]
fn bounds() {
    // Instantiating this generic only type-checks when the bound holds.
    fn assert_send<G: Send>() {}
    assert_send::<MutexGuard<'_, u32>>();
}
143
144impl<T> Mutex<T> {
145 /// Creates a new lock in an unlocked state ready for use.
146 pub fn new(t: T) -> Self {
147 Self {
148 c: UnsafeCell::new(t),
149 s: semaphore::Semaphore::new(1),
150 }
151 }
152
153 /// A future that resolves on acquiring the lock and returns the `MutexGuard`.
154 pub async fn lock(&self) -> MutexGuard<'_, T> {
155 let mut guard = MutexGuard {
156 lock: self,
157 permit: semaphore::Permit::new(),
158 };
159 poll_fn(|cx| guard.permit.poll_acquire(cx, 1, &self.s))
160 .await
161 .unwrap_or_else(|_| {
162 // The semaphore was closed. but, we never explicitly close it, and we have a
163 // handle to it through the Arc, which means that this can never happen.
164 unreachable!()
165 });
166 guard
167 }
168
169 /// Try to acquire the lock
170 pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> {
171 let mut permit = semaphore::Permit::new();
172 match permit.try_acquire(1, &self.s) {
173 Ok(_) => Ok(MutexGuard { lock: self, permit }),
174 Err(_) => Err(TryLockError(())),
175 }
176 }
177}
178
179impl<'a, T> Drop for MutexGuard<'a, T> {
180 fn drop(&mut self) {
181 self.permit.release(1, &self.lock.s);
182 }
183}
184
185impl<T> From<T> for Mutex<T> {
186 fn from(s: T) -> Self {
187 Self::new(s)
188 }
189}
190
191impl<T> Default for Mutex<T>
192where
193 T: Default,
194{
195 fn default() -> Self {
196 Self::new(T::default())
197 }
198}
199
200impl<'a, T> Deref for MutexGuard<'a, T> {
201 type Target = T;
202 fn deref(&self) -> &Self::Target {
203 assert!(self.permit.is_acquired());
204 unsafe { &*self.lock.c.get() }
205 }
206}
207
208impl<'a, T> DerefMut for MutexGuard<'a, T> {
209 fn deref_mut(&mut self) -> &mut Self::Target {
210 assert!(self.permit.is_acquired());
211 unsafe { &mut *self.lock.c.get() }
212 }
213}
214
215impl<'a, T: fmt::Debug> fmt::Debug for MutexGuard<'a, T> {
216 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217 fmt::Debug::fmt(&**self, f)
218 }
219}
220
221impl<'a, T: fmt::Display> fmt::Display for MutexGuard<'a, T> {
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223 fmt::Display::fmt(&**self, f)
224 }
225}