broker_tokio/sync/semaphore.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
use super::semaphore_ll as ll; // low level implementation
use crate::future::poll_fn;
/// Counting semaphore performing asynchronous permit acquisition.
///
/// A semaphore maintains a set of permits. Permits are used to synchronize
/// access to a shared resource. A semaphore differs from a mutex in that it
/// can allow more than one concurrent caller to access the shared resource at a
/// time.
///
/// When `acquire` is called and the semaphore has remaining permits, the
/// function immediately returns a permit. However, if no remaining permits are
/// available, `acquire` (asynchronously) waits until an outstanding permit is
/// dropped. At this point, the freed permit is assigned to the caller.
#[derive(Debug)]
pub struct Semaphore {
/// The low-level semaphore wrapped by this type; every method of
/// `Semaphore` forwards to it.
ll_sem: ll::Semaphore,
}
/// A permit from the semaphore.
///
/// Values of this type are produced by [`Semaphore::acquire`] and
/// [`Semaphore::try_acquire`]. Dropping a permit calls `release` on the
/// low-level permit against the semaphore it was acquired from; use
/// [`SemaphorePermit::forget`] to consume the permit through the low-level
/// `forget` call instead.
#[must_use]
#[derive(Debug)]
pub struct SemaphorePermit<'a> {
// The semaphore this permit was acquired from; `Drop` releases against it.
sem: &'a Semaphore,
// The low-level permit wrapped by this handle.
ll_permit: ll::Permit,
}
/// Error returned from the [`Semaphore::try_acquire`] function.
///
/// A `try_acquire` operation can only fail if the semaphore has no available
/// permits.
///
/// The type implements [`std::fmt::Display`] and [`std::error::Error`], so it
/// can be printed with `{}` and propagated with `?` into
/// `Box<dyn std::error::Error>` or any other error type that wraps standard
/// errors.
///
/// [`Semaphore::try_acquire`]: Semaphore::try_acquire
#[derive(Debug)]
pub struct TryAcquireError(());

impl std::fmt::Display for TryAcquireError {
    /// Writes a fixed, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("no permits available")
    }
}

// No `source()` override: the error carries no underlying cause.
impl std::error::Error for TryAcquireError {}
impl Semaphore {
    /// Creates a new semaphore holding `permits` initial permits.
    pub fn new(permits: usize) -> Self {
        let ll_sem = ll::Semaphore::new(permits);
        Self { ll_sem }
    }

    /// Returns the number of permits currently available, as reported by the
    /// low-level semaphore.
    pub fn available_permits(&self) -> usize {
        self.ll_sem.available_permits()
    }

    /// Adds `n` new permits to the semaphore.
    pub fn add_permits(&self, n: usize) {
        self.ll_sem.add_permits(n);
    }

    /// Acquires one permit from the semaphore, waiting asynchronously until
    /// the low-level semaphore grants it.
    ///
    /// The returned [`SemaphorePermit`] borrows this semaphore and releases
    /// the permit when dropped.
    ///
    /// # Panics
    ///
    /// Panics if the low-level `poll_acquire` resolves to an error.
    pub async fn acquire(&self) -> SemaphorePermit<'_> {
        // The wrapper is built *before* awaiting on purpose: if this future is
        // dropped while still pending, `permit` is dropped with it and its
        // `Drop` impl calls `release` on the low-level permit. Acquiring into
        // a bare `ll::Permit` and wrapping afterwards would skip that call.
        let mut permit = SemaphorePermit {
            sem: self,
            ll_permit: ll::Permit::new(),
        };

        // NOTE(review): the `unwrap` assumes `poll_acquire` only fails in
        // situations this wrapper never creates (presumably a closed
        // semaphore) — confirm against `semaphore_ll`.
        poll_fn(|cx| permit.ll_permit.poll_acquire(cx, 1, &self.ll_sem))
            .await
            .unwrap();

        permit
    }

    /// Tries to acquire a permit from the semaphore without waiting.
    ///
    /// # Errors
    ///
    /// Returns [`TryAcquireError`] when the low-level `try_acquire` call
    /// fails; the underlying error value is discarded.
    pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
        let mut ll_permit = ll::Permit::new();

        match ll_permit.try_acquire(1, &self.ll_sem) {
            Ok(_) => Ok(SemaphorePermit {
                sem: self,
                ll_permit,
            }),
            Err(_) => Err(TryAcquireError(())),
        }
    }
}
impl SemaphorePermit<'_> {
    /// Consumes the permit **without** handing it back to the semaphore.
    ///
    /// Use this to permanently lower the number of permits the semaphore can
    /// give out.
    pub fn forget(mut self) {
        // NOTE(review): `self` is still dropped when this function returns, so
        // the `Drop` impl calls `release` afterwards. Presumably the low-level
        // `forget` leaves the permit in a state where that release gives
        // nothing back — confirm against `semaphore_ll`.
        self.ll_permit.forget(1);
    }
}
impl Drop for SemaphorePermit<'_> {
    /// Calls `release` on the low-level permit for one permit, against the
    /// semaphore this permit was acquired from.
    fn drop(&mut self) {
        self.ll_permit.release(1, &self.sem.ll_sem);
    }
}