futures_intrusive/sync/semaphore.rs
1//! An asynchronously awaitable semaphore for synchronization between concurrently
2//! executing futures.
3
4use crate::{
5 intrusive_double_linked_list::{LinkedList, ListNode},
6 utils::update_waker_ref,
7 NoopLock,
8};
9use core::pin::Pin;
10use futures_core::{
11 future::{FusedFuture, Future},
12 task::{Context, Poll, Waker},
13};
14use lock_api::{Mutex as LockApiMutex, RawMutex};
15
16/// Tracks how the future had interacted with the semaphore
17#[derive(PartialEq)]
18enum PollState {
19 /// The task has never interacted with the semaphore.
20 New,
21 /// The task was added to the wait queue at the semaphore.
22 Waiting,
23 /// The task had previously waited on the semaphore, but was notified
24 /// that the semaphore was released in the meantime and that the task
25 /// thereby could retry.
26 Notified,
27 /// The task had been polled to completion.
28 Done,
29}
30
31/// Tracks the SemaphoreAcquireFuture waiting state.
32struct WaitQueueEntry {
33 /// The task handle of the waiting task
34 task: Option<Waker>,
35 /// Current polling state
36 state: PollState,
37 /// The amount of permits that should be obtained
38 required_permits: usize,
39}
40
41impl WaitQueueEntry {
42 /// Creates a new WaitQueueEntry
43 fn new(required_permits: usize) -> WaitQueueEntry {
44 WaitQueueEntry {
45 task: None,
46 state: PollState::New,
47 required_permits,
48 }
49 }
50}
51
52/// Internal state of the `Semaphore`
53struct SemaphoreState {
54 is_fair: bool,
55 permits: usize,
56 waiters: LinkedList<WaitQueueEntry>,
57}
58
59impl SemaphoreState {
60 fn new(is_fair: bool, permits: usize) -> Self {
61 SemaphoreState {
62 is_fair,
63 permits,
64 waiters: LinkedList::new(),
65 }
66 }
67
68 /// Wakes up the last waiter and removes it from the wait queue
69 fn wakeup_waiters(&mut self) {
70 // Wake as many tasks as the permits allow
71 let mut available = self.permits;
72
73 loop {
74 match self.waiters.peek_last_mut() {
75 None => return,
76 Some(last_waiter) => {
77 // Check if enough permits are available for this waiter.
78 // If not then a wakeup attempt won't be successful.
79 if available < last_waiter.required_permits {
80 return;
81 }
82 available -= last_waiter.required_permits;
83
84 // Notify the waiter that it can try to acquire the semaphore again.
85 // The notification gets tracked inside the waiter.
86 // If the waiter aborts it's wait (drops the future), another task
87 // must be woken.
88 if last_waiter.state != PollState::Notified {
89 last_waiter.state = PollState::Notified;
90
91 let task = &last_waiter.task;
92 if let Some(ref handle) = task {
93 handle.wake_by_ref();
94 }
95 }
96
97 // In the case of a non-fair semaphore, the waiters are directly
98 // removed from the semaphores wait queue when woken.
99 // That avoids having to remove the wait element later.
100 if !self.is_fair {
101 self.waiters.remove_last();
102 } else {
103 // For a fair Semaphore we never wake more than 1 task.
104 // That one needs to acquire the Semaphore.
105 // TODO: We actually should be able to wake more, since
106 // it's guaranteed that both tasks could make progress.
107 // However the we currently can't peek iterate in reverse order.
108 return;
109 }
110 }
111 }
112 }
113 }
114
115 fn permits(&self) -> usize {
116 self.permits
117 }
118
119 /// Releases a certain amount of permits back to the semaphore
120 fn release(&mut self, permits: usize) {
121 if permits == 0 {
122 return;
123 }
124 // TODO: Overflow check
125 self.permits += permits;
126
127 // Wakeup the last waiter
128 self.wakeup_waiters();
129 }
130
131 /// Tries to acquire the given amount of permits synchronously.
132 ///
133 /// Returns true if the permits were obtained and false otherwise.
134 fn try_acquire_sync(&mut self, required_permits: usize) -> bool {
135 // Permits can only be obtained synchronously if there are
136 // - enough permits available
137 // - the Semaphore is either not fair, or there are no waiters
138 // - required_permits == 0
139 if (self.permits >= required_permits)
140 && (!self.is_fair
141 || self.waiters.is_empty()
142 || required_permits == 0)
143 {
144 self.permits -= required_permits;
145 true
146 } else {
147 false
148 }
149 }
150
151 /// Tries to acquire the Semaphore from a WaitQueueEntry.
152 /// If it isn't available, the WaitQueueEntry gets added to the wait
153 /// queue at the Semaphore, and will be signalled once ready.
154 /// This function is only safe as long as the `wait_node`s address is guaranteed
155 /// to be stable until it gets removed from the queue.
156 unsafe fn try_acquire(
157 &mut self,
158 wait_node: &mut ListNode<WaitQueueEntry>,
159 cx: &mut Context<'_>,
160 ) -> Poll<()> {
161 match wait_node.state {
162 PollState::New => {
163 // The fast path - enough permits are available
164 if self.try_acquire_sync(wait_node.required_permits) {
165 wait_node.state = PollState::Done;
166 Poll::Ready(())
167 } else {
168 // Add the task to the wait queue
169 wait_node.task = Some(cx.waker().clone());
170 wait_node.state = PollState::Waiting;
171 self.waiters.add_front(wait_node);
172 Poll::Pending
173 }
174 }
175 PollState::Waiting => {
176 // The SemaphoreAcquireFuture is already in the queue.
177 if self.is_fair {
178 // The task needs to wait until it gets notified in order to
179 // maintain the ordering.
180 // However the caller might have passed a different `Waker`.
181 // In this case we need to update it.
182 update_waker_ref(&mut wait_node.task, cx);
183 Poll::Pending
184 } else {
185 // For throughput improvement purposes, check immediately
186 // if enough permits are available
187 if self.permits >= wait_node.required_permits {
188 self.permits -= wait_node.required_permits;
189 wait_node.state = PollState::Done;
190 // Since this waiter has been registered before, it must
191 // get removed from the waiter list.
192 // Safety: Due to the state, we know that the node must be part
193 // of the waiter list
194 self.force_remove_waiter(wait_node);
195 Poll::Ready(())
196 } else {
197 // The caller might have passed a different `Waker`.
198 // In this case we need to update it.
199 update_waker_ref(&mut wait_node.task, cx);
200 Poll::Pending
201 }
202 }
203 }
204 PollState::Notified => {
205 // We had been woken by the semaphore, since the semaphore is available again.
206 // The semaphore thereby removed us from the waiters list.
207 // Just try to lock again. If the semaphore isn't available,
208 // we need to add it to the wait queue again.
209 if self.permits >= wait_node.required_permits {
210 if self.is_fair {
211 // In a fair Semaphore, the WaitQueueEntry is kept in the
212 // linked list and must be removed here
213 // Safety: Due to the state, we know that the node must be part
214 // of the waiter list
215 self.force_remove_waiter(wait_node);
216 }
217 self.permits -= wait_node.required_permits;
218 if self.is_fair {
219 // There might be another task which is ready to run,
220 // but couldn't, since it was blocked behind the fair waiter.
221 self.wakeup_waiters();
222 }
223 wait_node.state = PollState::Done;
224 Poll::Ready(())
225 } else {
226 // A fair semaphore should never end up in that branch, since
227 // it's only notified when it's permits are guaranteed to
228 // be available. assert! in order to find logic bugs
229 assert!(
230 !self.is_fair,
231 "Fair semaphores should always be ready when notified"
232 );
233 // Add to queue
234 wait_node.task = Some(cx.waker().clone());
235 wait_node.state = PollState::Waiting;
236 self.waiters.add_front(wait_node);
237 Poll::Pending
238 }
239 }
240 PollState::Done => {
241 // The future had been polled to completion before
242 panic!("polled Mutex after completion");
243 }
244 }
245 }
246
247 /// Tries to remove a waiter from the wait queue, and panics if the
248 /// waiter is no longer valid.
249 unsafe fn force_remove_waiter(
250 &mut self,
251 wait_node: &mut ListNode<WaitQueueEntry>,
252 ) {
253 if !self.waiters.remove(wait_node) {
254 // Panic if the address isn't found. This can only happen if the contract was
255 // violated, e.g. the WaitQueueEntry got moved after the initial poll.
256 panic!("Future could not be removed from wait queue");
257 }
258 }
259
260 /// Removes the waiter from the list.
261 /// This function is only safe as long as the reference that is passed here
262 /// equals the reference/address under which the waiter was added.
263 /// The waiter must not have been moved in between.
264 fn remove_waiter(&mut self, wait_node: &mut ListNode<WaitQueueEntry>) {
265 // SemaphoreAcquireFuture only needs to get removed if it had been added to
266 // the wait queue of the Semaphore. This has happened in the PollState::Waiting case.
267 // If the current waiter was notified, another waiter must get notified now.
268 match wait_node.state {
269 PollState::Notified => {
270 if self.is_fair {
271 // In a fair Mutex, the WaitQueueEntry is kept in the
272 // linked list and must be removed here
273 // Safety: Due to the state, we know that the node must be part
274 // of the waiter list
275 unsafe { self.force_remove_waiter(wait_node) };
276 }
277 wait_node.state = PollState::Done;
278 // Wakeup more waiters
279 self.wakeup_waiters();
280 }
281 PollState::Waiting => {
282 // Remove the WaitQueueEntry from the linked list
283 // Safety: Due to the state, we know that the node must be part
284 // of the waiter list
285 unsafe { self.force_remove_waiter(wait_node) };
286 wait_node.state = PollState::Done;
287 }
288 PollState::New | PollState::Done => {}
289 }
290 }
291}
292
293/// An RAII guard returned by the `acquire` and `try_acquire` methods.
294///
295/// When this structure is dropped (falls out of scope),
296/// the amount of permits that was used in the `acquire()` call will be released
297/// back to the Semaphore.
298pub struct GenericSemaphoreReleaser<'a, MutexType: RawMutex> {
299 /// The Semaphore which is associated with this Releaser
300 semaphore: &'a GenericSemaphore<MutexType>,
301 /// The amount of permits to release
302 permits: usize,
303}
304
305impl<MutexType: RawMutex> core::fmt::Debug
306 for GenericSemaphoreReleaser<'_, MutexType>
307{
308 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
309 f.debug_struct("GenericSemaphoreReleaser").finish()
310 }
311}
312
313impl<MutexType: RawMutex> GenericSemaphoreReleaser<'_, MutexType> {
314 /// Prevents the SemaphoreReleaser from automatically releasing the permits
315 /// when it gets dropped.
316 /// This is helpful if the permits must be acquired for a longer lifetime
317 /// than the one of the SemaphoreReleaser.
318 /// If this method is used it is important to release the acquired permits
319 /// manually back to the Semaphore.
320 pub fn disarm(&mut self) -> usize {
321 let permits = self.permits;
322 self.permits = 0;
323 permits
324 }
325}
326
327impl<MutexType: RawMutex> Drop for GenericSemaphoreReleaser<'_, MutexType> {
328 fn drop(&mut self) {
329 // Release the requested amount of permits to the semaphore
330 if self.permits != 0 {
331 self.semaphore.state.lock().release(self.permits);
332 }
333 }
334}
335
336/// A future which resolves when the target semaphore has been successfully acquired.
337#[must_use = "futures do nothing unless polled"]
338pub struct GenericSemaphoreAcquireFuture<'a, MutexType: RawMutex> {
339 /// The Semaphore which should get acquired trough this Future
340 semaphore: Option<&'a GenericSemaphore<MutexType>>,
341 /// Node for waiting at the semaphore
342 wait_node: ListNode<WaitQueueEntry>,
343 /// Whether the obtained permits should automatically be released back
344 /// to the semaphore.
345 auto_release: bool,
346}
347
348// Safety: Futures can be sent between threads as long as the underlying
349// semaphore is thread-safe (Sync), which allows to poll/register/unregister from
350// a different thread.
351unsafe impl<'a, MutexType: RawMutex + Sync> Send
352 for GenericSemaphoreAcquireFuture<'a, MutexType>
353{
354}
355
356impl<'a, MutexType: RawMutex> core::fmt::Debug
357 for GenericSemaphoreAcquireFuture<'a, MutexType>
358{
359 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
360 f.debug_struct("GenericSemaphoreAcquireFuture").finish()
361 }
362}
363
364impl<'a, MutexType: RawMutex> Future
365 for GenericSemaphoreAcquireFuture<'a, MutexType>
366{
367 type Output = GenericSemaphoreReleaser<'a, MutexType>;
368
369 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
370 // Safety: The next operations are safe, because Pin promises us that
371 // the address of the wait queue entry inside GenericSemaphoreAcquireFuture is stable,
372 // and we don't move any fields inside the future until it gets dropped.
373 let mut_self: &mut GenericSemaphoreAcquireFuture<MutexType> =
374 unsafe { Pin::get_unchecked_mut(self) };
375
376 let semaphore = mut_self
377 .semaphore
378 .expect("polled GenericSemaphoreAcquireFuture after completion");
379 let mut semaphore_state = semaphore.state.lock();
380
381 let poll_res =
382 unsafe { semaphore_state.try_acquire(&mut mut_self.wait_node, cx) };
383
384 match poll_res {
385 Poll::Pending => Poll::Pending,
386 Poll::Ready(()) => {
387 // The semaphore was acquired.
388 mut_self.semaphore = None;
389 let to_release = match mut_self.auto_release {
390 true => mut_self.wait_node.required_permits,
391 false => 0,
392 };
393 Poll::Ready(GenericSemaphoreReleaser::<'a, MutexType> {
394 semaphore,
395 permits: to_release,
396 })
397 }
398 }
399 }
400}
401
402impl<'a, MutexType: RawMutex> FusedFuture
403 for GenericSemaphoreAcquireFuture<'a, MutexType>
404{
405 fn is_terminated(&self) -> bool {
406 self.semaphore.is_none()
407 }
408}
409
410impl<'a, MutexType: RawMutex> Drop
411 for GenericSemaphoreAcquireFuture<'a, MutexType>
412{
413 fn drop(&mut self) {
414 // If this GenericSemaphoreAcquireFuture has been polled and it was added to the
415 // wait queue at the semaphore, it must be removed before dropping.
416 // Otherwise the semaphore would access invalid memory.
417 if let Some(semaphore) = self.semaphore {
418 let mut semaphore_state = semaphore.state.lock();
419 // Analysis: Does the number of permits play a role here?
420 // The future was notified because there was a certain amount of permits
421 // available.
422 // Removing the waiter will wake up as many tasks as there are permits
423 // available inside the Semaphore now. If this is bigger than the
424 // amount of permits required for this task, then additional new
425 // tasks might get woken. However that isn't bad, since
426 // those tasks should get into the wait state anyway.
427 semaphore_state.remove_waiter(&mut self.wait_node);
428 }
429 }
430}
431
432/// A futures-aware semaphore.
433pub struct GenericSemaphore<MutexType: RawMutex> {
434 state: LockApiMutex<MutexType, SemaphoreState>,
435}
436
437// It is safe to send semaphores between threads, as long as they are not used and
438// thereby borrowed
439unsafe impl<MutexType: RawMutex + Send> Send for GenericSemaphore<MutexType> {}
440// The Semaphore is thread-safe as long as the utilized Mutex is thread-safe
441unsafe impl<MutexType: RawMutex + Sync> Sync for GenericSemaphore<MutexType> {}
442
443impl<MutexType: RawMutex> core::fmt::Debug for GenericSemaphore<MutexType> {
444 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
445 f.debug_struct("Semaphore")
446 .field("permits", &self.permits())
447 .finish()
448 }
449}
450
451impl<MutexType: RawMutex> GenericSemaphore<MutexType> {
452 /// Creates a new futures-aware semaphore.
453 ///
454 /// `is_fair` defines whether the `Semaphore` should behave be fair regarding the
455 /// order of waiters. A fair `Semaphore` will only allow the oldest waiter on
456 /// a `Semaphore` to retry acquiring it once it's available again.
457 /// Other waiters must wait until either this acquire attempt completes, and
458 /// the `Semaphore` has enough permits after that, or until the
459 /// [`SemaphoreAcquireFuture`] which tried to acquire the `Semaphore` is dropped.
460 ///
461 /// If the `Semaphore` isn't fair, waiters that wait for a high amount of
462 /// permits might never succeed since the permits might be stolen in between
463 /// by other waiters. Therefore use-cases which make use of very different
464 /// amount of permits per acquire should use fair semaphores.
465 /// For use-cases where each `acquire()` tries to acquire the same amount of
466 /// permits an unfair `Semaphore` might provide throughput advantages.
467 ///
468 /// `permits` is the amount of permits that a semaphore should hold when
469 /// created.
470 pub fn new(is_fair: bool, permits: usize) -> GenericSemaphore<MutexType> {
471 GenericSemaphore::<MutexType> {
472 state: LockApiMutex::new(SemaphoreState::new(is_fair, permits)),
473 }
474 }
475
476 /// Acquire a certain amount of permits on a semaphore asynchronously.
477 ///
478 /// This method returns a future that will resolve once the given amount of
479 /// permits have been acquired.
480 /// The Future will resolve to a [`GenericSemaphoreReleaser`], which will
481 /// release all acquired permits automatically when dropped.
482 pub fn acquire(
483 &self,
484 nr_permits: usize,
485 ) -> GenericSemaphoreAcquireFuture<'_, MutexType> {
486 GenericSemaphoreAcquireFuture::<MutexType> {
487 semaphore: Some(&self),
488 wait_node: ListNode::new(WaitQueueEntry::new(nr_permits)),
489 auto_release: true,
490 }
491 }
492
493 /// Tries to acquire a certain amount of permits on a semaphore.
494 ///
495 /// If acquiring the permits is successful, a [`GenericSemaphoreReleaser`]
496 /// will be returned, which will release all acquired permits automatically
497 /// when dropped.
498 ///
499 /// Otherwise `None` will be returned.
500 pub fn try_acquire(
501 &self,
502 nr_permits: usize,
503 ) -> Option<GenericSemaphoreReleaser<'_, MutexType>> {
504 if self.state.lock().try_acquire_sync(nr_permits) {
505 Some(GenericSemaphoreReleaser {
506 semaphore: self,
507 permits: nr_permits,
508 })
509 } else {
510 None
511 }
512 }
513
514 /// Releases the given amount of permits back to the semaphore.
515 ///
516 /// This method should in most cases not be used, since the
517 /// [`GenericSemaphoreReleaser`] which is obtained when acquiring a Semaphore
518 /// will automatically release the obtained permits again.
519 ///
520 /// Therefore this method should only be used if the automatic release was
521 /// disabled by calling [`GenericSemaphoreReleaser::disarm`],
522 /// or when the amount of permits in the Semaphore
523 /// should increase from the initial amount.
524 pub fn release(&self, nr_permits: usize) {
525 self.state.lock().release(nr_permits)
526 }
527
528 /// Returns the amount of permits that are available on the semaphore
529 pub fn permits(&self) -> usize {
530 self.state.lock().permits()
531 }
532}
533
534// Export a non thread-safe version using NoopLock
535
536/// A [`GenericSemaphore`] which is not thread-safe.
537pub type LocalSemaphore = GenericSemaphore<NoopLock>;
538/// A [`GenericSemaphoreReleaser`] for [`LocalSemaphore`].
539pub type LocalSemaphoreReleaser<'a> = GenericSemaphoreReleaser<'a, NoopLock>;
540/// A [`GenericSemaphoreAcquireFuture`] for [`LocalSemaphore`].
541pub type LocalSemaphoreAcquireFuture<'a> =
542 GenericSemaphoreAcquireFuture<'a, NoopLock>;
543
544#[cfg(feature = "std")]
545mod if_std {
546 use super::*;
547
548 // Export a thread-safe version using parking_lot::RawMutex
549
550 /// A [`GenericSemaphore`] backed by [`parking_lot`].
551 pub type Semaphore = GenericSemaphore<parking_lot::RawMutex>;
552 /// A [`GenericSemaphoreReleaser`] for [`Semaphore`].
553 pub type SemaphoreReleaser<'a> =
554 GenericSemaphoreReleaser<'a, parking_lot::RawMutex>;
555 /// A [`GenericSemaphoreAcquireFuture`] for [`Semaphore`].
556 pub type SemaphoreAcquireFuture<'a> =
557 GenericSemaphoreAcquireFuture<'a, parking_lot::RawMutex>;
558}
559
560#[cfg(feature = "std")]
561pub use self::if_std::*;
562
563#[cfg(feature = "alloc")]
564mod if_alloc {
565 use super::*;
566
567 use alloc::sync::Arc;
568
569 /// An RAII guard returned by the `acquire` and `try_acquire` methods.
570 ///
571 /// When this structure is dropped (falls out of scope),
572 /// the amount of permits that was used in the `acquire()` call will be released
573 /// back to the Semaphore.
574 pub struct GenericSharedSemaphoreReleaser<MutexType: RawMutex> {
575 /// The Semaphore which is associated with this Releaser
576 semaphore: GenericSharedSemaphore<MutexType>,
577 /// The amount of permits to release
578 permits: usize,
579 }
580
581 impl<MutexType: RawMutex> core::fmt::Debug
582 for GenericSharedSemaphoreReleaser<MutexType>
583 {
584 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
585 f.debug_struct("GenericSharedSemaphoreReleaser").finish()
586 }
587 }
588
589 impl<MutexType: RawMutex> GenericSharedSemaphoreReleaser<MutexType> {
590 /// Prevents the SharedSemaphoreReleaser from automatically releasing the permits
591 /// when it gets dropped.
592 ///
593 /// This is helpful if the permits must be acquired for a longer lifetime
594 /// than the one of the SemaphoreReleaser.
595 ///
596 /// If this method is used it is important to release the acquired permits
597 /// manually back to the Semaphore.
598 pub fn disarm(&mut self) -> usize {
599 let permits = self.permits;
600 self.permits = 0;
601 permits
602 }
603 }
604
605 impl<MutexType: RawMutex> Drop for GenericSharedSemaphoreReleaser<MutexType> {
606 fn drop(&mut self) {
607 // Release the requested amount of permits to the semaphore
608 if self.permits != 0 {
609 self.semaphore.state.lock().release(self.permits);
610 }
611 }
612 }
613
614 /// A future which resolves when the target semaphore has been successfully acquired.
615 #[must_use = "futures do nothing unless polled"]
616 pub struct GenericSharedSemaphoreAcquireFuture<MutexType: RawMutex> {
617 /// The Semaphore which should get acquired trough this Future
618 semaphore: Option<GenericSharedSemaphore<MutexType>>,
619 /// Node for waiting at the semaphore
620 wait_node: ListNode<WaitQueueEntry>,
621 /// Whether the obtained permits should automatically be released back
622 /// to the semaphore.
623 auto_release: bool,
624 }
625
626 // Safety: Futures can be sent between threads as long as the underlying
627 // semaphore is thread-safe (Sync), which allows to poll/register/unregister from
628 // a different thread.
629 unsafe impl<MutexType: RawMutex + Sync> Send
630 for GenericSharedSemaphoreAcquireFuture<MutexType>
631 {
632 }
633
634 impl<MutexType: RawMutex> core::fmt::Debug
635 for GenericSharedSemaphoreAcquireFuture<MutexType>
636 {
637 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
638 f.debug_struct("GenericSharedSemaphoreAcquireFuture")
639 .finish()
640 }
641 }
642
643 impl<MutexType: RawMutex> Future
644 for GenericSharedSemaphoreAcquireFuture<MutexType>
645 {
646 type Output = GenericSharedSemaphoreReleaser<MutexType>;
647
648 fn poll(
649 self: Pin<&mut Self>,
650 cx: &mut Context<'_>,
651 ) -> Poll<Self::Output> {
652 // Safety: The next operations are safe, because Pin promises us that
653 // the address of the wait queue entry inside
654 // GenericSharedSemaphoreAcquireFuture is stable,
655 // and we don't move any fields inside the future until it gets dropped.
656 let mut_self: &mut GenericSharedSemaphoreAcquireFuture<MutexType> =
657 unsafe { Pin::get_unchecked_mut(self) };
658
659 let semaphore = mut_self.semaphore.take().expect(
660 "polled GenericSharedSemaphoreAcquireFuture after completion",
661 );
662
663 let poll_res = unsafe {
664 let mut semaphore_state = semaphore.state.lock();
665 semaphore_state.try_acquire(&mut mut_self.wait_node, cx)
666 };
667
668 match poll_res {
669 Poll::Pending => {
670 mut_self.semaphore.replace(semaphore);
671 Poll::Pending
672 }
673 Poll::Ready(()) => {
674 let to_release = match mut_self.auto_release {
675 true => mut_self.wait_node.required_permits,
676 false => 0,
677 };
678 Poll::Ready(GenericSharedSemaphoreReleaser::<MutexType> {
679 semaphore,
680 permits: to_release,
681 })
682 }
683 }
684 }
685 }
686
687 impl<MutexType: RawMutex> FusedFuture
688 for GenericSharedSemaphoreAcquireFuture<MutexType>
689 {
690 fn is_terminated(&self) -> bool {
691 self.semaphore.is_none()
692 }
693 }
694
695 impl<MutexType: RawMutex> Drop
696 for GenericSharedSemaphoreAcquireFuture<MutexType>
697 {
698 fn drop(&mut self) {
699 // If this GenericSharedSemaphoreAcquireFuture has been polled and it was added to the
700 // wait queue at the semaphore, it must be removed before dropping.
701 // Otherwise the semaphore would access invalid memory.
702 if let Some(semaphore) = self.semaphore.take() {
703 let mut semaphore_state = semaphore.state.lock();
704 // Analysis: Does the number of permits play a role here?
705 // The future was notified because there was a certain amount of permits
706 // available.
707 // Removing the waiter will wake up as many tasks as there are permits
708 // available inside the Semaphore now. If this is bigger than the
709 // amount of permits required for this task, then additional new
710 // tasks might get woken. However that isn't bad, since
711 // those tasks should get into the wait state anyway.
712 semaphore_state.remove_waiter(&mut self.wait_node);
713 }
714 }
715 }
716
717 /// A futures-aware shared semaphore.
718 pub struct GenericSharedSemaphore<MutexType: RawMutex> {
719 state: Arc<LockApiMutex<MutexType, SemaphoreState>>,
720 }
721
722 impl<MutexType: RawMutex> Clone for GenericSharedSemaphore<MutexType> {
723 fn clone(&self) -> Self {
724 Self {
725 state: self.state.clone(),
726 }
727 }
728 }
729
730 // It is safe to send semaphores between threads, as long as they are not used and
731 // thereby borrowed
732 unsafe impl<MutexType: RawMutex + Send + Sync> Send
733 for GenericSharedSemaphore<MutexType>
734 {
735 }
736 // The Semaphore is thread-safe as long as the utilized Mutex is thread-safe
737 unsafe impl<MutexType: RawMutex + Sync> Sync
738 for GenericSharedSemaphore<MutexType>
739 {
740 }
741
742 impl<MutexType: RawMutex> core::fmt::Debug
743 for GenericSharedSemaphore<MutexType>
744 {
745 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
746 f.debug_struct("Semaphore")
747 .field("permits", &self.permits())
748 .finish()
749 }
750 }
751
752 impl<MutexType: RawMutex> GenericSharedSemaphore<MutexType> {
753 /// Creates a new futures-aware shared semaphore.
754 ///
755 /// See `GenericSharedSemaphore` for more information.
756 pub fn new(
757 is_fair: bool,
758 permits: usize,
759 ) -> GenericSharedSemaphore<MutexType> {
760 GenericSharedSemaphore::<MutexType> {
761 state: Arc::new(LockApiMutex::new(SemaphoreState::new(
762 is_fair, permits,
763 ))),
764 }
765 }
766
767 /// Acquire a certain amount of permits on a semaphore asynchronously.
768 ///
769 /// This method returns a future that will resolve once the given amount of
770 /// permits have been acquired.
771 /// The Future will resolve to a [`GenericSharedSemaphoreReleaser`], which will
772 /// release all acquired permits automatically when dropped.
773 pub fn acquire(
774 &self,
775 nr_permits: usize,
776 ) -> GenericSharedSemaphoreAcquireFuture<MutexType> {
777 GenericSharedSemaphoreAcquireFuture::<MutexType> {
778 semaphore: Some(self.clone()),
779 wait_node: ListNode::new(WaitQueueEntry::new(nr_permits)),
780 auto_release: true,
781 }
782 }
783
784 /// Tries to acquire a certain amount of permits on a semaphore.
785 ///
786 /// If acquiring the permits is successful, a [`GenericSharedSemaphoreReleaser`]
787 /// will be returned, which will release all acquired permits automatically
788 /// when dropped.
789 ///
790 /// Otherwise `None` will be returned.
791 pub fn try_acquire(
792 &self,
793 nr_permits: usize,
794 ) -> Option<GenericSharedSemaphoreReleaser<MutexType>> {
795 if self.state.lock().try_acquire_sync(nr_permits) {
796 Some(GenericSharedSemaphoreReleaser {
797 semaphore: self.clone(),
798 permits: nr_permits,
799 })
800 } else {
801 None
802 }
803 }
804
805 /// Releases the given amount of permits back to the semaphore.
806 ///
807 /// This method should in most cases not be used, since the
808 /// [`GenericSharedSemaphoreReleaser`] which is obtained when acquiring a Semaphore
809 /// will automatically release the obtained permits again.
810 ///
811 /// Therefore this method should only be used if the automatic release was
812 /// disabled by calling [`GenericSharedSemaphoreReleaser::disarm`],
813 /// or when the amount of permits in the Semaphore
814 /// should increase from the initial amount.
815 pub fn release(&self, nr_permits: usize) {
816 self.state.lock().release(nr_permits)
817 }
818
819 /// Returns the amount of permits that are available on the semaphore
820 pub fn permits(&self) -> usize {
821 self.state.lock().permits()
822 }
823 }
824
825 // Export parking_lot based shared semaphores in std mode
826 #[cfg(feature = "std")]
827 mod if_std {
828 use super::*;
829
830 /// A [`GenericSharedSemaphore`] backed by [`parking_lot`].
831 pub type SharedSemaphore =
832 GenericSharedSemaphore<parking_lot::RawMutex>;
833 /// A [`GenericSharedSemaphoreReleaser`] for [`SharedSemaphore`].
834 pub type SharedSemaphoreReleaser =
835 GenericSharedSemaphoreReleaser<parking_lot::RawMutex>;
836 /// A [`GenericSharedSemaphoreAcquireFuture`] for [`SharedSemaphore`].
837 pub type SharedSemaphoreAcquireFuture =
838 GenericSharedSemaphoreAcquireFuture<parking_lot::RawMutex>;
839 }
840
841 #[cfg(feature = "std")]
842 pub use self::if_std::*;
843}
844
845#[cfg(feature = "alloc")]
846pub use self::if_alloc::*;