broker_tokio/sync/semaphore.rs
1use super::semaphore_ll as ll; // low level implementation
2use crate::future::poll_fn;
3
4/// Counting semaphore performing asynchronous permit aquisition.
5///
6/// A semaphore maintains a set of permits. Permits are used to synchronize
7/// access to a shared resource. A semaphore differs from a mutex in that it
8/// can allow more than one concurrent caller to access the shared resource at a
9/// time.
10///
11/// When `acquire` is called and the semaphore has remaining permits, the
12/// function immediately returns a permit. However, if no remaining permits are
13/// available, `acquire` (asynchronously) waits until an outstanding permit is
14/// dropped. At this point, the freed permit is assigned to the caller.
15#[derive(Debug)]
16pub struct Semaphore {
17 /// The low level semaphore
18 ll_sem: ll::Semaphore,
19}
20
21/// A permit from the semaphore
22#[must_use]
23#[derive(Debug)]
24pub struct SemaphorePermit<'a> {
25 sem: &'a Semaphore,
26 // the low level permit
27 ll_permit: ll::Permit,
28}
29
/// Error returned from the [`Semaphore::try_acquire`] function.
///
/// A `try_acquire` operation can only fail if the semaphore has no available
/// permits.
///
/// The private unit field keeps the type from being constructed outside this
/// module.
///
/// [`Semaphore::try_acquire`]: Semaphore::try_acquire
#[derive(Debug)]
pub struct TryAcquireError(());

impl std::fmt::Display for TryAcquireError {
    /// Writes a short, user-facing description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("no permits available")
    }
}

// Lets callers propagate the error with `?` into `Box<dyn Error>` and similar.
impl std::error::Error for TryAcquireError {}
38
39impl Semaphore {
40 /// Creates a new semaphore with the initial number of permits
41 pub fn new(permits: usize) -> Self {
42 Self {
43 ll_sem: ll::Semaphore::new(permits),
44 }
45 }
46
47 /// Returns the current number of available permits
48 pub fn available_permits(&self) -> usize {
49 self.ll_sem.available_permits()
50 }
51
52 /// Add `n` new permits to the semaphore.
53 pub fn add_permits(&self, n: usize) {
54 self.ll_sem.add_permits(n);
55 }
56
57 /// Acquire permit from the semaphore
58 pub async fn acquire(&self) -> SemaphorePermit<'_> {
59 let mut permit = SemaphorePermit {
60 sem: &self,
61 ll_permit: ll::Permit::new(),
62 };
63 poll_fn(|cx| permit.ll_permit.poll_acquire(cx, 1, &self.ll_sem))
64 .await
65 .unwrap();
66 permit
67 }
68
69 /// Try to acquire a permit form the semaphore
70 pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
71 let mut ll_permit = ll::Permit::new();
72 match ll_permit.try_acquire(1, &self.ll_sem) {
73 Ok(_) => Ok(SemaphorePermit {
74 sem: self,
75 ll_permit,
76 }),
77 Err(_) => Err(TryAcquireError(())),
78 }
79 }
80}
81
82impl<'a> SemaphorePermit<'a> {
83 /// Forget the permit **without** releasing it back to the semaphore.
84 /// This can be used to reduce the amount of permits available from a
85 /// semaphore.
86 pub fn forget(mut self) {
87 self.ll_permit.forget(1);
88 }
89}
90
91impl<'a> Drop for SemaphorePermit<'_> {
92 fn drop(&mut self) {
93 self.ll_permit.release(1, &self.sem.ll_sem);
94 }
95}