pub struct Semaphore(_);

Counting semaphore performing asynchronous permit acquisition.

A semaphore maintains a set of permits. Permits are used to synchronize access to a shared resource. A semaphore differs from a mutex in that it can allow more than one concurrent caller to access the shared resource at a time.

When acquire is called and the semaphore has remaining permits, the function immediately returns a permit. However, if no remaining permits are available, acquire (asynchronously) waits until an outstanding permit is dropped. At this point, the freed permit is assigned to the caller.
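The acquire-then-release cycle described above can be pictured with a small std-only model. This is a sketch under stated assumptions, not the crate's implementation: `ToySemaphore`, `ToyPermit`, and `demo_permit_lifecycle` are hypothetical names, and a `None` return stands in for "the caller would have to wait".

```rust
use std::cell::Cell;

/// Toy single-threaded permit counter; NOT the crate's implementation.
pub struct ToySemaphore {
    permits: Cell<usize>,
}

/// Guard that hands its permit back to the semaphore when dropped.
pub struct ToyPermit<'a> {
    sem: &'a ToySemaphore,
}

impl ToySemaphore {
    pub fn new(permits: usize) -> Self {
        Self { permits: Cell::new(permits) }
    }

    pub fn available_permits(&self) -> usize {
        self.permits.get()
    }

    /// Non-waiting acquisition: `None` stands in for "the caller would wait".
    pub fn try_acquire(&self) -> Option<ToyPermit<'_>> {
        let n = self.permits.get();
        if n == 0 {
            return None;
        }
        self.permits.set(n - 1);
        Some(ToyPermit { sem: self })
    }
}

impl Drop for ToyPermit<'_> {
    fn drop(&mut self) {
        // Dropping the guard frees the permit for the next caller.
        self.sem.permits.set(self.sem.permits.get() + 1);
    }
}

/// Walks through acquire, exhaustion, and release; returns what was observed.
pub fn demo_permit_lifecycle() -> (usize, bool, usize) {
    let sem = ToySemaphore::new(1);
    let permit = sem.try_acquire().expect("one permit is available");
    let while_held = sem.available_permits();
    let second_failed = sem.try_acquire().is_none();
    drop(permit);
    (while_held, second_failed, sem.available_permits()) // (0, true, 1)
}
```

The guard-with-`Drop` shape is the point of the sketch: a permit is returned by dropping it, with no explicit release call.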

This Semaphore is fair, which means that permits are given out in the order they were requested. This fairness also applies when acquire_many is involved: if a call to acquire_many at the front of the queue requests more permits than are currently available, it can prevent a later call to acquire from completing, even if the semaphore has enough permits to complete the call to acquire.
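The head-of-line effect of this fairness rule can be illustrated with a simplified FIFO model. The sketch below is an assumption-laden illustration, not the crate's code: `grant_fifo` is a hypothetical helper that takes the number of available permits and the queued request sizes, and reports how many requests would be granted before the queue stalls.

```rust
use std::collections::VecDeque;

/// Grants queued requests strictly in FIFO order and stops at the first
/// request that cannot be satisfied. Returns how many requests were granted.
/// Simplified model of the documented fairness rule, not the crate's code.
pub fn grant_fifo(mut available: usize, requests: &[usize]) -> usize {
    let mut queue: VecDeque<usize> = requests.iter().copied().collect();
    let mut granted = 0;
    while let Some(&front) = queue.front() {
        if front > available {
            // The head of the queue blocks everyone behind it,
            // even requests that would fit in `available`.
            break;
        }
        available -= front;
        queue.pop_front();
        granted += 1;
    }
    granted
}
```

With two permits available and a queue of `[3, 1]`, the model grants nothing: the request for one permit would fit, but it sits behind the request for three.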

To use the Semaphore in a poll function, you can use the PollSemaphore utility.

Examples

Basic usage:

use local_sync::semaphore::{Semaphore, TryAcquireError};

#[monoio::main]
async fn main() {
    let semaphore = Semaphore::new(3);

    let a_permit = semaphore.acquire().await.unwrap();
    let two_permits = semaphore.acquire_many(2).await.unwrap();

    assert_eq!(semaphore.available_permits(), 0);

    let permit_attempt = semaphore.try_acquire();
    assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
}

Use Semaphore::acquire_owned to move permits across tasks:

use std::rc::Rc;
use local_sync::semaphore::Semaphore;

#[monoio::main]
async fn main() {
    let semaphore = Rc::new(Semaphore::new(3));
    let mut join_handles = Vec::new();

    for _ in 0..5 {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        join_handles.push(monoio::spawn(async move {
            // perform task...
            // explicitly own `permit` in the task
            drop(permit);
        }));
    }

    for handle in join_handles {
        handle.await;
    }
}

Implementations

impl Semaphore

pub const fn new(permits: usize) -> Self

Creates a new semaphore with the initial number of permits.

pub fn available_permits(&self) -> usize

Returns the current number of available permits.

pub fn add_permits(&self, n: usize)

Adds n new permits to the semaphore.

The maximum number of permits is usize::MAX >> 3, and this function will panic if the limit is exceeded.
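Since exceeding the limit panics, a caller may want to check the arithmetic first. The following is a minimal sketch: `MAX_PERMITS` simply restates the limit quoted above, and `checked_permit_total` is a hypothetical helper, not part of the crate. Which count the crate compares against the limit is not stated here, so `current` is whatever total the caller believes applies.

```rust
/// The limit quoted in the documentation above.
pub const MAX_PERMITS: usize = usize::MAX >> 3;

/// Hypothetical pre-check (not part of the crate): returns the new total
/// if adding `n` permits to `current` stays within `MAX_PERMITS`,
/// or `None` if the addition would overflow or exceed the limit.
pub fn checked_permit_total(current: usize, n: usize) -> Option<usize> {
    current
        .checked_add(n)
        .filter(|total| *total <= MAX_PERMITS)
}
```

A caller could run this check before `add_permits(n)` and handle `None` as an error instead of risking a panic.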

pub fn acquire(&self) -> AcquireResult<'_>

Acquires a permit from the semaphore.

If the semaphore has been closed, this returns an AcquireError. Otherwise, this returns a SemaphorePermit representing the acquired permit.

Cancel safety

This method uses a queue to fairly distribute permits in the order they were requested. Cancelling a call to acquire makes you lose your place in the queue.
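What "losing your place" means can be sketched with a toy waiter queue. This is an illustrative model only: `WaitQueue`, its methods, and the numeric caller ids are hypothetical and do not reflect how the crate tracks waiters.

```rust
use std::collections::VecDeque;

/// Toy waiter queue keyed by caller id; not the crate's implementation.
#[derive(Default)]
pub struct WaitQueue {
    waiters: VecDeque<u32>,
}

impl WaitQueue {
    /// A new request always joins at the back.
    pub fn enqueue(&mut self, id: u32) {
        self.waiters.push_back(id);
    }

    /// Cancelling removes the waiter from the queue entirely.
    pub fn cancel(&mut self, id: u32) {
        self.waiters.retain(|w| *w != id);
    }

    /// Zero-based position of a waiter, if it is queued.
    pub fn position(&self, id: u32) -> Option<usize> {
        self.waiters.iter().position(|w| *w == id)
    }
}

/// Caller 1 starts at the front, cancels, and asks again.
pub fn demo_cancel_and_retry() -> (Option<usize>, Option<usize>) {
    let mut queue = WaitQueue::default();
    queue.enqueue(1);
    queue.enqueue(2);
    queue.enqueue(3);
    let before = queue.position(1);
    queue.cancel(1);
    queue.enqueue(1);
    (before, queue.position(1)) // (Some(0), Some(2))
}
```

In this model a cancelled request that is retried re-enters behind every waiter that arrived in the meantime.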

Examples
use local_sync::semaphore::Semaphore;

#[monoio::main]
async fn main() {
    let semaphore = Semaphore::new(2);

    let permit_1 = semaphore.acquire().await.unwrap();
    assert_eq!(semaphore.available_permits(), 1);

    let permit_2 = semaphore.acquire().await.unwrap();
    assert_eq!(semaphore.available_permits(), 0);

    drop(permit_1);
    assert_eq!(semaphore.available_permits(), 1);
}

pub fn acquire_many(&self, n: u32) -> AcquireResult<'_>

Acquires n permits from the semaphore.

If the semaphore has been closed, this returns an AcquireError. Otherwise, this returns a SemaphorePermit representing the acquired permits.

Cancel safety

This method uses a queue to fairly distribute permits in the order they were requested. Cancelling a call to acquire_many makes you lose your place in the queue.

Examples
use local_sync::semaphore::Semaphore;

#[monoio::main]
async fn main() {
    let semaphore = Semaphore::new(5);

    let permit = semaphore.acquire_many(3).await.unwrap();
    assert_eq!(semaphore.available_permits(), 2);
}

pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError>

Tries to acquire a permit from the semaphore.

If the semaphore has been closed, this returns TryAcquireError::Closed; if there are no permits left, it returns TryAcquireError::NoPermits. Otherwise, this returns a SemaphorePermit representing the acquired permit.

Examples
use local_sync::semaphore::{Semaphore, TryAcquireError};

let semaphore = Semaphore::new(2);

let permit_1 = semaphore.try_acquire().unwrap();
assert_eq!(semaphore.available_permits(), 1);

let permit_2 = semaphore.try_acquire().unwrap();
assert_eq!(semaphore.available_permits(), 0);

let permit_3 = semaphore.try_acquire();
assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
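The two error variants usually call for different reactions: NoPermits is transient, while Closed is final. The sketch below shows one way to branch on them. To stay compilable without the crate it declares a local stand-in `TryAcquireError` that mirrors only the two variants named above; real code would import the crate's type instead. `NextStep` and `next_step` are hypothetical names.

```rust
/// Local stand-in mirroring the two variants named in the documentation,
/// so the sketch compiles without the crate. Real code would import
/// the crate's own `TryAcquireError`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryAcquireError {
    Closed,
    NoPermits,
}

/// Hypothetical decision a caller might take after a `try_acquire` attempt.
#[derive(Debug, PartialEq, Eq)]
pub enum NextStep {
    Proceed,
    RetryLater,
    GiveUp,
}

/// Maps the outcome of an acquisition attempt to a follow-up action.
/// `P` is whatever permit type the attempt produced.
pub fn next_step<P>(result: &Result<P, TryAcquireError>) -> NextStep {
    match result {
        Ok(_) => NextStep::Proceed,
        // Permits may be released later, so a retry can succeed.
        Err(TryAcquireError::NoPermits) => NextStep::RetryLater,
        // A closed semaphore issues no new permits.
        Err(TryAcquireError::Closed) => NextStep::GiveUp,
    }
}
```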

pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError>

Tries to acquire n permits from the semaphore.

If the semaphore has been closed, this returns TryAcquireError::Closed; if there are not enough permits left, it returns TryAcquireError::NoPermits. Otherwise, this returns a SemaphorePermit representing the acquired permits.

Examples
use local_sync::semaphore::{Semaphore, TryAcquireError};

let semaphore = Semaphore::new(4);

let permit_1 = semaphore.try_acquire_many(3).unwrap();
assert_eq!(semaphore.available_permits(), 1);

let permit_2 = semaphore.try_acquire_many(2);
assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));

pub async fn acquire_owned(self: Rc<Self>) -> Result<OwnedSemaphorePermit, AcquireError>

Acquires a permit from the semaphore.

The semaphore must be wrapped in an Rc to call this method. If the semaphore has been closed, this returns an AcquireError. Otherwise, this returns an OwnedSemaphorePermit representing the acquired permit.

Cancel safety

This method uses a queue to fairly distribute permits in the order they were requested. Cancelling a call to acquire_owned makes you lose your place in the queue.

Examples
use std::rc::Rc;
use local_sync::semaphore::Semaphore;

#[monoio::main]
async fn main() {
    let semaphore = Rc::new(Semaphore::new(3));
    let mut join_handles = Vec::new();

    for _ in 0..5 {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        join_handles.push(monoio::spawn(async move {
            // perform task...
            // explicitly own `permit` in the task
            drop(permit);
        }));
    }

    for handle in join_handles {
        handle.await;
    }
}

pub async fn acquire_many_owned(self: Rc<Self>, n: u32) -> Result<OwnedSemaphorePermit, AcquireError>

Acquires n permits from the semaphore.

The semaphore must be wrapped in an Rc to call this method. If the semaphore has been closed, this returns an AcquireError. Otherwise, this returns an OwnedSemaphorePermit representing the acquired permits.

Cancel safety

This method uses a queue to fairly distribute permits in the order they were requested. Cancelling a call to acquire_many_owned makes you lose your place in the queue.

Examples
use std::rc::Rc;
use local_sync::semaphore::Semaphore;

#[monoio::main]
async fn main() {
    let semaphore = Rc::new(Semaphore::new(10));
    let mut join_handles = Vec::new();

    for _ in 0..5 {
        let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
        join_handles.push(monoio::spawn(async move {
            // perform task...
            // explicitly own `permit` in the task
            drop(permit);
        }));
    }

    for handle in join_handles {
        handle.await;
    }
}

pub fn try_acquire_owned(self: Rc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError>

Tries to acquire a permit from the semaphore.

The semaphore must be wrapped in an Rc to call this method. If the semaphore has been closed, this returns TryAcquireError::Closed; if there are no permits left, it returns TryAcquireError::NoPermits. Otherwise, this returns an OwnedSemaphorePermit representing the acquired permit.

Examples
use std::rc::Rc;
use local_sync::semaphore::{Semaphore, TryAcquireError};

let semaphore = Rc::new(Semaphore::new(2));

let permit_1 = Rc::clone(&semaphore).try_acquire_owned().unwrap();
assert_eq!(semaphore.available_permits(), 1);

let permit_2 = Rc::clone(&semaphore).try_acquire_owned().unwrap();
assert_eq!(semaphore.available_permits(), 0);

let permit_3 = semaphore.try_acquire_owned();
assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));

pub fn try_acquire_many_owned(self: Rc<Self>, n: u32) -> Result<OwnedSemaphorePermit, TryAcquireError>

Tries to acquire n permits from the semaphore.

The semaphore must be wrapped in an Rc to call this method. If the semaphore has been closed, this returns TryAcquireError::Closed; if there are not enough permits left, it returns TryAcquireError::NoPermits. Otherwise, this returns an OwnedSemaphorePermit representing the acquired permits.

Examples
use std::rc::Rc;
use local_sync::semaphore::{Semaphore, TryAcquireError};

let semaphore = Rc::new(Semaphore::new(4));

let permit_1 = Rc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
assert_eq!(semaphore.available_permits(), 1);

let permit_2 = semaphore.try_acquire_many_owned(2);
assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));

pub fn close(&self)

Closes the semaphore.

This prevents the semaphore from issuing new permits and notifies all pending waiters.

Examples
use local_sync::semaphore::{Semaphore, TryAcquireError};
use std::rc::Rc;

#[monoio::main]
async fn main() {
    let semaphore = Rc::new(Semaphore::new(1));
    let semaphore2 = semaphore.clone();

    monoio::spawn(async move {
        let permit = semaphore.acquire_many(2).await;
        assert!(permit.is_err());
        println!("waiter received error");
    });

    println!("closing semaphore");
    semaphore2.close();

    // Cannot obtain more permits
    assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed));
}

pub fn is_closed(&self) -> bool

Returns true if the semaphore is closed.

Trait Implementations

impl Debug for Semaphore

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.