broker_tokio/sync/rwlock.rs
1use crate::future::poll_fn;
2use crate::sync::semaphore_ll::{AcquireError, Permit, Semaphore};
3use std::cell::UnsafeCell;
4use std::ops;
5use std::task::{Context, Poll};
6
/// Number of permits the lock's semaphore is created with.
///
/// A reader acquires a single permit and a writer acquires all of them, so
/// this value is also the maximum number of readers that can hold the lock at
/// the same time.
#[cfg(not(loom))]
const MAX_READS: usize = 32;

/// Permit count used when building under `loom`.
// NOTE(review): presumably kept small so loom's model checking stays
// tractable -- confirm.
#[cfg(loom)]
const MAX_READS: usize = 10;
12
13/// An asynchronous reader-writer lock
14///
15/// This type of lock allows a number of readers or at most one writer at any
16/// point in time. The write portion of this lock typically allows modification
17/// of the underlying data (exclusive access) and the read portion of this lock
18/// typically allows for read-only access (shared access).
19///
20/// In comparison, a [`Mutex`] does not distinguish between readers or writers
21/// that acquire the lock, therefore blocking any tasks waiting for the lock to
22/// become available. An `RwLock` will allow any number of readers to acquire the
23/// lock as long as a writer is not holding the lock.
24///
25/// The priority policy of the lock is dependent on the underlying operating
26/// system's implementation, and this type does not guarantee that any
27/// particular policy will be used.
28///
29/// The type parameter `T` represents the data that this lock protects. It is
30/// required that `T` satisfies [`Send`] to be shared across threads. The RAII guards
31/// returned from the locking methods implement [`Deref`](https://doc.rust-lang.org/std/ops/trait.Deref.html)
32/// (and [`DerefMut`](https://doc.rust-lang.org/std/ops/trait.DerefMut.html)
33/// for the `write` methods) to allow access to the content of the lock.
34///
35/// # Examples
36///
37/// ```
38/// use tokio::sync::RwLock;
39///
40/// #[tokio::main]
41/// async fn main() {
42/// let lock = RwLock::new(5);
43///
44/// // many reader locks can be held at once
45/// {
46/// let r1 = lock.read().await;
47/// let r2 = lock.read().await;
48/// assert_eq!(*r1, 5);
49/// assert_eq!(*r2, 5);
50/// } // read locks are dropped at this point
51///
52/// // only one write lock may be held, however
53/// {
54/// let mut w = lock.write().await;
55/// *w += 1;
56/// assert_eq!(*w, 6);
57/// } // write lock is dropped here
58/// }
59/// ```
60///
61/// [`Mutex`]: struct.Mutex.html
62/// [`RwLock`]: struct.RwLock.html
63/// [`RwLockReadGuard`]: struct.RwLockReadGuard.html
64/// [`RwLockWriteGuard`]: struct.RwLockWriteGuard.html
65/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
66#[derive(Debug)]
67pub struct RwLock<T> {
68 //semaphore to coordinate read and write access to T
69 s: Semaphore,
70
71 //inner data T
72 c: UnsafeCell<T>,
73}
74
75/// RAII structure used to release the shared read access of a lock when
76/// dropped.
77///
78/// This structure is created by the [`read`] method on
79/// [`RwLock`].
80///
81/// [`read`]: struct.RwLock.html#method.read
82#[derive(Debug)]
83pub struct RwLockReadGuard<'a, T> {
84 permit: ReleasingPermit<'a, T>,
85 lock: &'a RwLock<T>,
86}
87
88/// RAII structure used to release the exclusive write access of a lock when
89/// dropped.
90///
91/// This structure is created by the [`write`] and method
92/// on [`RwLock`].
93///
94/// [`write`]: struct.RwLock.html#method.write
95/// [`RwLock`]: struct.RwLock.html
96#[derive(Debug)]
97pub struct RwLockWriteGuard<'a, T> {
98 permit: ReleasingPermit<'a, T>,
99 lock: &'a RwLock<T>,
100}
101
102// Wrapper arround Permit that releases on Drop
103#[derive(Debug)]
104struct ReleasingPermit<'a, T> {
105 num_permits: u16,
106 permit: Permit,
107 lock: &'a RwLock<T>,
108}
109
110impl<'a, T> ReleasingPermit<'a, T> {
111 fn poll_acquire(
112 &mut self,
113 cx: &mut Context<'_>,
114 s: &Semaphore,
115 ) -> Poll<Result<(), AcquireError>> {
116 self.permit.poll_acquire(cx, self.num_permits, s)
117 }
118}
119
120impl<'a, T> Drop for ReleasingPermit<'a, T> {
121 fn drop(&mut self) {
122 self.permit.release(self.num_permits, &self.lock.s);
123 }
124}
125
126// As long as T: Send + Sync, it's fine to send and share RwLock<T> between threads.
127// If T were not Send, sending and sharing a RwLock<T> would be bad, since you can access T through
128// RwLock<T>.
129unsafe impl<T> Send for RwLock<T> where T: Send {}
130unsafe impl<T> Sync for RwLock<T> where T: Send + Sync {}
131unsafe impl<'a, T> Sync for RwLockReadGuard<'a, T> where T: Send + Sync {}
132unsafe impl<'a, T> Sync for RwLockWriteGuard<'a, T> where T: Send + Sync {}
133
134impl<T> RwLock<T> {
135 /// Creates a new instance of an `RwLock<T>` which is unlocked.
136 ///
137 /// # Examples
138 ///
139 /// ```
140 /// use tokio::sync::RwLock;
141 ///
142 /// let lock = RwLock::new(5);
143 /// ```
144 pub fn new(value: T) -> RwLock<T> {
145 RwLock {
146 c: UnsafeCell::new(value),
147 s: Semaphore::new(MAX_READS),
148 }
149 }
150
151 /// Locks this rwlock with shared read access, blocking the current task
152 /// until it can be acquired.
153 ///
154 /// The calling task will be blocked until there are no more writers which
155 /// hold the lock. There may be other readers currently inside the lock when
156 /// this method returns.
157 ///
158 /// # Examples
159 ///
160 /// ```
161 /// use std::sync::Arc;
162 /// use tokio::sync::RwLock;
163 ///
164 /// #[tokio::main]
165 /// async fn main() {
166 /// let lock = Arc::new(RwLock::new(1));
167 /// let c_lock = lock.clone();
168 ///
169 /// let n = lock.read().await;
170 /// assert_eq!(*n, 1);
171 ///
172 /// tokio::spawn(async move {
173 /// let r = c_lock.read().await;
174 /// assert_eq!(*r, 1);
175 /// });
176 ///}
177 /// ```
178 pub async fn read(&self) -> RwLockReadGuard<'_, T> {
179 let mut permit = ReleasingPermit {
180 num_permits: 1,
181 permit: Permit::new(),
182 lock: self,
183 };
184
185 poll_fn(|cx| permit.poll_acquire(cx, &self.s))
186 .await
187 .unwrap_or_else(|_| {
188 // The semaphore was closed. but, we never explicitly close it, and we have a
189 // handle to it through the Arc, which means that this can never happen.
190 unreachable!()
191 });
192 RwLockReadGuard { lock: self, permit }
193 }
194
195 /// Locks this rwlock with exclusive write access, blocking the current
196 /// task until it can be acquired.
197 ///
198 /// This function will not return while other writers or other readers
199 /// currently have access to the lock.
200 ///
201 /// Returns an RAII guard which will drop the write access of this rwlock
202 /// when dropped.
203 ///
204 /// # Examples
205 ///
206 /// ```
207 /// use tokio::sync::RwLock;
208 ///
209 /// #[tokio::main]
210 /// async fn main() {
211 /// let lock = RwLock::new(1);
212 ///
213 /// let mut n = lock.write().await;
214 /// *n = 2;
215 ///}
216 /// ```
217 pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
218 let mut permit = ReleasingPermit {
219 num_permits: MAX_READS as u16,
220 permit: Permit::new(),
221 lock: self,
222 };
223
224 poll_fn(|cx| permit.poll_acquire(cx, &self.s))
225 .await
226 .unwrap_or_else(|_| {
227 // The semaphore was closed. but, we never explicitly close it, and we have a
228 // handle to it through the Arc, which means that this can never happen.
229 unreachable!()
230 });
231
232 RwLockWriteGuard { lock: self, permit }
233 }
234}
235
236impl<T> ops::Deref for RwLockReadGuard<'_, T> {
237 type Target = T;
238
239 fn deref(&self) -> &T {
240 unsafe { &*self.lock.c.get() }
241 }
242}
243
244impl<T> ops::Deref for RwLockWriteGuard<'_, T> {
245 type Target = T;
246
247 fn deref(&self) -> &T {
248 unsafe { &*self.lock.c.get() }
249 }
250}
251
252impl<T> ops::DerefMut for RwLockWriteGuard<'_, T> {
253 fn deref_mut(&mut self) -> &mut T {
254 unsafe { &mut *self.lock.c.get() }
255 }
256}
257
258impl<T> From<T> for RwLock<T> {
259 fn from(s: T) -> Self {
260 Self::new(s)
261 }
262}
263
264impl<T> Default for RwLock<T>
265where
266 T: Default,
267{
268 fn default() -> Self {
269 Self::new(T::default())
270 }
271}