broker_tokio/sync/
rwlock.rs

1use crate::future::poll_fn;
2use crate::sync::semaphore_ll::{AcquireError, Permit, Semaphore};
3use std::cell::UnsafeCell;
4use std::ops;
5use std::task::{Context, Poll};
6
/// Number of permits held by the lock's semaphore in regular builds.
///
/// A reader takes one permit and a writer takes all of them, so this is also
/// the maximum number of read guards that can be alive at the same time.
#[cfg(not(loom))]
const MAX_READS: usize = 32;

/// Smaller permit count used when building with `--cfg loom`.
// NOTE(review): presumably kept small to limit the state space explored by
// loom model tests — confirm against the loom test suite.
#[cfg(loom)]
const MAX_READS: usize = 10;

// Compile-time guard: `RwLock::write` narrows `MAX_READS` to `u16` with an
// `as` cast, which would silently truncate if the constant ever grew past
// `u16::MAX`. The array length below is 1 (a type mismatch, hence a compile
// error) exactly when that would happen.
const _: [(); 0] = [(); (MAX_READS > std::u16::MAX as usize) as usize];
12
13/// An asynchronous reader-writer lock
14///
15/// This type of lock allows a number of readers or at most one writer at any
16/// point in time. The write portion of this lock typically allows modification
17/// of the underlying data (exclusive access) and the read portion of this lock
18/// typically allows for read-only access (shared access).
19///
20/// In comparison, a [`Mutex`] does not distinguish between readers or writers
21/// that acquire the lock, therefore blocking any tasks waiting for the lock to
22/// become available. An `RwLock` will allow any number of readers to acquire the
23/// lock as long as a writer is not holding the lock.
24///
25/// The priority policy of the lock is dependent on the underlying operating
26/// system's implementation, and this type does not guarantee that any
27/// particular policy will be used.
28///
29/// The type parameter `T` represents the data that this lock protects. It is
30/// required that `T` satisfies [`Send`] to be shared across threads. The RAII guards
31/// returned from the locking methods implement [`Deref`](https://doc.rust-lang.org/std/ops/trait.Deref.html)
32/// (and [`DerefMut`](https://doc.rust-lang.org/std/ops/trait.DerefMut.html)
33/// for the `write` methods) to allow access to the content of the lock.
34///
35/// # Examples
36///
37/// ```
38/// use tokio::sync::RwLock;
39///
40/// #[tokio::main]
41/// async fn main() {
42///     let lock = RwLock::new(5);
43///
44/// // many reader locks can be held at once
45///     {
46///         let r1 = lock.read().await;
47///         let r2 = lock.read().await;
48///         assert_eq!(*r1, 5);
49///         assert_eq!(*r2, 5);
50///     } // read locks are dropped at this point
51///
52/// // only one write lock may be held, however
53///     {
54///         let mut w = lock.write().await;
55///         *w += 1;
56///         assert_eq!(*w, 6);
57///     } // write lock is dropped here
58/// }
59/// ```
60///
61/// [`Mutex`]: struct.Mutex.html
62/// [`RwLock`]: struct.RwLock.html
63/// [`RwLockReadGuard`]: struct.RwLockReadGuard.html
64/// [`RwLockWriteGuard`]: struct.RwLockWriteGuard.html
65/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
66#[derive(Debug)]
67pub struct RwLock<T> {
68    //semaphore to coordinate read and write access to T
69    s: Semaphore,
70
71    //inner data T
72    c: UnsafeCell<T>,
73}
74
75/// RAII structure used to release the shared read access of a lock when
76/// dropped.
77///
78/// This structure is created by the [`read`] method on
79/// [`RwLock`].
80///
81/// [`read`]: struct.RwLock.html#method.read
82#[derive(Debug)]
83pub struct RwLockReadGuard<'a, T> {
84    permit: ReleasingPermit<'a, T>,
85    lock: &'a RwLock<T>,
86}
87
88/// RAII structure used to release the exclusive write access of a lock when
89/// dropped.
90///
91/// This structure is created by the [`write`] and method
92/// on [`RwLock`].
93///
94/// [`write`]: struct.RwLock.html#method.write
95/// [`RwLock`]: struct.RwLock.html
96#[derive(Debug)]
97pub struct RwLockWriteGuard<'a, T> {
98    permit: ReleasingPermit<'a, T>,
99    lock: &'a RwLock<T>,
100}
101
102// Wrapper arround Permit that releases on Drop
103#[derive(Debug)]
104struct ReleasingPermit<'a, T> {
105    num_permits: u16,
106    permit: Permit,
107    lock: &'a RwLock<T>,
108}
109
110impl<'a, T> ReleasingPermit<'a, T> {
111    fn poll_acquire(
112        &mut self,
113        cx: &mut Context<'_>,
114        s: &Semaphore,
115    ) -> Poll<Result<(), AcquireError>> {
116        self.permit.poll_acquire(cx, self.num_permits, s)
117    }
118}
119
120impl<'a, T> Drop for ReleasingPermit<'a, T> {
121    fn drop(&mut self) {
122        self.permit.release(self.num_permits, &self.lock.s);
123    }
124}
125
126// As long as T: Send + Sync, it's fine to send and share RwLock<T> between threads.
127// If T were not Send, sending and sharing a RwLock<T> would be bad, since you can access T through
128// RwLock<T>.
129unsafe impl<T> Send for RwLock<T> where T: Send {}
130unsafe impl<T> Sync for RwLock<T> where T: Send + Sync {}
131unsafe impl<'a, T> Sync for RwLockReadGuard<'a, T> where T: Send + Sync {}
132unsafe impl<'a, T> Sync for RwLockWriteGuard<'a, T> where T: Send + Sync {}
133
134impl<T> RwLock<T> {
135    /// Creates a new instance of an `RwLock<T>` which is unlocked.
136    ///
137    /// # Examples
138    ///
139    /// ```
140    /// use tokio::sync::RwLock;
141    ///
142    /// let lock = RwLock::new(5);
143    /// ```
144    pub fn new(value: T) -> RwLock<T> {
145        RwLock {
146            c: UnsafeCell::new(value),
147            s: Semaphore::new(MAX_READS),
148        }
149    }
150
151    /// Locks this rwlock with shared read access, blocking the current task
152    /// until it can be acquired.
153    ///
154    /// The calling task will be blocked until there are no more writers which
155    /// hold the lock. There may be other readers currently inside the lock when
156    /// this method returns.
157    ///
158    /// # Examples
159    ///
160    /// ```
161    /// use std::sync::Arc;
162    /// use tokio::sync::RwLock;
163    ///
164    /// #[tokio::main]
165    /// async fn main() {
166    ///     let lock = Arc::new(RwLock::new(1));
167    ///     let c_lock = lock.clone();
168    ///
169    ///     let n = lock.read().await;
170    ///     assert_eq!(*n, 1);
171    ///
172    ///     tokio::spawn(async move {
173    ///         let r = c_lock.read().await;
174    ///         assert_eq!(*r, 1);
175    ///     });
176    ///}
177    /// ```
178    pub async fn read(&self) -> RwLockReadGuard<'_, T> {
179        let mut permit = ReleasingPermit {
180            num_permits: 1,
181            permit: Permit::new(),
182            lock: self,
183        };
184
185        poll_fn(|cx| permit.poll_acquire(cx, &self.s))
186            .await
187            .unwrap_or_else(|_| {
188                // The semaphore was closed. but, we never explicitly close it, and we have a
189                // handle to it through the Arc, which means that this can never happen.
190                unreachable!()
191            });
192        RwLockReadGuard { lock: self, permit }
193    }
194
195    /// Locks this rwlock with exclusive write access, blocking the current
196    /// task until it can be acquired.
197    ///
198    /// This function will not return while other writers or other readers
199    /// currently have access to the lock.
200    ///
201    /// Returns an RAII guard which will drop the write access of this rwlock
202    /// when dropped.
203    ///
204    /// # Examples
205    ///
206    /// ```
207    /// use tokio::sync::RwLock;
208    ///
209    /// #[tokio::main]
210    /// async fn main() {
211    ///   let lock = RwLock::new(1);
212    ///
213    ///   let mut n = lock.write().await;
214    ///   *n = 2;
215    ///}
216    /// ```
217    pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
218        let mut permit = ReleasingPermit {
219            num_permits: MAX_READS as u16,
220            permit: Permit::new(),
221            lock: self,
222        };
223
224        poll_fn(|cx| permit.poll_acquire(cx, &self.s))
225            .await
226            .unwrap_or_else(|_| {
227                // The semaphore was closed. but, we never explicitly close it, and we have a
228                // handle to it through the Arc, which means that this can never happen.
229                unreachable!()
230            });
231
232        RwLockWriteGuard { lock: self, permit }
233    }
234}
235
236impl<T> ops::Deref for RwLockReadGuard<'_, T> {
237    type Target = T;
238
239    fn deref(&self) -> &T {
240        unsafe { &*self.lock.c.get() }
241    }
242}
243
244impl<T> ops::Deref for RwLockWriteGuard<'_, T> {
245    type Target = T;
246
247    fn deref(&self) -> &T {
248        unsafe { &*self.lock.c.get() }
249    }
250}
251
252impl<T> ops::DerefMut for RwLockWriteGuard<'_, T> {
253    fn deref_mut(&mut self) -> &mut T {
254        unsafe { &mut *self.lock.c.get() }
255    }
256}
257
258impl<T> From<T> for RwLock<T> {
259    fn from(s: T) -> Self {
260        Self::new(s)
261    }
262}
263
264impl<T> Default for RwLock<T>
265where
266    T: Default,
267{
268    fn default() -> Self {
269        Self::new(T::default())
270    }
271}