futures_intrusive/sync/
semaphore.rs

1//! An asynchronously awaitable semaphore for synchronization between concurrently
2//! executing futures.
3
4use crate::{
5    intrusive_double_linked_list::{LinkedList, ListNode},
6    utils::update_waker_ref,
7    NoopLock,
8};
9use core::pin::Pin;
10use futures_core::{
11    future::{FusedFuture, Future},
12    task::{Context, Poll, Waker},
13};
14use lock_api::{Mutex as LockApiMutex, RawMutex};
15
/// Tracks how the acquire future has interacted with the semaphore so far.
///
/// The state is stored in the `WaitQueueEntry` of every acquire future and
/// decides what a poll or a drop of that future has to do.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PollState {
    /// The task has never interacted with the semaphore.
    New,
    /// The task was added to the wait queue at the semaphore.
    Waiting,
    /// The task had previously waited on the semaphore, but was notified
    /// that the semaphore was released in the meantime and that the task
    /// thereby could retry.
    Notified,
    /// The task had been polled to completion.
    Done,
}
30
31/// Tracks the SemaphoreAcquireFuture waiting state.
32struct WaitQueueEntry {
33    /// The task handle of the waiting task
34    task: Option<Waker>,
35    /// Current polling state
36    state: PollState,
37    /// The amount of permits that should be obtained
38    required_permits: usize,
39}
40
41impl WaitQueueEntry {
42    /// Creates a new WaitQueueEntry
43    fn new(required_permits: usize) -> WaitQueueEntry {
44        WaitQueueEntry {
45            task: None,
46            state: PollState::New,
47            required_permits,
48        }
49    }
50}
51
/// Internal state of the `Semaphore`
///
/// The owning semaphore types keep this state inside a mutex and only
/// access it while holding the lock.
struct SemaphoreState {
    /// Whether waiters have to be served in the order in which they queued up
    is_fair: bool,
    /// The amount of permits that are currently available
    permits: usize,
    /// Futures waiting for permits. New waiters are added at the front,
    /// wakeups start at the back (the oldest waiter).
    waiters: LinkedList<WaitQueueEntry>,
}
58
59impl SemaphoreState {
60    fn new(is_fair: bool, permits: usize) -> Self {
61        SemaphoreState {
62            is_fair,
63            permits,
64            waiters: LinkedList::new(),
65        }
66    }
67
68    /// Wakes up the last waiter and removes it from the wait queue
69    fn wakeup_waiters(&mut self) {
70        // Wake as many tasks as the permits allow
71        let mut available = self.permits;
72
73        loop {
74            match self.waiters.peek_last_mut() {
75                None => return,
76                Some(last_waiter) => {
77                    // Check if enough permits are available for this waiter.
78                    // If not then a wakeup attempt won't be successful.
79                    if available < last_waiter.required_permits {
80                        return;
81                    }
82                    available -= last_waiter.required_permits;
83
84                    // Notify the waiter that it can try to acquire the semaphore again.
85                    // The notification gets tracked inside the waiter.
86                    // If the waiter aborts it's wait (drops the future), another task
87                    // must be woken.
88                    if last_waiter.state != PollState::Notified {
89                        last_waiter.state = PollState::Notified;
90
91                        let task = &last_waiter.task;
92                        if let Some(ref handle) = task {
93                            handle.wake_by_ref();
94                        }
95                    }
96
97                    // In the case of a non-fair semaphore, the waiters are directly
98                    // removed from the semaphores wait queue when woken.
99                    // That avoids having to remove the wait element later.
100                    if !self.is_fair {
101                        self.waiters.remove_last();
102                    } else {
103                        // For a fair Semaphore we never wake more than 1 task.
104                        // That one needs to acquire the Semaphore.
105                        // TODO: We actually should be able to wake more, since
106                        // it's guaranteed that both tasks could make progress.
107                        // However the we currently can't peek iterate in reverse order.
108                        return;
109                    }
110                }
111            }
112        }
113    }
114
    /// Returns the amount of permits that are currently available
    fn permits(&self) -> usize {
        self.permits
    }
118
119    /// Releases a certain amount of permits back to the semaphore
120    fn release(&mut self, permits: usize) {
121        if permits == 0 {
122            return;
123        }
124        // TODO: Overflow check
125        self.permits += permits;
126
127        // Wakeup the last waiter
128        self.wakeup_waiters();
129    }
130
131    /// Tries to acquire the given amount of permits synchronously.
132    ///
133    /// Returns true if the permits were obtained and false otherwise.
134    fn try_acquire_sync(&mut self, required_permits: usize) -> bool {
135        // Permits can only be obtained synchronously if there are
136        // - enough permits available
137        // - the Semaphore is either not fair, or there are no waiters
138        // - required_permits == 0
139        if (self.permits >= required_permits)
140            && (!self.is_fair
141                || self.waiters.is_empty()
142                || required_permits == 0)
143        {
144            self.permits -= required_permits;
145            true
146        } else {
147            false
148        }
149    }
150
151    /// Tries to acquire the Semaphore from a WaitQueueEntry.
152    /// If it isn't available, the WaitQueueEntry gets added to the wait
153    /// queue at the Semaphore, and will be signalled once ready.
154    /// This function is only safe as long as the `wait_node`s address is guaranteed
155    /// to be stable until it gets removed from the queue.
156    unsafe fn try_acquire(
157        &mut self,
158        wait_node: &mut ListNode<WaitQueueEntry>,
159        cx: &mut Context<'_>,
160    ) -> Poll<()> {
161        match wait_node.state {
162            PollState::New => {
163                // The fast path - enough permits are available
164                if self.try_acquire_sync(wait_node.required_permits) {
165                    wait_node.state = PollState::Done;
166                    Poll::Ready(())
167                } else {
168                    // Add the task to the wait queue
169                    wait_node.task = Some(cx.waker().clone());
170                    wait_node.state = PollState::Waiting;
171                    self.waiters.add_front(wait_node);
172                    Poll::Pending
173                }
174            }
175            PollState::Waiting => {
176                // The SemaphoreAcquireFuture is already in the queue.
177                if self.is_fair {
178                    // The task needs to wait until it gets notified in order to
179                    // maintain the ordering.
180                    // However the caller might have passed a different `Waker`.
181                    // In this case we need to update it.
182                    update_waker_ref(&mut wait_node.task, cx);
183                    Poll::Pending
184                } else {
185                    // For throughput improvement purposes, check immediately
186                    // if enough permits are available
187                    if self.permits >= wait_node.required_permits {
188                        self.permits -= wait_node.required_permits;
189                        wait_node.state = PollState::Done;
190                        // Since this waiter has been registered before, it must
191                        // get removed from the waiter list.
192                        // Safety: Due to the state, we know that the node must be part
193                        // of the waiter list
194                        self.force_remove_waiter(wait_node);
195                        Poll::Ready(())
196                    } else {
197                        // The caller might have passed a different `Waker`.
198                        // In this case we need to update it.
199                        update_waker_ref(&mut wait_node.task, cx);
200                        Poll::Pending
201                    }
202                }
203            }
204            PollState::Notified => {
205                // We had been woken by the semaphore, since the semaphore is available again.
206                // The semaphore thereby removed us from the waiters list.
207                // Just try to lock again. If the semaphore isn't available,
208                // we need to add it to the wait queue again.
209                if self.permits >= wait_node.required_permits {
210                    if self.is_fair {
211                        // In a fair Semaphore, the WaitQueueEntry is kept in the
212                        // linked list and must be removed here
213                        // Safety: Due to the state, we know that the node must be part
214                        // of the waiter list
215                        self.force_remove_waiter(wait_node);
216                    }
217                    self.permits -= wait_node.required_permits;
218                    if self.is_fair {
219                        // There might be another task which is ready to run,
220                        // but couldn't, since it was blocked behind the fair waiter.
221                        self.wakeup_waiters();
222                    }
223                    wait_node.state = PollState::Done;
224                    Poll::Ready(())
225                } else {
226                    // A fair semaphore should never end up in that branch, since
227                    // it's only notified when it's permits are guaranteed to
228                    // be available. assert! in order to find logic bugs
229                    assert!(
230                        !self.is_fair,
231                        "Fair semaphores should always be ready when notified"
232                    );
233                    // Add to queue
234                    wait_node.task = Some(cx.waker().clone());
235                    wait_node.state = PollState::Waiting;
236                    self.waiters.add_front(wait_node);
237                    Poll::Pending
238                }
239            }
240            PollState::Done => {
241                // The future had been polled to completion before
242                panic!("polled Mutex after completion");
243            }
244        }
245    }
246
247    /// Tries to remove a waiter from the wait queue, and panics if the
248    /// waiter is no longer valid.
249    unsafe fn force_remove_waiter(
250        &mut self,
251        wait_node: &mut ListNode<WaitQueueEntry>,
252    ) {
253        if !self.waiters.remove(wait_node) {
254            // Panic if the address isn't found. This can only happen if the contract was
255            // violated, e.g. the WaitQueueEntry got moved after the initial poll.
256            panic!("Future could not be removed from wait queue");
257        }
258    }
259
260    /// Removes the waiter from the list.
261    /// This function is only safe as long as the reference that is passed here
262    /// equals the reference/address under which the waiter was added.
263    /// The waiter must not have been moved in between.
264    fn remove_waiter(&mut self, wait_node: &mut ListNode<WaitQueueEntry>) {
265        // SemaphoreAcquireFuture only needs to get removed if it had been added to
266        // the wait queue of the Semaphore. This has happened in the PollState::Waiting case.
267        // If the current waiter was notified, another waiter must get notified now.
268        match wait_node.state {
269            PollState::Notified => {
270                if self.is_fair {
271                    // In a fair Mutex, the WaitQueueEntry is kept in the
272                    // linked list and must be removed here
273                    // Safety: Due to the state, we know that the node must be part
274                    // of the waiter list
275                    unsafe { self.force_remove_waiter(wait_node) };
276                }
277                wait_node.state = PollState::Done;
278                // Wakeup more waiters
279                self.wakeup_waiters();
280            }
281            PollState::Waiting => {
282                // Remove the WaitQueueEntry from the linked list
283                // Safety: Due to the state, we know that the node must be part
284                // of the waiter list
285                unsafe { self.force_remove_waiter(wait_node) };
286                wait_node.state = PollState::Done;
287            }
288            PollState::New | PollState::Done => {}
289        }
290    }
291}
292
/// An RAII guard returned by the `acquire` and `try_acquire` methods.
///
/// When this structure is dropped (falls out of scope),
/// the amount of permits that was used in the `acquire()` call will be released
/// back to the Semaphore.
pub struct GenericSemaphoreReleaser<'a, MutexType: RawMutex> {
    /// The Semaphore which is associated with this Releaser
    semaphore: &'a GenericSemaphore<MutexType>,
    /// The amount of permits to release. `0` means that nothing gets
    /// released on drop, e.g. after `disarm()` was called.
    permits: usize,
}
304
305impl<MutexType: RawMutex> core::fmt::Debug
306    for GenericSemaphoreReleaser<'_, MutexType>
307{
308    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
309        f.debug_struct("GenericSemaphoreReleaser").finish()
310    }
311}
312
313impl<MutexType: RawMutex> GenericSemaphoreReleaser<'_, MutexType> {
314    /// Prevents the SemaphoreReleaser from automatically releasing the permits
315    /// when it gets dropped.
316    /// This is helpful if the permits must be acquired for a longer lifetime
317    /// than the one of the SemaphoreReleaser.
318    /// If this method is used it is important to release the acquired permits
319    /// manually back to the Semaphore.
320    pub fn disarm(&mut self) -> usize {
321        let permits = self.permits;
322        self.permits = 0;
323        permits
324    }
325}
326
327impl<MutexType: RawMutex> Drop for GenericSemaphoreReleaser<'_, MutexType> {
328    fn drop(&mut self) {
329        // Release the requested amount of permits to the semaphore
330        if self.permits != 0 {
331            self.semaphore.state.lock().release(self.permits);
332        }
333    }
334}
335
/// A future which resolves when the target semaphore has been successfully acquired.
#[must_use = "futures do nothing unless polled"]
pub struct GenericSemaphoreAcquireFuture<'a, MutexType: RawMutex> {
    /// The Semaphore which should get acquired through this Future.
    /// Set to `None` once the future has completed.
    semaphore: Option<&'a GenericSemaphore<MutexType>>,
    /// Node for waiting at the semaphore
    wait_node: ListNode<WaitQueueEntry>,
    /// Whether the obtained permits should automatically be released back
    /// to the semaphore.
    auto_release: bool,
}
347
// Safety: Futures can be sent between threads as long as the underlying
// semaphore is thread-safe (Sync), which allows to poll/register/unregister from
// a different thread.
// The future only holds a shared reference to the semaphore, so no `Send`
// bound on the mutex is required here.
unsafe impl<'a, MutexType: RawMutex + Sync> Send
    for GenericSemaphoreAcquireFuture<'a, MutexType>
{
}
355
356impl<'a, MutexType: RawMutex> core::fmt::Debug
357    for GenericSemaphoreAcquireFuture<'a, MutexType>
358{
359    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
360        f.debug_struct("GenericSemaphoreAcquireFuture").finish()
361    }
362}
363
364impl<'a, MutexType: RawMutex> Future
365    for GenericSemaphoreAcquireFuture<'a, MutexType>
366{
367    type Output = GenericSemaphoreReleaser<'a, MutexType>;
368
369    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
370        // Safety: The next operations are safe, because Pin promises us that
371        // the address of the wait queue entry inside GenericSemaphoreAcquireFuture is stable,
372        // and we don't move any fields inside the future until it gets dropped.
373        let mut_self: &mut GenericSemaphoreAcquireFuture<MutexType> =
374            unsafe { Pin::get_unchecked_mut(self) };
375
376        let semaphore = mut_self
377            .semaphore
378            .expect("polled GenericSemaphoreAcquireFuture after completion");
379        let mut semaphore_state = semaphore.state.lock();
380
381        let poll_res =
382            unsafe { semaphore_state.try_acquire(&mut mut_self.wait_node, cx) };
383
384        match poll_res {
385            Poll::Pending => Poll::Pending,
386            Poll::Ready(()) => {
387                // The semaphore was acquired.
388                mut_self.semaphore = None;
389                let to_release = match mut_self.auto_release {
390                    true => mut_self.wait_node.required_permits,
391                    false => 0,
392                };
393                Poll::Ready(GenericSemaphoreReleaser::<'a, MutexType> {
394                    semaphore,
395                    permits: to_release,
396                })
397            }
398        }
399    }
400}
401
impl<'a, MutexType: RawMutex> FusedFuture
    for GenericSemaphoreAcquireFuture<'a, MutexType>
{
    /// Returns true once the future has completed. `poll` clears the
    /// semaphore reference when it returns `Poll::Ready`.
    fn is_terminated(&self) -> bool {
        self.semaphore.is_none()
    }
}
409
410impl<'a, MutexType: RawMutex> Drop
411    for GenericSemaphoreAcquireFuture<'a, MutexType>
412{
413    fn drop(&mut self) {
414        // If this GenericSemaphoreAcquireFuture has been polled and it was added to the
415        // wait queue at the semaphore, it must be removed before dropping.
416        // Otherwise the semaphore would access invalid memory.
417        if let Some(semaphore) = self.semaphore {
418            let mut semaphore_state = semaphore.state.lock();
419            // Analysis: Does the number of permits play a role here?
420            // The future was notified because there was a certain amount of permits
421            // available.
422            // Removing the waiter will wake up as many tasks as there are permits
423            // available inside the Semaphore now. If this is bigger than the
424            // amount of permits required for this task, then additional new
425            // tasks might get woken. However that isn't bad, since
426            // those tasks should get into the wait state anyway.
427            semaphore_state.remove_waiter(&mut self.wait_node);
428        }
429    }
430}
431
/// A futures-aware semaphore.
///
/// `MutexType` selects the raw mutex which protects the internal state.
pub struct GenericSemaphore<MutexType: RawMutex> {
    /// Permit counter and wait queue, guarded by the mutex
    state: LockApiMutex<MutexType, SemaphoreState>,
}
436
// Safety: It is safe to send semaphores between threads, as long as they are
// not used and thereby borrowed
unsafe impl<MutexType: RawMutex + Send> Send for GenericSemaphore<MutexType> {}
// Safety: The Semaphore is thread-safe as long as the utilized Mutex is
// thread-safe
unsafe impl<MutexType: RawMutex + Sync> Sync for GenericSemaphore<MutexType> {}
442
443impl<MutexType: RawMutex> core::fmt::Debug for GenericSemaphore<MutexType> {
444    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
445        f.debug_struct("Semaphore")
446            .field("permits", &self.permits())
447            .finish()
448    }
449}
450
451impl<MutexType: RawMutex> GenericSemaphore<MutexType> {
452    /// Creates a new futures-aware semaphore.
453    ///
454    /// `is_fair` defines whether the `Semaphore` should behave be fair regarding the
455    /// order of waiters. A fair `Semaphore` will only allow the oldest waiter on
456    /// a `Semaphore` to retry acquiring it once it's available again.
457    /// Other waiters must wait until either this acquire attempt completes, and
458    /// the `Semaphore` has enough permits after that, or until the
459    /// [`SemaphoreAcquireFuture`] which tried to acquire the `Semaphore` is dropped.
460    ///
461    /// If the `Semaphore` isn't fair, waiters that wait for a high amount of
462    /// permits might never succeed since the permits might be stolen in between
463    /// by other waiters. Therefore use-cases which make use of very different
464    /// amount of permits per acquire should use fair semaphores.
465    /// For use-cases where each `acquire()` tries to acquire the same amount of
466    /// permits an unfair `Semaphore` might provide throughput advantages.
467    ///
468    /// `permits` is the amount of permits that a semaphore should hold when
469    /// created.
470    pub fn new(is_fair: bool, permits: usize) -> GenericSemaphore<MutexType> {
471        GenericSemaphore::<MutexType> {
472            state: LockApiMutex::new(SemaphoreState::new(is_fair, permits)),
473        }
474    }
475
476    /// Acquire a certain amount of permits on a semaphore asynchronously.
477    ///
478    /// This method returns a future that will resolve once the given amount of
479    /// permits have been acquired.
480    /// The Future will resolve to a [`GenericSemaphoreReleaser`], which will
481    /// release all acquired permits automatically when dropped.
482    pub fn acquire(
483        &self,
484        nr_permits: usize,
485    ) -> GenericSemaphoreAcquireFuture<'_, MutexType> {
486        GenericSemaphoreAcquireFuture::<MutexType> {
487            semaphore: Some(&self),
488            wait_node: ListNode::new(WaitQueueEntry::new(nr_permits)),
489            auto_release: true,
490        }
491    }
492
493    /// Tries to acquire a certain amount of permits on a semaphore.
494    ///
495    /// If acquiring the permits is successful, a [`GenericSemaphoreReleaser`]
496    /// will be returned, which will release all acquired permits automatically
497    /// when dropped.
498    ///
499    /// Otherwise `None` will be returned.
500    pub fn try_acquire(
501        &self,
502        nr_permits: usize,
503    ) -> Option<GenericSemaphoreReleaser<'_, MutexType>> {
504        if self.state.lock().try_acquire_sync(nr_permits) {
505            Some(GenericSemaphoreReleaser {
506                semaphore: self,
507                permits: nr_permits,
508            })
509        } else {
510            None
511        }
512    }
513
514    /// Releases the given amount of permits back to the semaphore.
515    ///
516    /// This method should in most cases not be used, since the
517    /// [`GenericSemaphoreReleaser`] which is obtained when acquiring a Semaphore
518    /// will automatically release the obtained permits again.
519    ///
520    /// Therefore this method should only be used if the automatic release was
521    /// disabled by calling [`GenericSemaphoreReleaser::disarm`],
522    /// or when the amount of permits in the Semaphore
523    /// should increase from the initial amount.
524    pub fn release(&self, nr_permits: usize) {
525        self.state.lock().release(nr_permits)
526    }
527
528    /// Returns the amount of permits that are available on the semaphore
529    pub fn permits(&self) -> usize {
530        self.state.lock().permits()
531    }
532}
533
// Export a non thread-safe version using NoopLock.
// These aliases are always available, independent of the enabled features.

/// A [`GenericSemaphore`] which is not thread-safe.
pub type LocalSemaphore = GenericSemaphore<NoopLock>;
/// A [`GenericSemaphoreReleaser`] for [`LocalSemaphore`].
pub type LocalSemaphoreReleaser<'a> = GenericSemaphoreReleaser<'a, NoopLock>;
/// A [`GenericSemaphoreAcquireFuture`] for [`LocalSemaphore`].
pub type LocalSemaphoreAcquireFuture<'a> =
    GenericSemaphoreAcquireFuture<'a, NoopLock>;
543
// Type aliases which are only available if the `std` feature is enabled
#[cfg(feature = "std")]
mod if_std {
    use super::*;

    // Export a thread-safe version using parking_lot::RawMutex

    /// A [`GenericSemaphore`] backed by [`parking_lot`].
    pub type Semaphore = GenericSemaphore<parking_lot::RawMutex>;
    /// A [`GenericSemaphoreReleaser`] for [`Semaphore`].
    pub type SemaphoreReleaser<'a> =
        GenericSemaphoreReleaser<'a, parking_lot::RawMutex>;
    /// A [`GenericSemaphoreAcquireFuture`] for [`Semaphore`].
    pub type SemaphoreAcquireFuture<'a> =
        GenericSemaphoreAcquireFuture<'a, parking_lot::RawMutex>;
}
559
560#[cfg(feature = "std")]
561pub use self::if_std::*;
562
563#[cfg(feature = "alloc")]
564mod if_alloc {
565    use super::*;
566
567    use alloc::sync::Arc;
568
    /// An RAII guard returned by the `acquire` and `try_acquire` methods.
    ///
    /// When this structure is dropped (falls out of scope),
    /// the amount of permits that was used in the `acquire()` call will be released
    /// back to the Semaphore.
    pub struct GenericSharedSemaphoreReleaser<MutexType: RawMutex> {
        /// The Semaphore which is associated with this Releaser
        semaphore: GenericSharedSemaphore<MutexType>,
        /// The amount of permits to release. `0` means that nothing gets
        /// released on drop, e.g. after `disarm()` was called.
        permits: usize,
    }
580
581    impl<MutexType: RawMutex> core::fmt::Debug
582        for GenericSharedSemaphoreReleaser<MutexType>
583    {
584        fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
585            f.debug_struct("GenericSharedSemaphoreReleaser").finish()
586        }
587    }
588
589    impl<MutexType: RawMutex> GenericSharedSemaphoreReleaser<MutexType> {
590        /// Prevents the SharedSemaphoreReleaser from automatically releasing the permits
591        /// when it gets dropped.
592        ///
593        /// This is helpful if the permits must be acquired for a longer lifetime
594        /// than the one of the SemaphoreReleaser.
595        ///
596        /// If this method is used it is important to release the acquired permits
597        /// manually back to the Semaphore.
598        pub fn disarm(&mut self) -> usize {
599            let permits = self.permits;
600            self.permits = 0;
601            permits
602        }
603    }
604
605    impl<MutexType: RawMutex> Drop for GenericSharedSemaphoreReleaser<MutexType> {
606        fn drop(&mut self) {
607            // Release the requested amount of permits to the semaphore
608            if self.permits != 0 {
609                self.semaphore.state.lock().release(self.permits);
610            }
611        }
612    }
613
    /// A future which resolves when the target semaphore has been successfully acquired.
    #[must_use = "futures do nothing unless polled"]
    pub struct GenericSharedSemaphoreAcquireFuture<MutexType: RawMutex> {
        /// The Semaphore which should get acquired through this Future.
        /// `None` once the future has completed; the handle is then owned by
        /// the returned releaser.
        semaphore: Option<GenericSharedSemaphore<MutexType>>,
        /// Node for waiting at the semaphore
        wait_node: ListNode<WaitQueueEntry>,
        /// Whether the obtained permits should automatically be released back
        /// to the semaphore.
        auto_release: bool,
    }
625
626    // Safety: Futures can be sent between threads as long as the underlying
627    // semaphore is thread-safe (Sync), which allows to poll/register/unregister from
628    // a different thread.
629    unsafe impl<MutexType: RawMutex + Sync> Send
630        for GenericSharedSemaphoreAcquireFuture<MutexType>
631    {
632    }
633
634    impl<MutexType: RawMutex> core::fmt::Debug
635        for GenericSharedSemaphoreAcquireFuture<MutexType>
636    {
637        fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
638            f.debug_struct("GenericSharedSemaphoreAcquireFuture")
639                .finish()
640        }
641    }
642
643    impl<MutexType: RawMutex> Future
644        for GenericSharedSemaphoreAcquireFuture<MutexType>
645    {
646        type Output = GenericSharedSemaphoreReleaser<MutexType>;
647
648        fn poll(
649            self: Pin<&mut Self>,
650            cx: &mut Context<'_>,
651        ) -> Poll<Self::Output> {
652            // Safety: The next operations are safe, because Pin promises us that
653            // the address of the wait queue entry inside
654            // GenericSharedSemaphoreAcquireFuture is stable,
655            // and we don't move any fields inside the future until it gets dropped.
656            let mut_self: &mut GenericSharedSemaphoreAcquireFuture<MutexType> =
657                unsafe { Pin::get_unchecked_mut(self) };
658
659            let semaphore = mut_self.semaphore.take().expect(
660                "polled GenericSharedSemaphoreAcquireFuture after completion",
661            );
662
663            let poll_res = unsafe {
664                let mut semaphore_state = semaphore.state.lock();
665                semaphore_state.try_acquire(&mut mut_self.wait_node, cx)
666            };
667
668            match poll_res {
669                Poll::Pending => {
670                    mut_self.semaphore.replace(semaphore);
671                    Poll::Pending
672                }
673                Poll::Ready(()) => {
674                    let to_release = match mut_self.auto_release {
675                        true => mut_self.wait_node.required_permits,
676                        false => 0,
677                    };
678                    Poll::Ready(GenericSharedSemaphoreReleaser::<MutexType> {
679                        semaphore,
680                        permits: to_release,
681                    })
682                }
683            }
684        }
685    }
686
    impl<MutexType: RawMutex> FusedFuture
        for GenericSharedSemaphoreAcquireFuture<MutexType>
    {
        /// Returns true once the future has completed. `poll` moves the
        /// semaphore handle out of the future when it returns `Poll::Ready`.
        fn is_terminated(&self) -> bool {
            self.semaphore.is_none()
        }
    }
694
695    impl<MutexType: RawMutex> Drop
696        for GenericSharedSemaphoreAcquireFuture<MutexType>
697    {
698        fn drop(&mut self) {
699            // If this GenericSharedSemaphoreAcquireFuture has been polled and it was added to the
700            // wait queue at the semaphore, it must be removed before dropping.
701            // Otherwise the semaphore would access invalid memory.
702            if let Some(semaphore) = self.semaphore.take() {
703                let mut semaphore_state = semaphore.state.lock();
704                // Analysis: Does the number of permits play a role here?
705                // The future was notified because there was a certain amount of permits
706                // available.
707                // Removing the waiter will wake up as many tasks as there are permits
708                // available inside the Semaphore now. If this is bigger than the
709                // amount of permits required for this task, then additional new
710                // tasks might get woken. However that isn't bad, since
711                // those tasks should get into the wait state anyway.
712                semaphore_state.remove_waiter(&mut self.wait_node);
713            }
714        }
715    }
716
    /// A futures-aware shared semaphore.
    ///
    /// Cloning the semaphore yields another handle to the same state.
    pub struct GenericSharedSemaphore<MutexType: RawMutex> {
        /// Reference-counted permit counter and wait queue, guarded by the
        /// mutex
        state: Arc<LockApiMutex<MutexType, SemaphoreState>>,
    }
721
722    impl<MutexType: RawMutex> Clone for GenericSharedSemaphore<MutexType> {
723        fn clone(&self) -> Self {
724            Self {
725                state: self.state.clone(),
726            }
727        }
728    }
729
    // Safety: It is safe to send semaphores between threads, as long as they
    // are not used and thereby borrowed. Since every handle shares the state
    // with its clones, the mutex must be `Sync` in addition to `Send`.
    unsafe impl<MutexType: RawMutex + Send + Sync> Send
        for GenericSharedSemaphore<MutexType>
    {
    }
    // Safety: The Semaphore is thread-safe as long as the utilized Mutex is
    // thread-safe
    unsafe impl<MutexType: RawMutex + Sync> Sync
        for GenericSharedSemaphore<MutexType>
    {
    }
741
742    impl<MutexType: RawMutex> core::fmt::Debug
743        for GenericSharedSemaphore<MutexType>
744    {
745        fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
746            f.debug_struct("Semaphore")
747                .field("permits", &self.permits())
748                .finish()
749        }
750    }
751
752    impl<MutexType: RawMutex> GenericSharedSemaphore<MutexType> {
753        /// Creates a new futures-aware shared semaphore.
754        ///
755        /// See `GenericSharedSemaphore` for more information.
756        pub fn new(
757            is_fair: bool,
758            permits: usize,
759        ) -> GenericSharedSemaphore<MutexType> {
760            GenericSharedSemaphore::<MutexType> {
761                state: Arc::new(LockApiMutex::new(SemaphoreState::new(
762                    is_fair, permits,
763                ))),
764            }
765        }
766
767        /// Acquire a certain amount of permits on a semaphore asynchronously.
768        ///
769        /// This method returns a future that will resolve once the given amount of
770        /// permits have been acquired.
771        /// The Future will resolve to a [`GenericSharedSemaphoreReleaser`], which will
772        /// release all acquired permits automatically when dropped.
773        pub fn acquire(
774            &self,
775            nr_permits: usize,
776        ) -> GenericSharedSemaphoreAcquireFuture<MutexType> {
777            GenericSharedSemaphoreAcquireFuture::<MutexType> {
778                semaphore: Some(self.clone()),
779                wait_node: ListNode::new(WaitQueueEntry::new(nr_permits)),
780                auto_release: true,
781            }
782        }
783
784        /// Tries to acquire a certain amount of permits on a semaphore.
785        ///
786        /// If acquiring the permits is successful, a [`GenericSharedSemaphoreReleaser`]
787        /// will be returned, which will release all acquired permits automatically
788        /// when dropped.
789        ///
790        /// Otherwise `None` will be returned.
791        pub fn try_acquire(
792            &self,
793            nr_permits: usize,
794        ) -> Option<GenericSharedSemaphoreReleaser<MutexType>> {
795            if self.state.lock().try_acquire_sync(nr_permits) {
796                Some(GenericSharedSemaphoreReleaser {
797                    semaphore: self.clone(),
798                    permits: nr_permits,
799                })
800            } else {
801                None
802            }
803        }
804
805        /// Releases the given amount of permits back to the semaphore.
806        ///
807        /// This method should in most cases not be used, since the
808        /// [`GenericSharedSemaphoreReleaser`] which is obtained when acquiring a Semaphore
809        /// will automatically release the obtained permits again.
810        ///
811        /// Therefore this method should only be used if the automatic release was
812        /// disabled by calling [`GenericSharedSemaphoreReleaser::disarm`],
813        /// or when the amount of permits in the Semaphore
814        /// should increase from the initial amount.
815        pub fn release(&self, nr_permits: usize) {
816            self.state.lock().release(nr_permits)
817        }
818
819        /// Returns the amount of permits that are available on the semaphore
820        pub fn permits(&self) -> usize {
821            self.state.lock().permits()
822        }
823    }
824
    // Export parking_lot based shared semaphores in std mode.
    // These aliases require the `std` feature in addition to `alloc`.
    #[cfg(feature = "std")]
    mod if_std {
        use super::*;

        /// A [`GenericSharedSemaphore`] backed by [`parking_lot`].
        pub type SharedSemaphore =
            GenericSharedSemaphore<parking_lot::RawMutex>;
        /// A [`GenericSharedSemaphoreReleaser`] for [`SharedSemaphore`].
        pub type SharedSemaphoreReleaser =
            GenericSharedSemaphoreReleaser<parking_lot::RawMutex>;
        /// A [`GenericSharedSemaphoreAcquireFuture`] for [`SharedSemaphore`].
        pub type SharedSemaphoreAcquireFuture =
            GenericSharedSemaphoreAcquireFuture<parking_lot::RawMutex>;
    }
840
841    #[cfg(feature = "std")]
842    pub use self::if_std::*;
843}
844
845#[cfg(feature = "alloc")]
846pub use self::if_alloc::*;