broker_tokio/sync/
semaphore.rs

1use super::semaphore_ll as ll; // low level implementation
2use crate::future::poll_fn;
3
/// Counting semaphore performing asynchronous permit acquisition.
///
/// A semaphore maintains a set of permits. Permits are used to synchronize
/// access to a shared resource. A semaphore differs from a mutex in that it
/// can allow more than one concurrent caller to access the shared resource at a
/// time.
///
/// When `acquire` is called and the semaphore has remaining permits, the
/// function immediately returns a permit. However, if no remaining permits are
/// available, `acquire` (asynchronously) waits until an outstanding permit is
/// dropped. At this point, the freed permit is assigned to the caller.
///
/// `try_acquire` is the non-waiting counterpart: it returns an error instead
/// of waiting when no permit is available. The number of permits can be
/// increased with `add_permits` and decreased by calling `forget` on an
/// acquired permit.
#[derive(Debug)]
pub struct Semaphore {
    /// The low level semaphore; every operation on this type is forwarded
    /// to it.
    ll_sem: ll::Semaphore,
}
20
/// A permit from the semaphore.
///
/// Obtained from [`Semaphore::acquire`] or [`Semaphore::try_acquire`].
/// Dropping the permit releases it back to the semaphore it was acquired
/// from; use [`SemaphorePermit::forget`] to give the permit up without
/// releasing it.
#[must_use]
#[derive(Debug)]
pub struct SemaphorePermit<'a> {
    /// The semaphore this permit was acquired from, and the one it is
    /// released back to on drop.
    sem: &'a Semaphore,
    /// The low level permit.
    ll_permit: ll::Permit,
}
29
/// Error returned from the [`Semaphore::try_acquire`] function.
///
/// A `try_acquire` operation can only fail if the semaphore has no available
/// permits.
///
/// The error is opaque: it carries no data, and its private unit field keeps
/// it from being constructed outside this module. It implements
/// [`std::fmt::Display`] and [`std::error::Error`], so it can be shown to
/// users and converted into boxed/dynamic error types with `?`.
///
/// [`Semaphore::try_acquire`]: Semaphore::try_acquire
#[derive(Debug)]
pub struct TryAcquireError(());

impl std::fmt::Display for TryAcquireError {
    /// Writes a short, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // There is only one way for `try_acquire` to fail, so the message is
        // a fixed string.
        f.write_str("no permits available")
    }
}

// No underlying cause to report, so the default `source()` (None) is correct.
impl std::error::Error for TryAcquireError {}
38
39impl Semaphore {
40    /// Creates a new semaphore with the initial number of permits
41    pub fn new(permits: usize) -> Self {
42        Self {
43            ll_sem: ll::Semaphore::new(permits),
44        }
45    }
46
47    /// Returns the current number of available permits
48    pub fn available_permits(&self) -> usize {
49        self.ll_sem.available_permits()
50    }
51
52    /// Add `n` new permits to the semaphore.
53    pub fn add_permits(&self, n: usize) {
54        self.ll_sem.add_permits(n);
55    }
56
57    /// Acquire permit from the semaphore
58    pub async fn acquire(&self) -> SemaphorePermit<'_> {
59        let mut permit = SemaphorePermit {
60            sem: &self,
61            ll_permit: ll::Permit::new(),
62        };
63        poll_fn(|cx| permit.ll_permit.poll_acquire(cx, 1, &self.ll_sem))
64            .await
65            .unwrap();
66        permit
67    }
68
69    /// Try to acquire a permit form the semaphore
70    pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
71        let mut ll_permit = ll::Permit::new();
72        match ll_permit.try_acquire(1, &self.ll_sem) {
73            Ok(_) => Ok(SemaphorePermit {
74                sem: self,
75                ll_permit,
76            }),
77            Err(_) => Err(TryAcquireError(())),
78        }
79    }
80}
81
82impl<'a> SemaphorePermit<'a> {
83    /// Forget the permit **without** releasing it back to the semaphore.
84    /// This can be used to reduce the amount of permits available from a
85    /// semaphore.
86    pub fn forget(mut self) {
87        self.ll_permit.forget(1);
88    }
89}
90
91impl<'a> Drop for SemaphorePermit<'_> {
92    fn drop(&mut self) {
93        self.ll_permit.release(1, &self.sem.ll_sem);
94    }
95}