sqlx_core/
sync.rs

1// For types with identical signatures that don't require runtime support,
2// we can just arbitrarily pick one to use based on what's enabled.
3//
4// We'll generally lean towards Tokio's types as those are more featureful
5// (including `tokio-console` support) and more widely deployed.
6
7#[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
8pub use async_std::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
9
10#[cfg(feature = "_rt-tokio")]
11pub use tokio::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
12
/// Async counting semaphore whose backing implementation is chosen at compile
/// time from the enabled runtime feature.
///
/// * `_rt-tokio` enabled: wraps `tokio::sync::Semaphore`.
/// * only `_rt-async-std` enabled: wraps `futures_intrusive::sync::Semaphore`.
/// * neither enabled: the struct has no fields at all.
pub struct AsyncSemaphore {
    #[cfg(feature = "_rt-tokio")]
    inner: tokio::sync::Semaphore,

    // On async-std we rely on futures-intrusive's semaphore, because the one shipped
    // with async-std cannot add arbitrary permits and does not guarantee fairness:
    // * https://github.com/smol-rs/async-lock/issues/22
    // * https://github.com/smol-rs/async-lock/issues/23
    //
    // A replacement is still wanted: futures-intrusive is unmaintained and has some
    // soundness concerns (though it turns out any intrusive future is unsound in MIRI
    // due to the necessitated mutable aliasing):
    // https://github.com/launchbadge/sqlx/issues/1668
    #[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
    inner: futures_intrusive::sync::Semaphore,
}
29
30impl AsyncSemaphore {
31    #[track_caller]
32    pub fn new(fair: bool, permits: usize) -> Self {
33        if cfg!(not(any(feature = "_rt-async-std", feature = "_rt-tokio"))) {
34            crate::rt::missing_rt((fair, permits));
35        }
36
37        AsyncSemaphore {
38            #[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
39            inner: futures_intrusive::sync::Semaphore::new(fair, permits),
40            #[cfg(feature = "_rt-tokio")]
41            inner: {
42                debug_assert!(fair, "Tokio only has fair permits");
43                tokio::sync::Semaphore::new(permits)
44            },
45        }
46    }
47
48    pub fn permits(&self) -> usize {
49        #[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
50        return self.inner.permits();
51
52        #[cfg(feature = "_rt-tokio")]
53        return self.inner.available_permits();
54
55        #[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
56        crate::rt::missing_rt(())
57    }
58
59    pub async fn acquire(&self, permits: u32) -> AsyncSemaphoreReleaser<'_> {
60        #[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
61        return AsyncSemaphoreReleaser {
62            inner: self.inner.acquire(permits as usize).await,
63        };
64
65        #[cfg(feature = "_rt-tokio")]
66        return AsyncSemaphoreReleaser {
67            inner: self
68                .inner
69                // Weird quirk: `tokio::sync::Semaphore` mostly uses `usize` for permit counts,
70                // but `u32` for this and `try_acquire_many()`.
71                .acquire_many(permits)
72                .await
73                .expect("BUG: we do not expose the `.close()` method"),
74        };
75
76        #[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
77        crate::rt::missing_rt(permits)
78    }
79
80    pub fn try_acquire(&self, permits: u32) -> Option<AsyncSemaphoreReleaser<'_>> {
81        #[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
82        return Some(AsyncSemaphoreReleaser {
83            inner: self.inner.try_acquire(permits as usize)?,
84        });
85
86        #[cfg(feature = "_rt-tokio")]
87        return Some(AsyncSemaphoreReleaser {
88            inner: self.inner.try_acquire_many(permits).ok()?,
89        });
90
91        #[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
92        crate::rt::missing_rt(permits)
93    }
94
95    pub fn release(&self, permits: usize) {
96        #[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
97        return self.inner.release(permits);
98
99        #[cfg(feature = "_rt-tokio")]
100        return self.inner.add_permits(permits);
101
102        #[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
103        crate::rt::missing_rt(permits)
104    }
105}
106
/// Guard returned by `AsyncSemaphore::acquire()` and `AsyncSemaphore::try_acquire()`,
/// borrowing the semaphore for `'a`.
///
/// It wraps the backend's own guard type: `tokio::sync::SemaphorePermit` under
/// `_rt-tokio`, or `futures_intrusive::sync::SemaphoreReleaser` when only
/// `_rt-async-std` is enabled. The backend types mirror the choice made for
/// `AsyncSemaphore`; background on why futures-intrusive is used on async-std:
/// * https://github.com/smol-rs/async-lock/issues/22
/// * https://github.com/smol-rs/async-lock/issues/23
/// * https://github.com/launchbadge/sqlx/issues/1668
pub struct AsyncSemaphoreReleaser<'a> {
    #[cfg(feature = "_rt-tokio")]
    inner: tokio::sync::SemaphorePermit<'a>,

    #[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
    inner: futures_intrusive::sync::SemaphoreReleaser<'a>,

    // With no runtime enabled there is no real guard to hold; this marker only
    // keeps the `'a` parameter in use so the type still compiles.
    #[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
    _phantom: std::marker::PhantomData<&'a ()>,
}
126
127impl AsyncSemaphoreReleaser<'_> {
128    pub fn disarm(self) {
129        #[cfg(feature = "_rt-tokio")]
130        {
131            self.inner.forget();
132        }
133
134        #[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
135        {
136            let mut this = self;
137            this.inner.disarm();
138        }
139
140        #[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
141        crate::rt::missing_rt(())
142    }
143}