async_lock/rwlock/
futures.rs

1use core::fmt;
2use core::mem::ManuallyDrop;
3use core::pin::Pin;
4use core::task::Poll;
5
6use alloc::sync::Arc;
7
8use super::raw::{RawRead, RawUpgradableRead, RawUpgrade, RawWrite};
9use super::{
10    RwLock, RwLockReadGuard, RwLockReadGuardArc, RwLockUpgradableReadGuard,
11    RwLockUpgradableReadGuardArc, RwLockWriteGuard, RwLockWriteGuardArc,
12};
13
14use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy};
15
16easy_wrapper! {
17    /// The future returned by [`RwLock::read`].
18    pub struct Read<'a, T: ?Sized>(ReadInner<'a, T> => RwLockReadGuard<'a, T>);
19    #[cfg(all(feature = "std", not(target_family = "wasm")))]
20    pub(crate) wait();
21}
22
23pin_project_lite::pin_project! {
24    /// The future returned by [`RwLock::read`].
25    struct ReadInner<'a, T: ?Sized> {
26        // Raw read lock acquisition future, doesn't depend on `T`.
27        #[pin]
28        pub(super) raw: RawRead<'a>,
29
30        // Pointer to the value protected by the lock. Covariant in `T`.
31        pub(super) value: *const T,
32    }
33}
34
35unsafe impl<T: Sync + ?Sized> Send for ReadInner<'_, T> {}
36unsafe impl<T: Sync + ?Sized> Sync for ReadInner<'_, T> {}
37
38impl<'x, T: ?Sized> Read<'x, T> {
39    #[inline]
40    pub(super) fn new(raw: RawRead<'x>, value: *const T) -> Self {
41        Self::_new(ReadInner { raw, value })
42    }
43}
44
45impl<T: ?Sized> fmt::Debug for Read<'_, T> {
46    #[inline]
47    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48        f.write_str("Read { .. }")
49    }
50}
51
52impl<'a, T: ?Sized> EventListenerFuture for ReadInner<'a, T> {
53    type Output = RwLockReadGuard<'a, T>;
54
55    #[inline]
56    fn poll_with_strategy<'x, S: Strategy<'x>>(
57        self: Pin<&mut Self>,
58        strategy: &mut S,
59        cx: &mut S::Context,
60    ) -> Poll<Self::Output> {
61        let mut this = self.project();
62        ready!(this.raw.as_mut().poll_with_strategy(strategy, cx));
63
64        Poll::Ready(RwLockReadGuard {
65            lock: this.raw.lock,
66            value: *this.value,
67        })
68    }
69}
70
71easy_wrapper! {
72    /// The future returned by [`RwLock::read_arc`].
73    pub struct ReadArc<'a, T>(ReadArcInner<'a, T> => RwLockReadGuardArc<T>);
74    #[cfg(all(feature = "std", not(target_family = "wasm")))]
75    pub(crate) wait();
76}
77
78pin_project_lite::pin_project! {
79    /// The future returned by [`RwLock::read_arc`].
80    struct ReadArcInner<'a, T> {
81        // Raw read lock acquisition future, doesn't depend on `T`.
82        #[pin]
83        pub(super) raw: RawRead<'a>,
84
85        // FIXME: Could be covariant in T
86        pub(super) lock: &'a Arc<RwLock<T>>,
87    }
88}
89
90unsafe impl<T: Send + Sync> Send for ReadArcInner<'_, T> {}
91unsafe impl<T: Send + Sync> Sync for ReadArcInner<'_, T> {}
92
93impl<'x, T> ReadArc<'x, T> {
94    #[inline]
95    pub(super) fn new(raw: RawRead<'x>, lock: &'x Arc<RwLock<T>>) -> Self {
96        Self::_new(ReadArcInner { raw, lock })
97    }
98}
99
100impl<T> fmt::Debug for ReadArc<'_, T> {
101    #[inline]
102    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103        f.write_str("ReadArc { .. }")
104    }
105}
106
107impl<'a, T> EventListenerFuture for ReadArcInner<'a, T> {
108    type Output = RwLockReadGuardArc<T>;
109
110    #[inline]
111    fn poll_with_strategy<'x, S: Strategy<'x>>(
112        self: Pin<&mut Self>,
113        strategy: &mut S,
114        cx: &mut S::Context,
115    ) -> Poll<Self::Output> {
116        let mut this = self.project();
117        ready!(this.raw.as_mut().poll_with_strategy(strategy, cx));
118
119        // SAFETY: we just acquired a read lock
120        Poll::Ready(unsafe { RwLockReadGuardArc::from_arc(this.lock.clone()) })
121    }
122}
123
124easy_wrapper! {
125    /// The future returned by [`RwLock::upgradable_read`].
126    pub struct UpgradableRead<'a, T: ?Sized>(
127        UpgradableReadInner<'a, T> => RwLockUpgradableReadGuard<'a, T>
128    );
129    #[cfg(all(feature = "std", not(target_family = "wasm")))]
130    pub(crate) wait();
131}
132
133pin_project_lite::pin_project! {
134    /// The future returned by [`RwLock::upgradable_read`].
135    struct UpgradableReadInner<'a, T: ?Sized> {
136        // Raw upgradable read lock acquisition future, doesn't depend on `T`.
137        #[pin]
138        pub(super) raw: RawUpgradableRead<'a>,
139
140        // Pointer to the value protected by the lock. Invariant in `T`
141        // as the upgradable lock could provide write access.
142        pub(super) value: *mut T,
143    }
144}
145
146unsafe impl<T: Send + Sync + ?Sized> Send for UpgradableReadInner<'_, T> {}
147unsafe impl<T: Sync + ?Sized> Sync for UpgradableReadInner<'_, T> {}
148
149impl<'x, T: ?Sized> UpgradableRead<'x, T> {
150    #[inline]
151    pub(super) fn new(raw: RawUpgradableRead<'x>, value: *mut T) -> Self {
152        Self::_new(UpgradableReadInner { raw, value })
153    }
154}
155
156impl<T: ?Sized> fmt::Debug for UpgradableRead<'_, T> {
157    #[inline]
158    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
159        f.write_str("UpgradableRead { .. }")
160    }
161}
162
163impl<'a, T: ?Sized> EventListenerFuture for UpgradableReadInner<'a, T> {
164    type Output = RwLockUpgradableReadGuard<'a, T>;
165
166    #[inline]
167    fn poll_with_strategy<'x, S: Strategy<'x>>(
168        self: Pin<&mut Self>,
169        strategy: &mut S,
170        cx: &mut S::Context,
171    ) -> Poll<Self::Output> {
172        let mut this = self.project();
173        ready!(this.raw.as_mut().poll_with_strategy(strategy, cx));
174
175        Poll::Ready(RwLockUpgradableReadGuard {
176            lock: this.raw.lock,
177            value: *this.value,
178        })
179    }
180}
181
182easy_wrapper! {
183    /// The future returned by [`RwLock::upgradable_read_arc`].
184    pub struct UpgradableReadArc<'a, T: ?Sized>(
185        UpgradableReadArcInner<'a, T> => RwLockUpgradableReadGuardArc<T>
186    );
187    #[cfg(all(feature = "std", not(target_family = "wasm")))]
188    pub(crate) wait();
189}
190
191pin_project_lite::pin_project! {
192    /// The future returned by [`RwLock::upgradable_read_arc`].
193    struct UpgradableReadArcInner<'a, T: ?Sized> {
194        // Raw upgradable read lock acquisition future, doesn't depend on `T`.
195        #[pin]
196        pub(super) raw: RawUpgradableRead<'a>,
197
198        pub(super) lock: &'a Arc<RwLock<T>>,
199    }
200}
201
202unsafe impl<T: Send + Sync + ?Sized> Send for UpgradableReadArcInner<'_, T> {}
203unsafe impl<T: Send + Sync + ?Sized> Sync for UpgradableReadArcInner<'_, T> {}
204
205impl<'x, T: ?Sized> UpgradableReadArc<'x, T> {
206    #[inline]
207    pub(super) fn new(raw: RawUpgradableRead<'x>, lock: &'x Arc<RwLock<T>>) -> Self {
208        Self::_new(UpgradableReadArcInner { raw, lock })
209    }
210}
211
212impl<T: ?Sized> fmt::Debug for UpgradableReadArc<'_, T> {
213    #[inline]
214    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215        f.write_str("UpgradableReadArc { .. }")
216    }
217}
218
219impl<'a, T: ?Sized> EventListenerFuture for UpgradableReadArcInner<'a, T> {
220    type Output = RwLockUpgradableReadGuardArc<T>;
221
222    #[inline]
223    fn poll_with_strategy<'x, S: Strategy<'x>>(
224        self: Pin<&mut Self>,
225        strategy: &mut S,
226        cx: &mut S::Context,
227    ) -> Poll<Self::Output> {
228        let mut this = self.project();
229        ready!(this.raw.as_mut().poll_with_strategy(strategy, cx));
230        Poll::Ready(RwLockUpgradableReadGuardArc {
231            lock: this.lock.clone(),
232        })
233    }
234}
235
236easy_wrapper! {
237    /// The future returned by [`RwLock::write`].
238    pub struct Write<'a, T: ?Sized>(WriteInner<'a, T> => RwLockWriteGuard<'a, T>);
239    #[cfg(all(feature = "std", not(target_family = "wasm")))]
240    pub(crate) wait();
241}
242
243pin_project_lite::pin_project! {
244    /// The future returned by [`RwLock::write`].
245    struct WriteInner<'a, T: ?Sized> {
246        // Raw write lock acquisition future, doesn't depend on `T`.
247        #[pin]
248        pub(super) raw: RawWrite<'a>,
249
250        // Pointer to the value protected by the lock. Invariant in `T`.
251        pub(super) value: *mut T,
252    }
253}
254
255unsafe impl<T: Send + ?Sized> Send for WriteInner<'_, T> {}
256unsafe impl<T: Sync + ?Sized> Sync for WriteInner<'_, T> {}
257
258impl<'x, T: ?Sized> Write<'x, T> {
259    #[inline]
260    pub(super) fn new(raw: RawWrite<'x>, value: *mut T) -> Self {
261        Self::_new(WriteInner { raw, value })
262    }
263}
264
265impl<T: ?Sized> fmt::Debug for Write<'_, T> {
266    #[inline]
267    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
268        f.write_str("Write { .. }")
269    }
270}
271
272impl<'a, T: ?Sized> EventListenerFuture for WriteInner<'a, T> {
273    type Output = RwLockWriteGuard<'a, T>;
274
275    #[inline]
276    fn poll_with_strategy<'x, S: Strategy<'x>>(
277        self: Pin<&mut Self>,
278        strategy: &mut S,
279        cx: &mut S::Context,
280    ) -> Poll<Self::Output> {
281        let mut this = self.project();
282        ready!(this.raw.as_mut().poll_with_strategy(strategy, cx));
283
284        Poll::Ready(RwLockWriteGuard {
285            lock: this.raw.lock,
286            value: *this.value,
287        })
288    }
289}
290
291easy_wrapper! {
292    /// The future returned by [`RwLock::write_arc`].
293    pub struct WriteArc<'a, T: ?Sized>(WriteArcInner<'a, T> => RwLockWriteGuardArc<T>);
294    #[cfg(all(feature = "std", not(target_family = "wasm")))]
295    pub(crate) wait();
296}
297
298pin_project_lite::pin_project! {
299    /// The future returned by [`RwLock::write_arc`].
300    struct WriteArcInner<'a, T: ?Sized> {
301        // Raw write lock acquisition future, doesn't depend on `T`.
302        #[pin]
303        pub(super) raw: RawWrite<'a>,
304
305        pub(super) lock: &'a Arc<RwLock<T>>,
306    }
307}
308
309unsafe impl<T: Send + Sync + ?Sized> Send for WriteArcInner<'_, T> {}
310unsafe impl<T: Send + Sync + ?Sized> Sync for WriteArcInner<'_, T> {}
311
312impl<'x, T: ?Sized> WriteArc<'x, T> {
313    #[inline]
314    pub(super) fn new(raw: RawWrite<'x>, lock: &'x Arc<RwLock<T>>) -> Self {
315        Self::_new(WriteArcInner { raw, lock })
316    }
317}
318
319impl<T: ?Sized> fmt::Debug for WriteArc<'_, T> {
320    #[inline]
321    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
322        f.write_str("WriteArc { .. }")
323    }
324}
325
326impl<'a, T: ?Sized> EventListenerFuture for WriteArcInner<'a, T> {
327    type Output = RwLockWriteGuardArc<T>;
328
329    #[inline]
330    fn poll_with_strategy<'x, S: Strategy<'x>>(
331        self: Pin<&mut Self>,
332        strategy: &mut S,
333        cx: &mut S::Context,
334    ) -> Poll<Self::Output> {
335        let mut this = self.project();
336        ready!(this.raw.as_mut().poll_with_strategy(strategy, cx));
337
338        Poll::Ready(RwLockWriteGuardArc {
339            lock: this.lock.clone(),
340        })
341    }
342}
343
344easy_wrapper! {
345    /// The future returned by [`RwLockUpgradableReadGuard::upgrade`].
346    pub struct Upgrade<'a, T: ?Sized>(UpgradeInner<'a, T> => RwLockWriteGuard<'a, T>);
347    #[cfg(all(feature = "std", not(target_family = "wasm")))]
348    pub(crate) wait();
349}
350
351pin_project_lite::pin_project! {
352    /// The future returned by [`RwLockUpgradableReadGuard::upgrade`].
353    struct UpgradeInner<'a, T: ?Sized> {
354        // Raw read lock upgrade future, doesn't depend on `T`.
355        #[pin]
356        pub(super) raw: RawUpgrade<'a>,
357
358        // Pointer to the value protected by the lock. Invariant in `T`.
359        pub(super) value: *mut T,
360    }
361}
362
363unsafe impl<T: Send + ?Sized> Send for UpgradeInner<'_, T> {}
364unsafe impl<T: Sync + ?Sized> Sync for UpgradeInner<'_, T> {}
365
366impl<'x, T: ?Sized> Upgrade<'x, T> {
367    #[inline]
368    pub(super) fn new(raw: RawUpgrade<'x>, value: *mut T) -> Self {
369        Self::_new(UpgradeInner { raw, value })
370    }
371}
372
373impl<T: ?Sized> fmt::Debug for Upgrade<'_, T> {
374    #[inline]
375    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
376        f.debug_struct("Upgrade").finish()
377    }
378}
379
380impl<'a, T: ?Sized> EventListenerFuture for UpgradeInner<'a, T> {
381    type Output = RwLockWriteGuard<'a, T>;
382
383    #[inline]
384    fn poll_with_strategy<'x, S: Strategy<'x>>(
385        self: Pin<&mut Self>,
386        strategy: &mut S,
387        cx: &mut S::Context,
388    ) -> Poll<Self::Output> {
389        let mut this = self.project();
390        let lock = ready!(this.raw.as_mut().poll_with_strategy(strategy, cx));
391
392        Poll::Ready(RwLockWriteGuard {
393            lock,
394            value: *this.value,
395        })
396    }
397}
398
399easy_wrapper! {
400    /// The future returned by [`RwLockUpgradableReadGuardArc::upgrade`].
401    pub struct UpgradeArc<T: ?Sized>(UpgradeArcInner<T> => RwLockWriteGuardArc<T>);
402    #[cfg(all(feature = "std", not(target_family = "wasm")))]
403    pub(crate) wait();
404}
405
406pin_project_lite::pin_project! {
407    /// The future returned by [`RwLockUpgradableReadGuardArc::upgrade`].
408    struct UpgradeArcInner<T: ?Sized> {
409        // Raw read lock upgrade future, doesn't depend on `T`.
410        // `'static` is a lie, this field is actually referencing the
411        // `Arc` data. But since this struct also stores said `Arc`, we know
412        // this value will be alive as long as the struct is.
413        //
414        // Yes, one field of the `ArcUpgrade` struct is referencing another.
415        // Such self-references are usually not sound without pinning.
416        // However, in this case, there is an indirection via the heap;
417        // moving the `ArcUpgrade` won't move the heap allocation of the `Arc`,
418        // so the reference inside `RawUpgrade` isn't invalidated.
419        #[pin]
420        pub(super) raw: ManuallyDrop<RawUpgrade<'static>>,
421
422        // Pointer to the value protected by the lock. Invariant in `T`.
423        pub(super) lock: ManuallyDrop<Arc<RwLock<T>>>,
424    }
425
426    impl<T: ?Sized> PinnedDrop for UpgradeArcInner<T> {
427        fn drop(this: Pin<&mut Self>) {
428            let this = this.project();
429            let is_ready = this.raw.is_ready();
430
431            // SAFETY: The drop impl for raw assumes that it is pinned.
432            unsafe {
433                ManuallyDrop::drop(this.raw.get_unchecked_mut());
434            }
435
436            if !is_ready {
437                // SAFETY: we drop the `Arc` (decrementing the reference count)
438                // only if this future was cancelled before returning an
439                // upgraded lock.
440                unsafe {
441                    ManuallyDrop::drop(this.lock);
442                };
443            }
444        }
445    }
446}
447
448impl<T: ?Sized> UpgradeArc<T> {
449    #[inline]
450    pub(super) unsafe fn new(
451        raw: ManuallyDrop<RawUpgrade<'static>>,
452        lock: ManuallyDrop<Arc<RwLock<T>>>,
453    ) -> Self {
454        Self::_new(UpgradeArcInner { raw, lock })
455    }
456}
457
458impl<T: ?Sized> fmt::Debug for UpgradeArc<T> {
459    #[inline]
460    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
461        f.debug_struct("ArcUpgrade").finish()
462    }
463}
464
465impl<T: ?Sized> EventListenerFuture for UpgradeArcInner<T> {
466    type Output = RwLockWriteGuardArc<T>;
467
468    #[inline]
469    fn poll_with_strategy<'x, S: Strategy<'x>>(
470        self: Pin<&mut Self>,
471        strategy: &mut S,
472        cx: &mut S::Context,
473    ) -> Poll<Self::Output> {
474        let this = self.project();
475        unsafe {
476            // SAFETY: Practically, this is a pin projection.
477            ready!(Pin::new_unchecked(&mut **this.raw.get_unchecked_mut())
478                .poll_with_strategy(strategy, cx));
479        }
480
481        Poll::Ready(RwLockWriteGuardArc {
482            lock: unsafe { ManuallyDrop::take(this.lock) },
483        })
484    }
485}